Source code for bioregistry.validate.utils

"""Validation utilities."""

from __future__ import annotations

import json
from collections.abc import Callable, Mapping
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, TextIO, cast

import click
from pydantic import BaseModel
from typing_extensions import NotRequired, TypedDict, Unpack

if TYPE_CHECKING:
    from bioregistry import Context, Manager

# Names exported by ``from bioregistry.validate.utils import *``.
__all__ = [
    "Message",
    "click_write_messages",
    "format_messages",
    "tabulate_messages",
    "validate_jsonld",
    "validate_linkml",
    "validate_prefix_map",
    "validate_ttl",
    "validate_virtuoso",
]


class Message(BaseModel):
    """A single validation finding about one CURIE prefix / URI prefix pair."""

    # The CURIE prefix exactly as it appeared in the validated input.
    prefix: str
    # The URI prefix that the input paired with ``prefix``.
    uri_prefix: str
    # Short description of the problem (rendered under the "issue" label/column).
    error: str
    # Optional human-readable suggestion for how to fix the problem.
    solution: str | None = None
    # Optional 1-based line number in the input; only set by validators
    # that read their input line by line.
    line: int | None = None
    # Severity. ``click_write_messages`` exits with status 1 if any message
    # has level "error".
    level: Literal["warning", "error"]
# Maps a message's ``level`` to the foreground color passed to ``click.style``
# when the issue text is rendered by ``_spacious_message``.
LEVEL_TO_COLOR = {
    "warning": "yellow",
    "error": "red",
}
def format_messages(messages: list[Message]) -> str:
    """Render messages as human-readable text.

    :param messages: The messages to render.
    :returns: One rendered entry per message, in input order, joined by newlines.
        An empty list yields an empty string.
    """
    rendered = [_spacious_message(item) for item in messages]
    return "\n".join(rendered)
def tabulate_messages(messages: list[Message], tablefmt: str = "github") -> str:
    """Render messages as a table using :mod:`tabulate`.

    :param messages: The messages to tabulate, one row per message.
    :param tablefmt: The table format name handed to :func:`tabulate.tabulate`.
        With ``"rst"``, URI prefixes are wrapped in backticks.
    :returns: The rendered table followed by a trailing newline.
    """
    from tabulate import tabulate

    quote_uri_prefix = tablefmt == "rst"
    rows = []
    for item in messages:
        uri_cell = f"`{item.uri_prefix}`" if quote_uri_prefix else item.uri_prefix
        rows.append((item.prefix, uri_cell, item.error, item.solution or ""))

    table = tabulate(
        rows,
        headers=["prefix", "uri_prefix", "issue", "solution"],
        tablefmt=tablefmt,
    )
    return table + "\n"
def click_write_messages(messages: list[Message], tablefmt: str | None) -> None:
    """Print messages with :mod:`click` and exit non-zero if any are errors.

    :param messages: The messages to print.
    :param tablefmt: A :mod:`tabulate` table format, or ``None`` to print the
        plain human-readable rendering instead of a table.
    """
    if tablefmt is not None:
        click.echo(tabulate_messages(messages, tablefmt=tablefmt))
    else:
        click.secho(format_messages(messages))

    error_count = sum(1 for item in messages if item.level == "error")
    if not error_count:
        return

    import sys

    click.secho(f"\nfailed with {error_count:,} errors", fg="red")
    sys.exit(1)


def _spacious_message(message: Message) -> str:
    """Render one message as a multi-line, colorized entry."""
    pieces: list[str] = []
    if message.line:
        pieces.append(f"[line {message.line}] ")
    pieces.append(f"{message.prefix}: {message.uri_prefix}\n ")
    # the issue text is colored according to the message's severity
    pieces.append(click.style("issue: " + message.error, fg=LEVEL_TO_COLOR[message.level]))
    if message.solution:
        pieces.append(click.style("\n suggestion: " + message.solution, fg="green"))
    pieces.append("\n")
    return "".join(pieces)


class ValidateKwargs(TypedDict):
    """Optional keyword arguments that validators forward to :func:`_get_all_messages`."""

    rpm: NotRequired[Mapping[str, str] | None]
    use_preferred: NotRequired[bool]
    context: NotRequired[str | Context | None]
    strict: NotRequired[bool]
