import os
import csv
import json
import gzip
import logging
import itertools
import collections.abc
import tempfile
from pathlib import Path
from collections import defaultdict, Counter
from textwrap import dedent
from typing import Iterator, List, Mapping, Optional, Set, Tuple, Union, Iterable
from urllib.request import urlretrieve
from adeft.disambiguate import load_disambiguator
from adeft.modeling.classify import load_model_info
from adeft import available_shortforms as available_adeft_models
from .term import Term, get_identifiers_curie, get_identifiers_url
from .process import normalize, replace_dashes, replace_greek_uni, \
replace_greek_latin, replace_greek_spelled_out, depluralize, \
replace_roman_arabic
from .scorer import Match, generate_match, score
from .resources import get_gilda_models, get_grounding_terms
__all__ = [
"Grounder",
"GrounderInput",
"ScoredMatch",
"load_terms_file",
"load_entries_from_terms_file",
"filter_for_organism",
"load_adeft_models",
"load_gilda_models",
]
logger = logging.getLogger(__name__)
GrounderInput = Union[str, Path, Iterable[Term], Mapping[str, List[Term]]]
#: The default namespace priority order
DEFAULT_NAMESPACE_PRIORITY = [
'FPLX', 'HGNC', 'UP', 'CHEBI', 'GO', 'MESH', 'DOID', 'HP', 'EFO'
]
[docs]class Grounder(object):
"""Class to look up and ground query texts in a terms file.
Parameters
----------
terms :
Specifies the grounding terms that should be loaded in the Grounder.
- If ``None``, the default grounding terms are loaded from the
versioned resource folder.
- If :class:`str` or :class:`pathlib.Path`, it is interpreted
as a path to a grounding terms gzipped TSV file which is then
loaded. If it's a str and looks like a URL, will be downloaded from the internet
- If :class:`dict`, it is assumed to be a grounding terms dict with
normalized entity strings as keys and :class:`gilda.term.Term`
instances as values.
- If :class:`list`, :class:`set`, :class:`tuple`, or any other iterable,
it is assumed to be a flat list of
:class:`gilda.term.Term` instances.
namespace_priority :
Specifies a term namespace priority order. For example, if multiple
terms are matched with the same score, will use this list to decide
which are given by which namespace appears further towards the front
of the list. By default, :data:`DEFAULT_NAMESPACE_PRIORITY` is used,
which, for example, prioritizes famplex entities over HGNC ones.
"""
entries: Mapping[str, List[Term]]
namespace_priority: List[str]
def __init__(
self,
terms: Optional[GrounderInput] = None,
*,
namespace_priority: Optional[List[str]] = None,
):
if terms is None:
terms = get_grounding_terms()
if isinstance(terms, str) and terms.startswith("http"):
with tempfile.TemporaryDirectory() as directory:
path = Path(directory).joinpath("terms.tsv.gz")
urlretrieve(terms, path) # noqa:S310
self.entries = load_terms_file(path)
elif isinstance(terms, (str, Path)):
extension = os.path.splitext(terms)[1]
if extension == '.db':
from .resources.sqlite_adapter import SqliteEntries
self.entries = SqliteEntries(terms)
else:
self.entries = load_terms_file(terms)
elif isinstance(terms, dict):
self.entries = terms
elif isinstance(terms, collections.abc.Iterable):
self.entries = defaultdict(list)
for term in terms:
self.entries[term.norm_text].append(term)
self.entries = dict(self.entries)
else:
raise TypeError('terms is neither a path nor a list of terms,'
'nor a normalized entry name to term dictionary')
self.prefix_index = {}
self._build_prefix_index()
self.adeft_disambiguators = find_adeft_models()
self.gilda_disambiguators = None
self.namespace_priority = (
DEFAULT_NAMESPACE_PRIORITY
if namespace_priority is None else
namespace_priority
)
def _build_prefix_index(self):
prefix_index = defaultdict(set)
for norm_term in self.entries:
if not norm_term:
continue
parts = norm_term.split()
if not parts:
continue
prefix_index[parts[0]].add(len(parts))
self.prefix_index = dict(prefix_index)
[docs] def lookup(self, raw_str: str) -> List[Term]:
"""Return matching Terms for a given raw string.
Parameters
----------
raw_str :
A string to be looked up in the set of Terms that the Grounder
contains.
Returns
-------
:
A list of Terms that are potential matches for the given string.
"""
lookups = self._generate_lookups(raw_str)
entries = []
for lookup in lookups:
entries += self.entries.get(lookup, [])
return entries
def _generate_lookups(self, raw_str: str) -> Set[str]:
# TODO: we should propagate flags about depluralization and possible
# other modifications made here and take them into account when
# scoring
# We first add the normalized string itself
norm = normalize(raw_str)
lookups = {norm}
# Then we add a version with dashes replaced by spaces
norm_spacedash = normalize(replace_dashes(raw_str, ' '))
lookups.add(norm_spacedash)
# We then try to replace spelled out greek letters with
# their unicode equivalents or their latin equivalents
greek_replaced = normalize(replace_greek_uni(raw_str))
lookups.add(greek_replaced)
greek_replaced = normalize(replace_greek_latin(raw_str))
lookups.add(greek_replaced)
greek_replaced = normalize(replace_greek_spelled_out(raw_str))
lookups.add(greek_replaced)
# We try exchanging roman and arabic numerals
roman_arabic = normalize(replace_roman_arabic(raw_str))
lookups.add(roman_arabic)
# Finally, we attempt to depluralize the word
for singular, rule in depluralize(raw_str):
lookups.add(normalize(singular))
logger.debug('Looking up the following strings: %s' %
', '.join(lookups))
return lookups
def _score_namespace(self, term) -> int:
"""Apply a priority to the term based on its namespace.
.. note::
This is currently not included as an explicit score term.
It is just used to rank identically scored entries.
"""
try:
return len(self.namespace_priority) - self.namespace_priority.index(term.db)
except ValueError:
return 0
[docs] def ground_best(
self,
raw_str: str,
context: Optional[str] = None,
organisms: Optional[List[str]] = None,
namespaces: Optional[List[str]] = None,
) -> Optional["ScoredMatch"]:
"""Return the best scored grounding for a given raw string.
Parameters
----------
raw_str : str
A string to be grounded with respect to the set of Terms that the
Grounder contains.
context : Optional[str]
Any additional text that serves as context for disambiguating the
given entity text, used if a model exists for disambiguating the
given text.
organisms : Optional[List[str]]
An optional list of organism identifiers defining a priority
ranking among organisms, if genes/proteins from multiple
organisms match the input. If not provided, the default
['9606'] i.e., human is used.
namespaces : Optional[List[str]]
A list of namespaces to restrict matches to. This will apply to
both the primary namespace of a matched term, to any subsumed
matches, and to the source namespaces of terms if they were
created using cross-reference mappings. By default, no
restriction is applied.
Returns
-------
Optional[gilda.grounder.ScoredMatch]
The best ScoredMatch returned by :meth:`ground` if any are returned,
otherwise None.
"""
scored_matches = self.ground(
raw_str=raw_str,
context=context,
organisms=organisms,
namespaces=namespaces,
)
if scored_matches:
# Because of the way the ground() function is implemented,
# the first element is guaranteed to have the best score
# (after filtering by namespace)
return scored_matches[0]
return None
[docs] def ground(self, raw_str, context=None, organisms=None,
namespaces=None):
"""Return scored groundings for a given raw string.
Parameters
----------
raw_str : str
A string to be grounded with respect to the set of Terms that the
Grounder contains.
context : Optional[str]
Any additional text that serves as context for disambiguating the
given entity text, used if a model exists for disambiguating the
given text.
organisms : Optional[List[str]]
An optional list of organism identifiers defining a priority
ranking among organisms, if genes/proteins from multiple
organisms match the input. If not provided, the default
['9606'] i.e., human is used.
namespaces : Optional[List[str]]
A list of namespaces to restrict matches to. This will apply to
both the primary namespace of a matched term, to any subsumed
matches, and to the source namespaces of terms if they were
created using cross-reference mappings. By default, no
restriction is applied.
Returns
-------
list[gilda.grounder.ScoredMatch]
A list of ScoredMatch objects representing the groundings sorted
by decreasing score.
"""
if not organisms:
organisms = ['9606']
# Stripping whitespaces is done up front directly on the raw string
# so that all lookups and comparisons are done with respect to the
# stripped string
raw_str = raw_str.strip()
# Initial lookup of all possible matches
entries = self.lookup(raw_str)
logger.debug('Filtering %d entries by organism' % len(entries))
entries = filter_for_organism(entries, organisms)
logger.debug('Comparing %s with %d entries' %
(raw_str, len(entries)))
# For each entry to compare to, we generate a match data structure
# describing the comparison of the raw (unnormalized) input string
# and the entity text corresponding to the matched Term. This match
# is then further scored to account for the nature of the grounding
# itself.
scored_matches = []
for term in entries:
match = generate_match(raw_str, term.text)
sc = score(match, term)
scored_match = ScoredMatch(term, sc, match)
scored_matches.append(scored_match)
# Return early if we don't have anything to avoid calling other
# functions with no matches
if not scored_matches:
return scored_matches
# Merge equivalent matches
unique_scores = self._merge_equivalent_matches(scored_matches)
# If there's context available, disambiguate based on that
if context:
unique_scores = self.disambiguate(raw_str, unique_scores, context)
# Then sort by decreasing score
rank_fun = lambda x: (x.score, self._score_namespace(x.term))
unique_scores = sorted(unique_scores, key=rank_fun, reverse=True)
# If we have a namespace constraint, we filter to the given
# namespaces.
if namespaces:
unique_scores = [
scored_match for scored_match in unique_scores
if scored_match.get_namespaces() & set(namespaces)
]
return unique_scores
def disambiguate(self, raw_str, scored_matches, context):
# This is only called if context was passed in so we do lazy
# loading here
if self.gilda_disambiguators is None:
self.gilda_disambiguators = load_gilda_models()
# If we don't have a disambiguator for this string, we return with
# the original scores intact. Otherwise, we attempt to disambiguate.
if raw_str in self.adeft_disambiguators:
logger.info('Running Adeft disambiguation for %s' % raw_str)
try:
scored_matches = \
self.disambiguate_adeft(raw_str, scored_matches, context)
except Exception as e:
logger.exception(e)
elif raw_str in self.gilda_disambiguators:
logger.info('Running Gilda disambiguation for %s' % raw_str)
try:
scored_matches = \
self.disambiguate_gilda(raw_str, scored_matches, context)
except Exception as e:
logger.exception(e)
return scored_matches
def disambiguate_adeft(self, raw_str, scored_matches, context):
# We find the disambiguator for the given string and pass in
# context
if self.adeft_disambiguators[raw_str] is None:
self.adeft_disambiguators[raw_str] = load_disambiguator(raw_str)
res = self.adeft_disambiguators[raw_str].disambiguate([context])
# The actual grounding dict is at this index in the result
grounding_dict = res[0][2]
logger.debug('Result from Adeft: %s' % str(grounding_dict))
# We attempt to get the score for the 'ungrounded' entry
ungrounded_score = grounding_dict.get('ungrounded', 1.0)
# Now we check if each scored match has a corresponding Adeft
# grounding and score. If we find one, we multiply the original
# match score with the Adeft score. Otherwise, we multiply the
# original score with the 'ungrounded' score given by Adeft.
for match in scored_matches:
has_adeft_grounding = False
for grounding, score in grounding_dict.items():
# There is a corner case here where grounding is
# some name other than 'ungrounded' but is not a proper
# ns:id pair.
if grounding == 'ungrounded' or ':' not in grounding:
continue
db, id = grounding.split(':', maxsplit=1)
if match.term.db == db and match.term.id == id:
match.disambiguation = {'type': 'adeft',
'score': score,
'match': 'grounded'}
match.multiply(score)
has_adeft_grounding = True
break
if not has_adeft_grounding:
match.disambiguation = {'type': 'adeft',
'score': ungrounded_score,
'match': 'ungrounded'}
match.multiply(ungrounded_score)
return scored_matches
def disambiguate_gilda(self, raw_str, scored_matches, context):
res = self.gilda_disambiguators[raw_str].predict_proba([context])
if not res:
raise ValueError('No result from disambiguation.')
grounding_dict = res[0]
for match in scored_matches:
key = '%s:%s' % (match.term.db, match.term.id)
score_entry = grounding_dict.get(key, None)
score = score_entry if score_entry is not None else 0.0
match.disambiguation = {'type': 'gilda',
'score': score,
'match': ('grounded'
if score_entry is not None
else 'ungrounded')}
match.multiply(score)
return scored_matches
@staticmethod
def _merge_equivalent_matches(scored_matches):
unique_entries = []
# Characterize an entry by its grounding
term_dbid = lambda x: (x.term.db, x.term.id)
# Sort and group scores by grounding
scored_matches.sort(key=term_dbid)
entry_groups = itertools.groupby(scored_matches, key=term_dbid)
# Now look at each group and find the highest scoring match
for _, entry_group in entry_groups:
entries = sorted(list(entry_group), key=lambda x: x.score,
reverse=True)
entries[0].subsumed_terms = [e.term for e in entries[1:]]
unique_entries.append(entries[0])
# Return the list of unique entries
return unique_entries
[docs] def get_models(self):
"""Return a list of entity texts for which disambiguation models exist.
Returns
-------
list[str]
The list of entity texts for which a disambiguation model is
available.
"""
if self.gilda_disambiguators is None:
self.gilda_disambiguators = load_gilda_models()
return sorted(list(self.gilda_disambiguators.keys()))
[docs] def get_names(self, db, id, status=None, source=None):
"""Return a list of entity texts corresponding to a given database ID.
Parameters
----------
db : str
The database in which the ID is an entry, e.g., HGNC.
id : str
The ID of an entry in the database.
status : Optional[str]
If given, only entity texts with the given status e.g., "synonym"
are returned.
source : Optional[str]
If given, only entity texts from the given source e.g., "uniprot"
are returned.
Returns
-------
names: list[str]
A list of entity texts corresponding to the given database/ID
"""
names = set()
for entries in self.entries.values():
for entry in entries:
if (entry.db == db) and (entry.id == id) and \
(not status or entry.status == status) and \
(not source or entry.source == source):
names.add(entry.text)
return sorted(names)
[docs] def get_ambiguities(self,
skip_names: bool = True,
skip_curated: bool = True,
skip_name_matches: bool = True,
skip_species_ambigs: bool = True) -> List[List[Term]]:
"""Return a list of ambiguous term groups in the grounder.
Parameters
----------
skip_names :
If True, groups of terms where one has the "name" status are
skipped. This makes sense usually since these are prioritized over
synonyms anyway.
skip_curated :
If True, groups of terms where one has the "curated" status
are skipped. This makes sense usually since these are prioritized
over synonyms anyway.
skip_name_matches :
If True, groups of terms that all share the same standard name
are skipped. This is effective at eliminating spurious ambiguities
due to unresolved cross-references between equivalent terms
in different namespaces.
skip_species_ambigs :
If True, groups of terms that are all genes or proteins, and are
all from different species (one term from each species) are skipped.
This is effective at eliminating ambiguities between orthologous
genes in different species that are usually resolved using the
organism priority list.
"""
ambig_entries = defaultdict(list)
for terms in self.entries.values():
for term in terms:
# We consider it an ambiguity if the same text entry appears
# multiple times
key = term.text
ambig_entries[key].append(term)
# It's only an ambiguity if there are two entries at least
ambig_entries = {k: v for k, v in ambig_entries.items()
if len(v) >= 2}
ambigs = []
for text, entries in ambig_entries.items():
dbs = {e.db for e in entries}
db_ids = {(e.db, e.id) for e in entries}
statuses = {e.status for e in entries}
sources = {e.source for e in entries}
names = {e.entry_name for e in entries}
# If the entries all point to the same ID, we skip it
if len(db_ids) <= 1:
continue
# If there is a name in statuses, we skip it because it's
# prioritized
if skip_names and 'name' in statuses:
continue
# We skip curated terms because they are prioritized anyway
if skip_curated and 'curated' in statuses:
continue
# If there is an adeft model already, we skip it
if 'adeft' in sources:
continue
if skip_name_matches:
if len({e.entry_name.lower() for e in entries}) == 1:
continue
if skip_species_ambigs:
if dbs <= {'HGNC', 'UP'} and \
len({e.organism for e in entries}) == len(entries):
continue
# Everything else is an ambiguity
ambigs.append(entries)
return ambigs
def _iter_terms(self):
for terms in self.entries.values():
yield from terms
[docs] def summary_str(self) -> str:
"""Summarize the contents of the grounder."""
namespaces = {ns for term in self._iter_terms() for ns in term.get_namespaces()}
status_counter = dict(Counter(term.status for term in self._iter_terms()))
return dedent(f"""\
Lookups: {len(self.entries):,}
Terms: {sum(len(terms) for terms in self.entries.values()):,}
Term Namespaces: {namespaces}
Term Statuses: {status_counter}
Adeft Disambiguators: {len(self.adeft_disambiguators):,}
Gilda Disambiguators: {len(self.gilda_disambiguators):,}
""")
[docs] def print_summary(self, **kwargs) -> None:
"""Print the summary of this grounder."""
print(self.summary_str(), **kwargs)
[docs]class ScoredMatch(object):
"""Class representing a scored match to a grounding term.
Attributes
-----------
term : gilda.grounder.Term
The Term that the scored match is for.
score : float
The score associated with the match.
match : gilda.scorer.Match
The Match object characterizing the match to the Term.
disambiguation : Optional[dict]
Meta-information about disambiguation, when available.
subsumed_terms : Optional[list[gilda.grounder.Term]]
A list of additional Term objects that also matched, have the same
db/id value as the term associated with the match, but were further
down the score ranking. In some cases examining the subsumed terms
associated with a match can provide additional metadata in
downstream applications.
"""
def __init__(self, term: Term, score, match: Match, disambiguation=None,
subsumed_terms=None):
self.term = term
self.url = term.get_idenfiers_url()
self.score = score
self.match = match
self.disambiguation = disambiguation
self.subsumed_terms = subsumed_terms if subsumed_terms else None
def __str__(self):
disamb_str = '' if self.disambiguation is None else \
(',disambiguation=' + json.dumps(self.disambiguation))
return 'ScoredMatch(%s,%s,%s%s)' % \
(self.term, self.score, self.match, disamb_str)
def __repr__(self):
return str(self)
def to_json(self):
js = {
'term': self.term.to_json(),
'url': self.url,
'score': self.score,
'match': self.match.to_json()
}
if self.disambiguation is not None:
js['disambiguation'] = self.disambiguation
if self.subsumed_terms:
js['subsumed_terms'] = [term.to_json()
for term in self.subsumed_terms]
return js
def multiply(self, value):
logger.debug('Multiplying the score of "%s" with %.3f'
% (self.term.entry_name, value))
self.score = self.score * value
[docs] def get_namespaces(self) -> Set[str]:
"""Return all namespaces for this match including from mapped and
subsumed terms.
Returns
-------
:
A set of strings representing namespaces for terms involved in
this match, including the namespace for the primary term as well
as any subsumed terms, and groundings that come from having
mapped an original source grounding during grounding resource
construction.
"""
return {ns for ns, _ in self.get_groundings()}
[docs] def get_groundings(self) -> Set[Tuple[str, str]]:
"""Return all groundings for this match including from mapped and
subsumed terms.
Returns
-------
:
A set of tuples representing groundings for this match including
the grounding for the primary term as well as any subsumed
terms, and groundings that come from having mapped an original
source grounding during grounding resource construction.
"""
term_groundings = self.term.get_groundings()
if self.subsumed_terms:
for sub_term in self.subsumed_terms:
term_groundings |= sub_term.get_groundings()
return term_groundings
[docs] def get_grounding_dict(self) -> Mapping[str, str]:
"""Get the groundings as CURIEs and URLs."""
return {
get_identifiers_curie(db, db_id): get_identifiers_url(db, db_id)
for db, db_id in self.get_groundings()
}
[docs]def load_entries_from_terms_file(terms_file: Union[str, Path]) -> Iterator[Term]:
"""Yield Terms from a compressed terms TSV file path.
Parameters
----------
terms_file :
Path to a compressed TSV terms file with columns corresponding to the
serialized elements of a Term.
Returns
-------
:
Terms loaded from the file yielded by a generator.
"""
with gzip.open(terms_file, 'rt', encoding='utf-8') as fh:
entries = {}
reader = csv.reader(fh, delimiter='\t')
# Skip header
next(reader)
for row in reader:
row_nones = [r if r else None for r in row]
yield Term(*row_nones)
[docs]def load_terms_file(terms_file: Union[str, Path]) -> Mapping[str, List[Term]]:
"""Load a TSV file containing terms into a lookup dictionary.
Parameters
----------
terms_file :
Path to a compressed TSV terms file with columns corresponding to the
serialized elements of a Term.
Returns
-------
:
A lookup dictionary whose keys are normalized entity texts, and values
are lists of Terms with that normalized entity text.
"""
entries = {}
for term in load_entries_from_terms_file(terms_file):
if term.norm_text in entries:
entries[term.norm_text].append(term)
else:
entries[term.norm_text] = [term]
return entries
def filter_for_organism(terms, organisms):
# First we organize terms by organism, including None
terms_by_organism = defaultdict(list)
for term in terms:
# We filter out any organisms that aren't in the list provided
if term.organism is not None and term.organism not in organisms:
continue
terms_by_organism[term.organism].append(term)
# We first take the terms without organism
all_terms = terms_by_organism[None]
# We now find the top organism for which we have at least
# one term and then add the corresponding terms to the list
# of all terms
if set(terms_by_organism) != {None}:
top_organism = min(set(terms_by_organism) - {None},
key=lambda x: organisms.index(x))
all_terms += terms_by_organism[top_organism]
return all_terms
def find_adeft_models():
adeft_disambiguators = {}
for shortform in available_adeft_models:
adeft_disambiguators[shortform] = None
return adeft_disambiguators
def load_adeft_models():
return {shortform: load_disambiguator(shortform)
for shortform in find_adeft_models()}
def load_gilda_models(cutoff=0.7):
with gzip.open(get_gilda_models(), 'rt') as fh:
models = {k: load_model_info(v)
for k, v in json.loads(fh.read()).items()
if v['stats']['f1']['mean'] > cutoff}
return models