Source code for bioregistry.utils

"""Utilities."""

import logging
from dataclasses import asdict, is_dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, List, Mapping, Optional, Union, cast

import click
import requests
from pydantic import BaseModel
from pydantic.json import ENCODERS_BY_TYPE
from pystow.utils import get_hashes

from .constants import (
    BIOREGISTRY_PATH,
    COLLECTIONS_YAML_PATH,
    METAREGISTRY_YAML_PATH,
    REGISTRY_YAML_PATH,
)

logger = logging.getLogger(__name__)

#: Wikidata SPARQL endpoint. See https://www.wikidata.org/wiki/Wikidata:SPARQL_query_service#Interfacing
WIKIDATA_ENDPOINT = "https://query.wikidata.org/bigdata/namespace/wdq/sparql"


class OLSBroken(RuntimeError):
    """Raised when the OLS is having a problem."""


def secho(s: str, fg: str = "cyan", bold: bool = True, **kwargs: Any) -> None:
    """Wrap :func:`click.secho`."""
    click.echo(
        f'[{datetime.now().strftime("%H:%M:%S")}] ' + click.style(s, fg=fg, bold=bold, **kwargs)
    )


def removeprefix(s: Optional[str], prefix: str) -> Optional[str]:
    """Remove the prefix from the string."""
    if s is None:
        return None
    if s.startswith(prefix):
        return s[len(prefix) :]
    return s


def removesuffix(s: Optional[str], suffix: str) -> Optional[str]:
    """Remove the prefix from the string."""
    if s is None:
        return None
    if s.endswith(suffix):
        return s[: -len(suffix)]
    return s
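
# Usage sketch (illustrative, not part of the original module): these helpers
# mirror :meth:`str.removeprefix` and :meth:`str.removesuffix` from Python
# 3.9+, but additionally pass ``None`` through unchanged.
#
#     >>> removeprefix("GO:0032571", "GO:")
#     '0032571'
#     >>> removesuffix("taxonomy ID", " ID")
#     'taxonomy'
#     >>> removeprefix(None, "GO:") is None
#     True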


def query_wikidata(sparql: str) -> List[Mapping[str, Any]]:
    """Query Wikidata's sparql service.

    :param sparql: A SPARQL query string
    :return: A list of bindings
    """
    logger.debug("running query: %s", sparql)
    res = requests.get(WIKIDATA_ENDPOINT, params={"query": sparql, "format": "json"})
    res.raise_for_status()
    res_json = res.json()
    return res_json["results"]["bindings"]
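
# Example (a sketch; the query below is illustrative): each binding maps a
# SPARQL variable name to a dictionary with ``type`` and ``value`` keys,
# following the standard SPARQL 1.1 JSON results format.
#
#     >>> rows = query_wikidata("SELECT ?human WHERE { ?human wdt:P31 wd:Q5 } LIMIT 1")
#     >>> rows[0]["human"]["value"]  # doctest: +SKIP
#     'http://www.wikidata.org/entity/Q...'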


def extended_encoder(obj: Any) -> Any:
    """Encode objects similarly to :func:`pydantic.json.pydantic_encoder`."""
    if isinstance(obj, BaseModel):
        return obj.dict(exclude_none=True)
    elif is_dataclass(obj):
        return asdict(obj)

    # Check the class type and its superclasses for a matching encoder
    for base in obj.__class__.__mro__[:-1]:
        try:
            encoder = ENCODERS_BY_TYPE[base]
        except KeyError:
            continue
        return encoder(obj)
    else:  # We have exited the for loop without finding a suitable encoder
        raise TypeError(f"Object of type '{obj.__class__.__name__}' is not JSON serializable")
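
# Example (illustrative sketch): this encoder is intended as the ``default``
# hook for :func:`json.dumps`, so Pydantic models, dataclasses, and any type
# registered in ``ENCODERS_BY_TYPE`` (such as :class:`datetime.datetime`)
# serialize uniformly.
#
#     >>> import json
#     >>> json.dumps({"when": datetime(2021, 1, 1)}, default=extended_encoder)
#     '{"when": "2021-01-01T00:00:00"}'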


class NormDict(dict):
    """A dictionary that supports lexical normalization of keys on setting and getting."""

    def __setitem__(self, key: str, value: str) -> None:
        """Set an item from the dictionary after lexically normalizing it."""
        norm_key = _norm(key)
        if value is None:
            raise ValueError(f"Tried to add empty value for {key}/{norm_key}")
        if norm_key in self and self[norm_key] != value:
            raise KeyError(
                f"Tried to add {norm_key}/{value} when already had {norm_key}/{self[norm_key]}"
            )
        super().__setitem__(norm_key, value)

    def __getitem__(self, item: str) -> str:
        """Get an item from the dictionary after lexically normalizing it."""
        return super().__getitem__(_norm(item))

    def __contains__(self, item) -> bool:
        """Check if an item is in the dictionary after lexically normalizing it."""
        return super().__contains__(_norm(item))

    def get(self, key: str, default=None) -> str:
        """Get an item from the dictionary after lexically normalizing it."""
        return super().get(_norm(key), default)
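
# Example (a sketch, not part of the module): keys are passed through
# ``_norm`` on every operation, so lexical variants of the same prefix
# collide onto a single entry, and conflicting assignments raise a KeyError.
#
#     >>> d = NormDict()
#     >>> d["GO"] = "go"
#     >>> d["g.o."], "G-O" in d
#     ('go', True)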


def _norm(s: str) -> str:
    """Normalize a key: casefold and strip spaces, dashes, underscores, dots, and slashes."""
    rv = s.casefold()
    for x in " -_./":
        rv = rv.replace(x, "")
    return rv


def norm(s: str) -> str:
    """Normalize a key: lowercase and strip spaces, dots, and dashes."""
    rv = s.lower()
    for x in " .-":
        rv = rv.replace(x, "")
    return rv
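
# Contrast between the two normalizers (illustrative): ``_norm`` casefolds and
# also strips underscores and slashes, while the public ``norm`` only
# lowercases and strips spaces, dots, and dashes.
#
#     >>> norm("FBbt_root"), _norm("FBbt_root")
#     ('fbbt_root', 'fbbtroot')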


def curie_to_str(prefix: str, identifier: str) -> str:
    """Combine a prefix and identifier into a CURIE string."""
    return f"{prefix}:{identifier}"


def get_hexdigests(alg: str = "sha256") -> Mapping[str, str]:
    """Get hex digests for the Bioregistry's data files."""
    return {
        path.as_posix(): _get_hexdigest(path, alg=alg)
        for path in (
            BIOREGISTRY_PATH,
            REGISTRY_YAML_PATH,
            METAREGISTRY_YAML_PATH,
            COLLECTIONS_YAML_PATH,
        )
    }


def _get_hexdigest(path: Union[str, Path], alg: str = "sha256") -> str:
    """Get the hex digest of a single file using :func:`pystow.utils.get_hashes`."""
    hashes = get_hashes(path, [alg])
    return hashes[alg].hexdigest()


def get_ols_descendants(
    ontology: str, uri: str, *, force_download: bool = False, get_identifier=None, clean=None
) -> Mapping[str, Mapping[str, Any]]:
    """Get descendants in the OLS."""
    url = f"https://www.ebi.ac.uk/ols/api/ontologies/{ontology}/terms/{uri}/descendants?size=1000"
    res = requests.get(url)
    res.raise_for_status()
    res_json = res.json()
    try:
        terms = res_json["_embedded"]["terms"]
    except KeyError:
        raise OLSBroken from None
    return _process_ols(ontology=ontology, terms=terms, clean=clean, get_identifier=get_identifier)


def _process_ols(
    *, ontology, terms, clean=None, get_identifier=None
) -> Mapping[str, Mapping[str, Any]]:
    """Process OLS term records into a mapping from identifier to metadata."""
    if clean is None:
        clean = _clean
    if get_identifier is None:
        get_identifier = _get_identifier
    rv = {}
    for term in terms:
        identifier = get_identifier(term, ontology)
        description = term.get("description")
        rv[identifier] = {
            "name": clean(term["label"]),
            "description": description and description[0],
            "obsolete": term.get("is_obsolete", False),
        }
    return rv


def _get_identifier(term, ontology: str) -> str:
    """Strip the ontology prefix and separating colon from the term's ``obo_id``."""
    return term["obo_id"][len(ontology) + 1 :]


def _clean(s: str) -> str:
    """Strip trailing "identifier", "ID", and "accession" from a label."""
    s = cast(str, removesuffix(s, "identifier")).strip()
    s = cast(str, removesuffix(s, "ID")).strip()
    s = cast(str, removesuffix(s, "accession")).strip()
    return s
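
# Usage sketches (hedged; the prefix and IRI below are illustrative):
#
#     >>> curie_to_str("GO", "0032571")
#     'GO:0032571'
#
# The OLS API expects the term IRI in the URL path to be double URL-encoded,
# so callers of ``get_ols_descendants`` would pass something like:
#
#     >>> get_ols_descendants(  # doctest: +SKIP
#     ...     "vario", "http%253A%252F%252Fpurl.obolibrary.org%252Fobo%252FVariO_0001"
#     ... )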