def validate_jsonld(
    obj: str | Mapping[str, Mapping[str, str]], **kwargs: Unpack[ValidateKwargs]
) -> list[Message]:
    """Validate the prefix map in the ``@context`` of a JSON-LD object.

    :param obj: Either an already-loaded JSON-LD document (a mapping with an
        ``@context`` key), an ``http(s)://`` URL to download it from, or a
        path to a local JSON file.
    :param kwargs: Keyword arguments forwarded to :func:`_get_all_messages`.
    :returns: The validation messages for the entries of ``@context``.
    :raises ValueError: If ``obj`` is a string that is neither an
        ``http(s)://`` URL nor a path to an existing file.
    :raises TypeError: If the document is not a mapping, has no ``@context``
        entry, or its ``@context`` is not a mapping.
    """
    if isinstance(obj, str):
        if obj.startswith(("http://", "https://")):
            import requests

            res = requests.get(obj, timeout=15)
            res.raise_for_status()
            obj = res.json()
        else:
            path = Path(obj).resolve()
            if not path.is_file():
                raise ValueError(f"not a URL or a path to an existing file: {obj}")
            # read explicitly as UTF-8 rather than relying on the locale's encoding
            obj = json.loads(path.read_text(encoding="utf-8"))

    # The signature accepts any mapping, so check against Mapping and not
    # only dict (objects parsed from JSON are dicts and still pass).
    if not isinstance(obj, Mapping):
        raise TypeError("data is not a dictionary")

    context_inner = obj.get("@context")
    if context_inner is None:
        raise TypeError("data is missing a @context field")
    if not isinstance(context_inner, Mapping):
        raise TypeError(f"@context is not a dictionary: {context_inner}")

    # a JSON-LD context carries no line information, hence None
    inputs: list[tuple[str, str, int | None]] = [
        (curie_prefix, uri_prefix, None) for curie_prefix, uri_prefix in context_inner.items()
    ]
    return _get_all_messages(inputs, **kwargs)
def validate_ttl(url: str, **kwargs: Unpack[ValidateKwargs]) -> list[Message]:
    """Validate the ``@prefix`` declarations at the top of a remote Turtle file.

    The response is streamed and reading stops at the first line that is not
    a directive (a line starting with ``@``), a blank line, or a comment, so
    the body of the file is not downloaded in full.

    :param url: The URL of the Turtle file.
    :param kwargs: Keyword arguments forwarded to :func:`_get_all_messages`.
    :returns: The validation messages, each annotated with the line number of
        the ``@prefix`` declaration it refers to.
    :raises ValueError: If a ``@prefix`` line contains no ``:`` separator.
    :raises requests.HTTPError: If the server answers with an error status.
    """
    import requests

    inputs: list[tuple[str, str, int | None]] = []
    with requests.get(url, stream=True, timeout=15) as res:
        # Without this, an HTTP error page is parsed as "no prefixes" and the
        # file is silently reported as valid.
        res.raise_for_status()
        if res.encoding is None:
            # iter_lines(decode_unicode=True) yields bytes when requests could not
            # determine an encoding; Turtle documents are UTF-8.
            res.encoding = "utf-8"
        for line_number, raw_line in enumerate(res.iter_lines(decode_unicode=True), start=1):
            line = raw_line.strip()
            if not line or line.startswith("#"):
                # blank lines and comments may precede or separate directives
                continue
            if not line.startswith("@"):
                break
            if not line.startswith("@prefix"):
                # skip @base, or other
                continue
            declaration = line.removeprefix("@prefix")
            curie_prefix, separator, uri_prefix = declaration.partition(":")
            if not separator:
                raise ValueError(f"malformed @prefix declaration on line {line_number}: {line}")
            curie_prefix = curie_prefix.strip()
            uri_prefix = uri_prefix.strip().rstrip(".").strip().strip("<>")
            inputs.append((curie_prefix, uri_prefix, line_number))
    return _get_all_messages(inputs, **kwargs)
def validate_virtuoso(url: str, **kwargs: Any) -> list[Message]:
    """Validate the prefix map exposed by a Virtuoso SPARQL endpoint.

    :param url: The URL of the SPARQL endpoint.
    :param kwargs: Keyword arguments forwarded to :func:`validate_prefix_map`.
    :returns: The validation messages for the endpoint's prefix map.
    """
    return validate_prefix_map(get_virtuoso_prefix_map(url), **kwargs)
def validate_linkml(path_or_url: str | Path | TextIO, **kwargs: Any) -> list[Message]:
    """Validate the prefix map of a LinkML YAML configuration.

    :param path_or_url: Where to read the LinkML YAML configuration from.
    :param kwargs: Keyword arguments forwarded to :func:`validate_prefix_map`.
    :returns: The validation messages for the configuration's prefix map.
    """
    return validate_prefix_map(get_linkml_prefix_map(path_or_url), **kwargs)
def validate_prefix_map(prefix_map: dict[str, str], **kwargs: Any) -> list[Message]:
    """Validate a mapping from CURIE prefixes to URI prefixes.

    :param prefix_map: The prefix map to check.
    :param kwargs: Keyword arguments forwarded to :func:`_get_all_messages`.
    :returns: The validation messages for the prefix map.
    """
    inputs: list[tuple[str, str, int | None]] = []
    for curie_prefix, uri_prefix in prefix_map.items():
        # a plain prefix map has no line numbers
        inputs.append((curie_prefix, uri_prefix, None))
    return _get_all_messages(inputs, **kwargs)
def _get_all_messages( inputs: list[tuple[str, str, int | None]], *, context: str | None | Context = None, use_preferred: bool = False, rpm: Mapping[str, str] | None = None, strict: bool = False, manager: Manager | None = None, ) -> list[Message]: """Get messages for the given inputs.""" manager = _ensure_manager(manager) if rpm is None: rpm = manager.get_reverse_prefix_map() def _get_suggestions(uri_prefix: str) -> list[tuple[str, str]] | str: suggies = [] for x, y in rpm.items(): if x.startswith(uri_prefix): suggies.append((x, y)) if x == uri_prefix: return y return suggies _checker = _get_checker(context, use_preferred=use_preferred) # TODO need to implement URI prefix normalization and errors messages: list[Message] = [] for curie_prefix, uri_prefix, line_number in inputs: resource = manager.get_resource(curie_prefix) if resource is None: suggestions = _get_suggestions(uri_prefix) if not suggestions: messages.append( Message( line=line_number, prefix=curie_prefix, uri_prefix=uri_prefix, error="unknown CURIE prefix", level="error", ) ) else: if isinstance(suggestions, str): solution = f"Switch to CURIE prefix {suggestions}, inferred from URI prefix" level = "warning" else: level = "error" if len(suggestions) == 1: up, cp = suggestions[0] solution = f"Consider switching to the more specific CURIE/URI prefix pair {cp}: `{up}`" else: solution = "Consider switching one of these more specific CURIE/URI prefix pairs:\n\n" for up, cp in suggestions: solution += f" {cp}: `{up}`\n" messages.append( Message( line=line_number, prefix=curie_prefix, uri_prefix=uri_prefix, error="unknown CURIE prefix", solution=solution, level=level, ) ) else: if message := _get_message( curie_prefix, uri_prefix, _checker, strict=strict, line_number=line_number, use_preferred=use_preferred, ): messages.append(message) return messages def _get_message( curie_prefix: str, uri_prefix: str, _checker: Callable[[str], str | None], *, strict: bool = False, line_number: int | None = None, 
use_preferred: bool = False, ) -> Message | None: norm_prefix = _checker(curie_prefix) if use_preferred: middle = "preferred" else: middle = "standard" if norm_prefix is None: return Message( prefix=curie_prefix, uri_prefix=uri_prefix, error="unknown CURIE prefix", level="error", line=line_number, ) elif norm_prefix != curie_prefix: return Message( prefix=curie_prefix, uri_prefix=uri_prefix, error="non-standard CURIE prefix", solution=f"Switch to {middle} prefix: {norm_prefix}", level="error" if strict else "warning", line=line_number, ) else: return None def _ensure_manager(manager: Manager | None) -> Manager: if manager is not None: return manager from .. import resource_manager return resource_manager.manager def _get_checker( context: str | None | Context = None, use_preferred: bool = False, manager: Manager | None = None, ) -> Callable[[str], str | None]: manager = _ensure_manager(manager) if context is not None: converter = manager.get_converter_from_context(context) def _check(pp: str) -> str | None: return converter.standardize_prefix(pp, strict=False) else: def _check(pp: str) -> str | None: return manager.normalize_prefix(pp, use_preferred=use_preferred) return _check def get_virtuoso_prefix_map(url: str) -> dict[str, str]: """Get the internal prefix map from a Virtuoso service. :param url: The URL for the SPARQL endpoint, for example: - https://nfdi4culture.de/sparql - https://dbpedia.org/sparql :returns: The prefix map returned by the Virtuoso service. """ from bs4 import Tag from pystow.utils import get_soup if "nsdecl" not in url: url = url + "?nsdecl" soup = get_soup(url) table_body_tag = soup.find("tbody") if not isinstance(table_body_tag, Tag): raise ValueError( f"could not find table body tag, are you sure this is a Virtuoso " f"SPARQL endpoint? 
Error from {url}" ) rv = {left.text: right.text for left, right in table_body_tag.find_all("tr")} return rv def get_linkml_prefix_map(path_or_url: str | Path | TextIO) -> dict[str, str]: """Get the prefix map from a LinkML YAML configuration. :param path_or_url: The URL for the LinkML YAML configuration. Examples: - https://github.com/HendrikBorgelt/CatCore/raw/refs/heads/main/src/catcore/schema/catcore.yaml - https://github.com/mapping-commons/sssom/raw/refs/heads/master/src/sssom_schema/schema/sssom_schema.yaml :returns: The prefix map defined in the LinkML configuration """ import yaml from pystow.utils import safe_open with safe_open(path_or_url) as file: data = yaml.safe_load(file) return cast(dict[str, str], data["prefixes"])