# Source code for xmu.api

"""Defines tools to work with the EMu API"""

import json
import logging
import re
import time
import tomllib
from functools import cache, cached_property
from pathlib import Path
from typing import Any
from urllib.parse import unquote_plus, urljoin

import requests

from .types import EMuDate, EMuLatitude, EMuLongitude, EMuTime
from .utils import is_ref


# Module-level logger named after this module; used for debug output when
# requests are made and when result pages are fetched
logger = logging.getLogger(__name__)


# Timeout passed to requests.post() when requesting an authorization token
# (requests interprets this value as seconds)
TIMEOUT = 30


[docs] class EMuAPI: """Connects to and queries the EMu API Parameters ---------- url : str, optional the url for the EMu API, including tenant username : str, optional an EMu username. If omitted, defaults to the current OS username. password : str, optional the password for the given username, If omitted, the user will be prompted for the password when the class is initiated. autopage : bool = True whether to automatically page through results if the total number of results exceeds the limit of a given request config_path : str | Path path to a TOML config file used to set url, username, password, and autopage parser : EMuAPIParser, optional the parser object used to parse individual records. The default EMuAPIParser class returns a close approximation of the format used by EMuRecord. If None, records will be returned as formatted by the API. Attributes ---------- module : str the backend name of an EMu module, for example, ecatalogue or eparties use_emu_syntax : bool specifies whether to use the EMu client syntax when parsing search terms. Clients searches escape control characters using a backslash. 
""" schema = None def __init__( self, url: str = None, username: str = None, password: str = None, autopage: bool = None, config_path: str | Path = "emurestapi.toml", parser: "EMuAPIParser" = None, ): self.config_path = config_path try: with open(self.config_path, "rb") as f: config = tomllib.load(f)["params"] except FileNotFoundError: self.base_url = url.rstrip("/") + "/" else: if not url: url = config["url"] if not username: username = config["username"] if not password: password = config["password"] if autopage is None: autopage = config["autopage"] self.base_url = url.rstrip("/") + "/" self.use_emu_syntax = True # Parse must be assigned when the instance is created self._parser = None self.parser = parser # The autopage parameter is passed to EMuAPIResponse but it is cleaner # to implement it here self.autopage = autopage # Get token self._token = None self.get_token(username=username, password=password) self._session = None @property def parser(self): """The parser object used to parse records returned by the API""" return self._parser @parser.setter def parser(self, val): self._parser = val if val: self._parser.api = self @property def session(self): """The session object to use for API queries""" if self._session is None: self._session = requests.Session() return self._session @session.setter def session(self, val): self._session = val
[docs] def get_token(self, refresh=False, **kwargs): """Retrieves a token from the server to authorize requests Parameters ---------- kwargs : username and password if no config file is found Returns ------- str the authorization token need to make API requests """ # Token requests sometimes fail, particularly if several are done quickly. # To prevent this, the token is cached to a file in the working directory when # it is read. if not refresh: try: with open("token") as f: self._token = f.read().strip() return self._token except FileNotFoundError: pass if not kwargs: with open(self.config_path, "rb") as f: kwargs = tomllib.load(f)["params"] # Token request includes exponential backoff if request fails for i in range(4, 8): resp = requests.post( urljoin(self.base_url, "tokens"), json={ "username": kwargs["username"], "password": kwargs["password"], }, headers={"Content-Type": "application/json"}, timeout=TIMEOUT, ) try: self._token = resp.headers["Authorization"] except KeyError: time.sleep(2**i) else: with open("token", "w") as f: f.write(self._token) return self._token raise ValueError( f"Token request failed: {resp.url} (status_code={resp.status_code})" )
[docs] def get(self, *args, select=None, **kwargs): """Performs a GET operation with the proper authorization header Most requests should use either retrieve or search instead of calling this method directly. Parameters ---------- args: Any arg accepted by request.get() select : list[str] | dict[dict], optional A container with fields to include in the returned records. Fields from other modules can be included using a dict formatted as follows: { "EMuField": None, "EMuFieldRef": { "EMuFieldInAnotherModule": None, } } kwargs: Any kwarg accepted by request.get(). By default, the headers kwarg includes {"Prefer": "representation=none", "X-HTTP-Method-Override" = "GET", "Content-Type": "application/x-www-form-urlencoded"}. The latter two keys are used to implement the HTTP method override recommended by Axiell. Returns ------- EMuAPIResponse the response returned for the request """ headers = kwargs.setdefault("headers", {}) headers["Authorization"] = f"{self._token}" headers.setdefault("Prefer", "representation=none") # Add the HTTP method override per recommendation at # https://help.emu.axiell.com/emurestapi/3.1.2/05-Appendices-Override.html headers["X-HTTP-Method-Override"] = "GET" headers["Content-Type"] = "application/x-www-form-urlencoded" # Redact authorization before logging redacted = re.sub( "'Authorization': '.*?'", "'Authorization': '[REDACTED]'", str(kwargs) ) logger.debug(f"Making GET request: {args[0]} (params={redacted})") resp = EMuAPIResponse( self.session.post(*args, **kwargs), api=self, select=select, ) if resp.status_code == 401: self.get_token(refresh=True) return self.get(*args, select=select, **kwargs) return resp
[docs] def retrieve(self, module: str, irn: str | int, select: list[str] = None) -> None: """Retrieves a single record from an irn Parameters ---------- module: str the module to query irn : str | int the IRN for the EMu record to retrieve select : list[str], optional the list of EMu fields to return. If omitted, returns the record id. Returns ------- EMuAPIResponse the query response """ # Split irn from API reference notation (emu:{server}/{module}/{irn})) if isinstance(irn, str) and irn.startswith("emu:"): irn = irn.split("/")[-1] url = self.base_url for part in [module, str(irn)]: url = urljoin(url, part).rstrip("/") + "/" params = self._prep_query(module=module, select=select) return self.get(url.rstrip("/"), data=params, select=select)
[docs] def search( self, module: str, *, select: list[str] = None, sort_: dict = None, filter_: dict = None, limit: int = 10, cursor_type: str = "server", ): """Searches EMu based on the provided filter Parameters ---------- module: str the module to query select : list[str], optional the list of EMu fields to return. If omitted, returns the record id. sort_ : dict, optional criteria by which to order the results. Each key must have the value "asc" or "desc". filter_: dict, optional search filter. Each key-value pair consists of a field name and value. Complex searches can be made using the helper functions included in this module (contains, phrase, etc.) Lists are expanded as OR searches. Values that have not been converted to the API syntax will be parsed using a set of rules modeled on EMu client searches. limit: int, default=10 the number of records to return per page cursor_type: str, default="server" whether the cursor is stored locally or on the server Yields ------ EMuAPIResponse the query response """ params = self._prep_query( module=module, select=select, sort=sort_, filter=filter_, limit=limit, cursorType=cursor_type, ) return self.get( urljoin(self.base_url, module).rstrip("/"), data=params, select=select )
def _prep_query(self, **kwargs): """Format the query for the EMu API""" params = {} if kwargs.get("select"): params["select"] = _prep_select(kwargs["select"]) if kwargs.get("sort"): params["sort"] = _prep_sort(kwargs["sort"]) if kwargs.get("filter"): params["filter"] = _prep_filter( kwargs["module"], kwargs["filter"], self.use_emu_syntax ) else: params["filter"] = {} for key in ("limit", "cursorType"): if kwargs.get(key): params[key] = kwargs[key] return params
[docs] class EMuAPIResponse: """Wraps a response from the EMu API response""" def __init__( self, response: requests.Response, api: EMuAPI, select: list[str] | dict[dict] = None, ): self.api = api self.select = select self.resolve_attachments = True self._first_only = False self._response = response self._json = None self._cached = [] def __getattr__(self, attr): try: return getattr(self._response, attr) except AttributeError: raise AttributeError( f"{repr(self.__class__.__name__)} object has no attribute {repr(attr)}" ) def __len__(self): return len(json.loads(self.headers["Next-Offsets"])) def __iter__(self): if self._cached: for rec in self._cached: yield rec else: try: rec = self.json()["data"] if self.api.parser is not None: rec = self.api.parser.parse(self.module, rec, select=self.select) elif self.resolve_attachments: # Resolving attachments individually is slow, so attachments # are deferred until a number of records have been processed # OR the user tries to access a key rec = self.defer_attachments(rec) self._cached.append(rec) yield rec except KeyError: resp = self count = 0 while True: try: # Return records in batches to make resolving attachments more # efficient records = [] for match in resp.json()["matches"]: rec = match["data"] if resp.api.parser is not None: rec = resp.api.parser.parse( self.module, rec, select=resp.select ) elif self.resolve_attachments: # Resolving attachments individually is slow, so # attachments are deferred until a number of records # have been processed OR the user tries to access a key rec = self.defer_attachments(rec) self._cached.append(rec) # Special handling when using first() to prevent iterating # through all records before returning the record if self._first_only: yield rec return records.append(rec) if len(records) >= 1000: for rec in records: count += 1 yield rec records = [] del match # delete match so that exceptions work as expected for rec in records: count += 1 yield rec except Exception as exc: try: raise 
ValueError( f"Could not parse match: {match} from {repr(resp.text)}" ) from exc except NameError: raise ValueError( f"No records found: {repr(resp.text)} ({resp.request.url}, {resp.params})" ) from exc else: # Get the next page if resp.api.autopage and count < resp.hits: try: resp = resp.next_page() except ValueError: break else: if hasattr(resp, "from_cache") and resp.from_cache: logger.debug("Response is from cache") else: logger.debug("Response is from server") else: break @cached_property def module(self): """The EMu module queried to create the response""" try: return self.json()["id"].split("/")[-2] except KeyError: return self.json()["matches"][0]["id"].split("/")[-2] @cached_property def params(self): """The query parameters used to make the request""" body = self.request.body if not body: return {} # Decode the request body if using requests_cache try: body = body.decode("utf-8") except AttributeError: pass params = {} for param in body.split("&"): key, val = param.split("=", 1) val = unquote_plus(val) try: val = json.loads(val) except json.JSONDecodeError: pass params[key] = val return params @cached_property def hits(self): try: return self.json()["hits"] except KeyError: return 0
[docs] def json(self): """Parse JSON from response""" if self._json is None: try: self._json = self._response.json() except json.JSONDecodeError: raise ValueError( f"Response cannot be decoded: {repr(self.text)} (status_code={self.status_code})" ) else: if "@error" in self._json: raise ValueError(f"Error: {self._json}") return self._json
[docs] def records(self): """Gets a mapping of all records in the result set by IRN Returns ------- dict dict that maps irns to records """ return {r["irn"]: r for r in self}
[docs] def first(self): """Gets the first record from the result set Returns ------- dict the first record. If a rec_class is specified, the record will use that class. """ self._first_only = True try: for rec in self: return rec finally: self._first_only = False
[docs] def next_page(self): """Gets the next pages of results in the result set Returns ------- EMuAPIResponse the result from the next page """ try: resp = self.api.get( self.url, data=self.request.body, headers={"Next-Search": self.headers["Next-Search"]}, ) except KeyError: raise ValueError("Next-Search not found in headers") return resp
[docs] def defer_attachments(self, rec): """Defers attachments in record Called automatically if resolve_attachments is True. Parameters ---------- rec : dict a record returned by the EMu API Returns ------- dict record with attachments converted to DeferredAttachments """ for key, val in rec.items(): if key.endswith(("_grp", "_subgrp")): for row in val: self.defer_attachments(row) elif is_ref(key): try: select = self.select[key] except (KeyError, TypeError): select = {} if isinstance(val, (list, tuple)): vals = [] for val in val: if _is_attachment(key, val): vals.append(attach(val, self.api, json.dumps(select))) else: vals.append(val) rec[key] = vals elif _is_attachment(key, val): rec[key] = attach(val, self.api, json.dumps(select)) elif isinstance(val, str): rec[key] = val return rec
[docs] class EMuAPIParser: """Parses responses from the EMu API""" def __init__(self): self.rec_class = dict self.api = None
[docs] def parse(self, module: str, rec: dict, select: list | dict[dict] = None): """Parses a record returned by the EMu API Only attachments mapped in the original select parameter are resolved. Parameters ---------- rec : dict a record retrieved from the EMu API module : str the backend name of the EMu module select : list | dict the fields to return Returns ------- dict the record with all attachments resolved """ parsed = _parse_api(module, rec, self.api, select=select) if self.rec_class != dict: parsed = self.rec_class(parsed, module=module) return parsed
class DeferredAttachment:
    """An attached record defined by a module and IRN

    The record itself is loaded when (1) a key is accessed or (2) it is loaded
    manually using the resolve() method. New instances should be created using
    the attach() function to allow caching.

    Unresolved instances are tracked in a class-level registry keyed by module
    and select. That lets one search resolve every pending attachment that
    shares the same module and select.

    Parameters
    ----------
    val : str
        the EMu attachment string
    api : EMuAPI
        the instance of the EMu API that created the parent record
    select : list | dict
        the fields to retrieve. If omitted, all fields are returned.

    Attributes
    ----------
    verbatim : str
        the EMu attachment string
    module : str
        the backend name of the EMu module
    irn : int
        the IRN of the attached record
    select : list | dict
        the fields to retrieve
    """

    # Pending attachments as {(module, select key): {irn: attachment}}
    _deferred = {}

    def __init__(self, val, api, select=None):
        self.verbatim = val
        self.module, self.irn = val.split("/")[-2:]
        self.irn = int(self.irn)
        self.select = select
        self._data = None
        self.api = api
        key = (self.module, self._select_key(select))
        self.__class__._deferred.setdefault(key, {})[self.irn] = self

    @staticmethod
    def _select_key(select):
        """Builds a hashable registry key for a select

        Dicts are serialized in full so that selects with the same top-level
        fields but different nested fields do not share a key.
        """
        if isinstance(select, dict):
            try:
                return json.dumps(select, sort_keys=True)
            except TypeError:
                pass
        try:
            return tuple(sorted(select))
        except TypeError:
            return select

    def __str__(self):
        return f"DeferredAttachment({self._data if self._data else self.verbatim})"

    def __repr__(self):
        return str(self)

    def __int__(self):
        return self.irn

    def __getattr__(self, attr):
        # Private and dunder names are never delegated to the record. Looking
        # them up there would trigger a request for protocol probes (copy,
        # pickle) and recurse forever if _data has not been set.
        if attr.startswith("_"):
            raise AttributeError(
                f"{repr(self.__class__.__name__)} object has no attribute {repr(attr)}"
            )
        try:
            return getattr(self.data, attr)
        except AttributeError:
            raise AttributeError(
                f"{repr(self.__class__.__name__)} object has no attribute {repr(attr)}"
            ) from None

    def __getitem__(self, key):
        return self.data[key]

    @property
    def data(self):
        """The EMu record for the given IRN and select statement"""
        if self._data is None:
            self.resolve()
        return self._data

    def get(self, key, default=None):
        """Gets a key from the attached record, resolving it if needed"""
        return self.data.get(key, default)

    def items(self):
        """Gets the items of the attached record, resolving it if needed"""
        return self.data.items()

    def resolve(self):
        """Resolves all deferred records with the same module and select statement

        Returns
        -------
        DeferredAttachment
            attachment with data attribute populated
        """
        if not self._data:
            key = (self.module, self._select_key(self.select))
            pending = list(self.__class__._deferred.pop(key, {}).values())

            # This instance may be missing from the registry if another
            # instance with the same IRN replaced it or the group has already
            # been resolved. It still needs to be populated.
            if not any(att is self for att in pending):
                pending.append(self)

            irns = list(dict.fromkeys(att.irn for att in pending))
            try:
                records = self.api.search(
                    module=self.module,
                    select=self.select,
                    filter_={"irn": irns},
                    limit=len(irns),
                ).records()
            except ValueError:
                # The response raises ValueError if the search finds no
                # records. Fall through to retrieving each record by IRN.
                records = {}

            # Convert IRN to integer if records have not been parsed already
            try:
                records = {int(k.split("/")[-1]): v for k, v in records.items()}
            except AttributeError:
                pass

            for att in pending:
                try:
                    att._data = records[att.irn]
                except KeyError:
                    # Records where SecRecordStatus does not equal Active are not
                    # returned correctly by the search but can still be retrieved
                    # by IRN
                    att._data = records[att.irn] = self.api.retrieve(
                        self.module, att.irn, select=self.select
                    ).first()

        return self
@cache
def attach(val, api, select=None):
    """Creates a DeferredAttachment for the given value

    This is the preferred way to create a DeferredAttachment. Results are
    cached on the arguments, so all arguments must be hashable and repeated
    calls with the same arguments return the same object.

    Parameters
    ----------
    val : str
        the EMu attachment string
    api : EMuAPI
        the instance of the EMu API that created the parent record
    select : str
        a JSON-encoded string of the fields to retrieve. If omitted, all
        fields are returned.

    Returns
    -------
    DeferredAttachment
        the deferred attachment. If val is not an attachment string, val
        itself is returned unchanged.
    """
    kwargs = {}
    if select:
        kwargs["select"] = json.loads(select)
    try:
        return DeferredAttachment(val, api, **kwargs)
    except (AttributeError, ValueError):
        # Some ref fields are not actually attachments. Non-strings fail with
        # AttributeError when split. Strings that do not end in module/irn
        # fail with ValueError when unpacked or converted to an integer.
        if not isinstance(val, str) or not val.startswith("emu:"):
            return val
        raise
def and_(conds: list[dict]) -> dict:
    """Wraps a list of conditions so that all of them must match

    Parameters
    ----------
    conds : list[dict]
        list of conditions

    Returns
    -------
    dict
        {"AND": conds}
    """
    combined = {}
    combined["AND"] = conds
    return combined
def or_(conds: list[dict]) -> dict:
    """Wraps a list of conditions so that any one of them may match

    Parameters
    ----------
    conds : list[dict]
        list of conditions

    Returns
    -------
    dict
        {"OR": conds}
    """
    combined = {}
    combined["OR"] = conds
    return combined
def not_(conds: dict) -> dict:
    """Negates a condition

    Parameters
    ----------
    conds : list[dict] | dict
        a condition or list of conditions. A single condition is wrapped in
        a list.

    Returns
    -------
    dict
        {"NOT": conds}
    """
    negated = conds if isinstance(conds, (list, tuple)) else [conds]
    return {"NOT": negated}
def contains(val: str | list[str], col: str = None) -> dict:
    """Builds a condition to match fields containing a value

    Equivalent to the basic, text-only search in the EMu client.

    Parameters
    ----------
    val : str | list[str]
        the text to search for or a list of such strings
    col : str
        the name of the column. Typically omitted.

    Returns
    -------
    dict
        an EMu API contains condition
    """
    cond = _build_multivalue_cond(val, op="contains", col=col)
    return cond
[docs] def range_( gt: str | int | float = None, lt: str | int | float = None, gte: str | int | float = None, lte: str | int | float = None, mode: str = None, col: str = None, ) -> dict: """Builds a condition to match a range of values At least one of gt, lt, gte, and lte must be provided. Only one of gt and gte can be provided, and only one of lt and lte can be provided. Parameters ---------- gt: str | float | int the lower bound of the search, not inclusive lt: str | float | int the upper bound of the search, not inclusive gte: str | float | int the lower bound of the search, inclusive lte: str | float | int the upper bound of the search, inclusive mode : str one of date, time, latitude, or longitude. If omitted, will try to guess based on the column or value. col : str the name of the column. Typically ommitted. Returns ------- dict an EMu API range condition """ kwargs = {"gt": gt, "lt": lt, "gte": gte, "lte": lte} op = {k: v for k, v in kwargs.items() if v is not None} if not op: raise ValueError("Must provide at least one of gt, lt, gte, or lte") if "gt" in op and "gte" in op: raise ValueError("Can only provide one of gt and gte") if "lt" in op and "lte" in op: raise ValueError("Can only provide one of lt and lte") # Infer mode from type of data if mode is None: mode = _infer_mode(list(op.values())[0]) if mode: op["mode"] = mode return _build_cond(None, col=col, op="range", **op)
def gt(val: str | int | float, mode: str = None, col: str = None):
    """Builds a condition to match values greater than a given value

    This is a helper function based on range_().

    Parameters
    ----------
    val: str | float | int
        the lower bound of the search, not inclusive
    mode : str
        one of date, time, latitude, or longitude. If omitted, will try to
        guess based on the column or value.
    col : str
        the name of the column. Typically omitted.

    Returns
    -------
    dict
        an EMu API range condition
    """
    bound = {"gt": val}
    return range_(mode=mode, col=col, **bound)
def gte(val: str | int | float, mode: str = None, col: str = None):
    """Builds a condition to match values greater than or equal to a given value

    This is a helper function based on range_().

    Parameters
    ----------
    val: str | float | int
        the lower bound of the search, inclusive
    mode : str
        one of date, time, latitude, or longitude. If omitted, will try to
        guess based on the column or value.
    col : str
        the name of the column. Typically omitted.

    Returns
    -------
    dict
        an EMu API range condition
    """
    bound = {"gte": val}
    return range_(mode=mode, col=col, **bound)
def lt(val: str | int | float, mode: str = None, col: str = None):
    """Builds a condition to match values less than a given value

    This is a helper function based on range_().

    Parameters
    ----------
    val: str | float | int
        the upper bound of the search, not inclusive
    mode : str
        one of date, time, latitude, or longitude. If omitted, will try to
        guess based on the column or value.
    col : str
        the name of the column. Typically omitted.

    Returns
    -------
    dict
        an EMu API range condition
    """
    bound = {"lt": val}
    return range_(mode=mode, col=col, **bound)
def lte(val: str | int | float, mode: str = None, col: str = None):
    """Builds a condition to match values less than or equal to a given value

    This is a helper function based on range_().

    Parameters
    ----------
    val: str | float | int
        the upper bound of the search, inclusive
    mode : str
        one of date, time, latitude, or longitude. If omitted, will try to
        guess based on the column or value.
    col : str
        the name of the column. Typically omitted.

    Returns
    -------
    dict
        an EMu API range condition
    """
    bound = {"lte": val}
    return range_(mode=mode, col=col, **bound)
def exact(val: str | float | int, col: str = None, mode: str = None) -> dict:
    """Builds a condition to match the complete contents of a column exactly

    Equivalent to \\^\\"hello world\\"\\$ in the EMu client. Case insensitive.

    Parameters
    ----------
    val : str | float | int | list[str] | list[float] | list[int]
        the value or list of values to match
    col : str
        the name of the column. Typically omitted.
    mode : str
        one of date, time, latitude, or longitude. If omitted, will try to
        guess based on the value.

    Returns
    -------
    dict
        an EMu API exact condition
    """
    resolved_mode = _infer_mode(val) if mode is None else mode
    return _build_cond(val, col=col, op="exact", mode=resolved_mode)
def exists(val: bool, col: str = None) -> dict:
    """Builds a condition to test whether a field is populated

    Equivalent to \\* or \\+ in the EMu client if True. Equivalent to \\!\\*
    or \\!\\+ if False or None.

    Parameters
    ----------
    val : bool
        whether the field is populated. True returns records where the
        specified field is populated, False returns records where it is empty.
    col : str
        the name of the column. Typically omitted.

    Returns
    -------
    dict
        an EMu API exists condition
    """
    cond = _build_cond(val, op="exists", col=col)
    return cond
def phonetic(val: str | list[str], col: str = None) -> dict:
    """Builds a condition to perform a phonetic search

    Equivalent to \\@smythe in the EMu client.

    Parameters
    ----------
    val : str | list[str]
        the text to search for or a list of such strings
    col : str
        the name of the column. Typically omitted.

    Returns
    -------
    dict
        an EMu API phonetic condition
    """
    cond = _build_multivalue_cond(val, op="phonetic", col=col)
    return cond
def phrase(val: str | list[str], col: str = None) -> dict:
    """Builds a condition to search for a phrase

    Equivalent to \\"the black cat\\" in the EMu client.

    Parameters
    ----------
    val : str | list[str]
        a multiword phrase or a list of such phrases
    col : str
        the name of the column. Typically omitted.

    Returns
    -------
    dict
        an EMu API phrase condition
    """
    cond = _build_cond(val, op="phrase", col=col)
    return cond
[docs] def proximity(val: str | list[str], col: str = None, distance: int = 3) -> dict: """Builds a condition to search for words within a certain distance of each other Equivalent to \\'\\(the \\"black cat\\"\\) <= 5 words\\' in the EMu client. The client supports more complex operations (for example, searching in order) that do not appear to be supported by the API. Paramters --------- val : str | list[str] a string of two or more words or a list of such strings distance : int the maximum distance between words Returns ------- dict an EMu API phrase condition """ raise NotImplementedError("Condition does not work as expected in API or client")
# return _build_cond(val, col=col, op="proximity", distance=distance)
def regex(val: str | list[str], col: str = None) -> dict:
    """Builds a condition to perform a regular expression search

    Parameters
    ----------
    val : str | list[str]
        the pattern to search for
    col : str
        the name of the column. Typically omitted.

    Returns
    -------
    dict
        an EMu API regex condition
    """
    cond = _build_cond(val, op="regex", col=col)
    return cond
def stemmed(val: str | list[str], col: str = None) -> dict:
    """Builds a condition to search for words matching the same root

    Equivalent to \\~locate in the EMu client

    Parameters
    ----------
    val : str | list[str]
        the root word to search for. For example, elect would match election,
        elected, electioneering, elects but would not match electricity
    col : str
        the name of the column. Typically omitted.

    Returns
    -------
    dict
        an EMu API stemmed condition
    """
    cond = _build_multivalue_cond(val, op="stemmed", col=col)
    return cond
def is_not_null(col: str = None) -> dict:
    """Builds a condition that matches a non-empty field in the EMu API

    Alias for exists(True).

    Parameters
    ----------
    col : str
        the name of the column. Typically omitted.

    Returns
    -------
    dict
        an EMu API exists=True condition
    """
    populated = True
    return exists(populated, col=col)
def is_null(col: str = None) -> dict:
    """Builds a condition that matches an empty field in the EMu API

    Alias for exists(False).

    Parameters
    ----------
    col : str
        the name of the column. Typically omitted.

    Returns
    -------
    dict
        an EMu API exists=False condition
    """
    populated = False
    return exists(populated, col=col)
def order(val: str = "asc", col: str = None) -> dict:
    """Builds a condition to sort in the given direction

    Parameters
    ----------
    val : str
        sort direction. Must be either "asc" or "desc".
    col : str
        the name of the column. Typically omitted.

    Returns
    -------
    dict
        an EMu API order condition
    """
    cond = _build_cond(val, op="order", col=col)
    return cond
def emu_escape(val: str) -> str:
    """Escapes a string according to EMu escape syntax

    For example, the regular expression ^Hello world$ will be escaped
    as \\^Hello world\\$. The comparison operators ==, >=, and <= keep
    their final equals sign unescaped.

    Parameters
    ----------
    val : str
        the text to escape

    Returns
    -------
    str
        the escaped text
    """
    # Prefix each control character with a backslash in a single pass
    table = str.maketrans({char: "\\" + char for char in "\"'![]^$*+~@="})
    escaped = val.translate(table)

    # Restore the trailing equals sign of the two-character operators
    for mangled, operator in ((r"=\=", "=="), (r">\=", ">="), (r"<\=", "<=")):
        escaped = escaped.replace(mangled, operator)
    return escaped
def emu_unescape(val: str) -> str:
    """Unescapes a string that uses the EMu escape syntax

    For example, the regular expression \\^Hello world\\$ will be
    unescaped as ^Hello world$.

    Parameters
    ----------
    val : str
        the text to unescape

    Returns
    -------
    str
        the unescaped text
    """
    tokens = ('"', "'", "!", "[", "]", "^", "$", "*", "+", "~", "@", "=", "==")
    unescaped = val
    # Tokens are handled one at a time and in this order
    for token in tokens:
        unescaped = unescaped.replace("\\" + token, token)
    return unescaped
def _infer_mode(val: Any) -> str | None:
    """Infers the comparison mode based on a value

    Parameters
    ----------
    val : Any
        the value to test

    Returns
    -------
    str | None
        "date", "time", "latitude", or "longitude" for values that are (or
        can be coerced to) the corresponding EMu type. None for floats,
        values that can be coerced to float, and values matching no class.
    """
    classes = [
        (float, None),
        (EMuDate, "date"),
        (EMuTime, "time"),
        (EMuLatitude, "latitude"),
        (EMuLongitude, "longitude"),
    ]
    # Prefer an exact class match before attempting any coercion
    for cls_, mode in classes:
        if isinstance(val, cls_):
            return mode
    # Otherwise use the first class that accepts the value
    for cls_, mode in classes:
        try:
            cls_(val)
        except (IndexError, TypeError, ValueError):
            pass
        else:
            return mode
    return None


def _isinstance(val: Any, obj: object) -> bool:
    """Tests whether a value, or every value in a list or tuple, is an instance of obj"""
    if isinstance(val, (list, tuple)):
        return all((isinstance(_, obj) for _ in val))
    return isinstance(val, obj)


def _type(val: Any) -> type:
    """Gets the type of a value or the shared type of the items in a list or tuple

    Raises
    ------
    ValueError
        if val is a list or tuple whose items do not all share exactly one
        type. This includes an empty list or tuple.
    """
    if isinstance(val, (list, tuple)):
        types = [type(_) for _ in val]
        if len(set(types)) != 1:
            raise ValueError(f"Object contains different types: {val}")
        return types[0]
    return type(val)


def _prep_field(val: str) -> str:
    """Formats field names to use data.EmuField syntax

    The special name "id" and names that already start with "data." are
    returned unchanged.
    """
    if val == "id" or val.startswith("data."):
        return val
    return f"data.{val}"


def _prep_select(select: dict | list = None) -> str:
    """Expands list of fields to the format used by the EMu API

    Parameters
    ----------
    select : dict | list, optional
        the fields to return. If a dict, only its keys are used.

    Returns
    -------
    str
        a comma-delimited list of prepped field names that always includes id
    """
    if select is None:
        select = []
    select = list(select)
    if "id" not in select:
        select.insert(0, "id")
    param = ",".join([_prep_field(f) for f in select])
    logger.debug(f"Prepped select as {repr(param)}")
    return param


def _prep_sort(sort_: dict | list | tuple) -> str:
    """Expands a simple sort to the format used by the EMu API

    Parameters
    ----------
    sort_ : dict | list | tuple
        either a dict mapping column names to a sort direction (or to an
        already built order condition) or a sequence of column names, each
        of which is sorted ascending

    Returns
    -------
    str
        the order conditions as a JSON string
    """
    if isinstance(sort_, (list, tuple)):
        sort_ = {c: "asc" for c in sort_}
    conds = []
    for col, val in sort_.items():
        # Conditions supplied as dicts are used as given
        if not isinstance(val, dict):
            val = order(val, col=col)
        conds.append(val)
    param = json.dumps(conds)
    logger.debug(f"Prepped sort as {repr(param)}")
    return param


def _is_compiled(obj, parents=None):
    """Tests whether a filter already uses the structure built by this module

    An object counts as compiled if it is a dict whose keys are boolean
    operators (AND, OR, NOT) and whose leaves, directly under a boolean
    operator, map a data.* field name to at least one known operator.

    Parameters
    ----------
    obj : Any
        the filter or filter fragment to test
    parents : list, optional
        stack tracking the enclosing boolean operators. Used during recursion.

    Returns
    -------
    bool
        True if the object looks like a precompiled filter
    """
    if parents is None:
        parents = []
    ops = {"contains", "equals", "exact", "phonetic", "range", "regex", "stemmed"}
    if isinstance(obj, dict):
        for key, vals in obj.items():
            if key in {"AND", "OR", "NOT"}:
                parents.append("bool_op")
                for val in vals:
                    result = _is_compiled(val, parents)
                    if not result:
                        return result
                parents.pop()
            elif parents and parents[-1] == "bool_op":
                # Leaves must look like data.<EMu field name>
                if not re.match(r"data\.[A-Z][a-z]{2}[A-Z]", key):
                    return False
                return bool(set(vals) & ops)
            else:
                return False
    else:
        return False
    return True


def _prep_filter(module: str, filter_: dict, use_emu_syntax: bool = True) -> str:
    """Expands a simple filter to the format used by the EMu API

    Parameters
    ----------
    module : str
        the backend name of the EMu module being searched
    filter_ : dict
        the filter, either precompiled (see _is_compiled) or a simple
        mapping of column names to search values. The filter is not
        modified.
    use_emu_syntax : bool = True
        whether search strings use the EMu escape syntax

    Returns
    -------
    str
        the filter as a JSON string
    """
    if _is_compiled(filter_):
        param = json.dumps(filter_)
        logger.debug(f"Did not modify precompiled filter {repr(filter_)}")
        return param

    stmts = []
    for col, val in filter_.items():
        # Add column name to individual conditions if not already there
        if isinstance(val, dict):
            # Work on a shallow copy. Rewriting the caller's dict in place
            # would corrupt the filter if it is used for a second query.
            val = dict(val)
            for key in list(val):
                vals = val[key]
                if key in ("AND", "OR"):
                    val[key] = [{_prep_field(col): v} for v in vals]
                elif key == "NOT":
                    val[key] = [
                        _val_to_query(col, v, use_emu_syntax=use_emu_syntax)
                        for v in vals
                    ]
                else:
                    val[_prep_field(col)] = {key: vals}
                    del val[key]

        elif isinstance(val, list):
            vals = []
            for item in val:
                if col in ("AND", "OR"):
                    # Nested groups are prepped recursively with the same syntax
                    nested = _prep_filter(
                        module, item, use_emu_syntax=use_emu_syntax
                    )
                    vals.append(json.loads(nested)["AND"][0])
                else:
                    vals.append(
                        _val_to_query(col, item, use_emu_syntax=use_emu_syntax)
                    )
            val = and_(vals) if col == "AND" else or_(vals)

        else:
            # Infer operator based on data type in the schema if provided
            if EMuAPI.schema:
                data_type = EMuAPI.schema.get_field_info(module, col)["DataType"]
                val = _val_to_query(
                    col, val, use_emu_syntax=use_emu_syntax, data_type=data_type
                )
            # Otherwise base the condition on the type of data supplied
            elif val is None:
                val = exists(False, col=col)
            elif _isinstance(val, bool):
                val = exists(val, col=col)
            elif _isinstance(val, (float, int)):
                val = exact(val, col=col)
            else:
                val = _val_to_query(col, val, use_emu_syntax=use_emu_syntax)

        # Unwrap AND so that its members can be combined below
        if isinstance(val, dict):
            val = val.get("AND", val)
        if not isinstance(val, (list, tuple)):
            val = [val]
        if len(val) > 1:
            stmts.append({"OR": or_}.get(col, and_)(val))
        else:
            stmts.append(val[0])

    # Filter must include a boolean operator even if there is only one element
    if len(stmts) == 1 and list(stmts[0])[0] in ("AND", "OR", "NOT"):
        param = json.dumps(stmts[0])
    else:
        param = json.dumps(and_(stmts))
    logger.debug(f"Prepped filter as {repr(param)}")
    return param


def _build_cond(val: Any, op: str, col: str = None, **kwargs) -> dict:
    """Helper function to build a condition for the EMu API

    Parameters
    ----------
    val : Any
        the value to search for. Ignored for range conditions, which take
        their bounds from kwargs. A list or tuple with more than one value
        produces one condition per value combined using OR.
    op : str
        the name of the operator, for example, "contains" or "range"
    col : str, optional
        the column name. If given, the condition is nested under the
        prepped column name.
    kwargs
        additional options for the condition. Options that are None are
        omitted. For range conditions, keys starting with "gt" and "lt"
        supply the bounds.

    Returns
    -------
    dict
        an EMu API condition

    Raises
    ------
    ValueError
        if the bounds of a range have different types or different lengths
    """
    if col:
        col = _prep_field(col)

    # Omit empty kwargs
    kwargs = {k: v for k, v in kwargs.items() if v is not None}

    if op == "range":
        gt_key = None
        gt = None
        lt_key = None
        lt = None
        for key, bound in kwargs.items():
            if key.startswith("gt"):
                gt_key = key
                gt = bound
            elif key.startswith("lt"):
                lt_key = key
                lt = bound

        # Both bounds must hold the same type of data
        if gt_key and lt_key and _type(gt) != _type(lt):
            raise ValueError(f"{gt_key} and {lt_key} must have the same type")

        # Simplify lists that only include one value. The simplified value is
        # written back so that the emitted condition holds the scalar, as it
        # does for each row of a multi-value range below.
        if isinstance(gt, (list, tuple)) and len(gt) == 1:
            gt = gt[0]
            kwargs[gt_key] = gt
        if isinstance(lt, (list, tuple)) and len(lt) == 1:
            lt = lt[0]
            kwargs[lt_key] = lt

        if isinstance(gt, (list, tuple)) or isinstance(lt, (list, tuple)):
            if gt and lt:
                # If both gt and lt are defined, they must have the same length
                if isinstance(gt, (list, tuple)) and len(gt) != len(lt):
                    raise ValueError(
                        f"{gt_key} and {lt_key} must have the same number of items"
                    )
            # NOTE(review): a list supplied for only one bound is not handled
            # here and fails in zip -- confirm the intended behavior
            vals = []
            for gt_val, lt_val in zip(gt, lt):
                kwargs[gt_key] = gt_val
                kwargs[lt_key] = lt_val
                vals.append(_build_cond(None, op, col=col, **kwargs))
            # A single pair yields that condition directly
            cond = or_(vals) if len(vals) > 1 else vals[0]
            logger.debug(f"Built range condition: {cond}")
            return cond
        else:
            cond = {"range": kwargs} if col is None else {col: {"range": kwargs}}
            logger.debug(f"Built range condition: {cond}")
            return cond

    elif isinstance(val, (list, tuple)):
        if len(val) > 1:
            return or_([_build_cond(v, op, col=col, **kwargs) for v in val])
        val = val[0]

    # Order conditions use the bare direction. All others wrap the value.
    if op != "order":
        val = {"value": val}
        val.update(kwargs)

    cond = {op: val} if col is None else {col: {op: val}}
    logger.debug(f"Built {op} condition: {cond}")
    return cond


def _build_multivalue_cond(val: Any, op: str, col: str = None):
    """Builds conditions for operations that should be split by word

    Each string is split on spaces and its words are combined using AND.
    Multiple strings are combined using OR.

    Parameters
    ----------
    val : str | list[str]
        the string or strings to search for
    op : str
        the name of the operator
    col : str, optional
        the column name

    Returns
    -------
    dict
        an EMu API condition
    """
    conds = []
    for phrase_ in [val] if isinstance(val, str) else val:
        cond = _build_cond(phrase_.split(" "), col=col, op=op)
        # _build_cond joins multiple words with OR, but all words must match
        try:
            cond = {"AND": cond.pop("OR")}
        except KeyError:
            pass
        conds.append(cond)
    return or_(conds) if len(conds) > 1 else conds[0]


def _val_to_query(
    col: str, val: str | list, use_emu_syntax: bool = True, data_type: str = None
) -> dict:
    """Converts a search string to a query based on EMu client

    Parameters
    ----------
    col: str
        the EMu column name
    val : str | list
        the value to convert
    use_emu_syntax : bool = True
        whether the value uses EMu escape syntax
    data_type : str = None
        the EMu data type. Used to ensure that range searches use the correct
        data type.

    Returns
    -------
    dict
        a query corresponding to the given value

    Raises
    ------
    ValueError
        if the value includes a case- or diacritic-sensitive search, which
        the API does not support
    """
    # FIXME: Implement regex

    # Map already defined conditions to the supplied column name
    if isinstance(val, dict):
        return {_prep_field(col): val}

    # Process multiple values
    if isinstance(val, (list, tuple)):
        if len(val) > 1:
            return or_([_val_to_query(col, v, use_emu_syntax, data_type) for v in val])
        else:
            val = val[0]

    # Map booleans and None using exists
    if isinstance(val, bool) or val is None:
        return exists(bool(val), col=col)

    # Coerce to numeric type if data_type hint is numeric
    to_type = {"Float": float, "Integer": int}.get(data_type, str)

    # Simple numeric values can be returned with exact
    if not data_type and isinstance(val, (float, int)):
        return exact(val, col=col)
    elif to_type in (float, int):
        try:
            return exact(to_type(val), col=col)
        except ValueError:
            # Null searches, etc. are valid but non-numeric
            pass

    # EMuType classes map to exact
    for cls_, mode_ in (
        (EMuDate, "date"),
        (EMuTime, "time"),
        (EMuLatitude, "latitude"),
        (EMuLongitude, "longitude"),
    ):
        if isinstance(val, cls_):
            return exact(str(val), col=col, mode=mode_)

    # The mode argument controls how the exact and range conditions handle comparisons
    mode = {
        "Date": "date",
        "Time": "time",
        "Latitude": "latitude",
        "Longitude": "longitude",
    }.get(data_type)

    # Operators that can be used in client searches
    ops = ["!", ">=", ">", "<=", "<"]
    if use_emu_syntax:
        ops = [emu_escape(o) for o in ops]
    ops = "(" + "|".join([re.escape(o) for o in ops]) + ")"

    conds = []

    # Search for empty fields (null search)
    chars = ["!*", "!+"]
    if use_emu_syntax:
        chars = [emu_escape(n) for n in chars]
    pattern = r"(^|\b|\s)(" + "|".join([re.escape(n) for n in chars]) + r")(\b|\s|$)"
    if re.search(pattern, val):
        conds.append(exists(False, col=col))
        val = re.sub(pattern, "", val).strip()
        if not val:
            return and_(conds) if len(conds) > 1 else conds[0]

    # Search for not
    chars = ["!"]
    if use_emu_syntax:
        chars = [emu_escape(n) for n in chars]
    pattern = r"(^|\b)(" + "|".join([re.escape(n) for n in chars]) + r")([-\w]+)"
    match = re.search(pattern, val)
    if match:
        # Pass the syntax and data type along so that the negated term is
        # interpreted the same way as any other term in this search
        conds.append(
            not_(_val_to_query(col, match.group(3), use_emu_syntax, data_type))
        )
        val = re.sub(pattern, "", val).strip()
        if not val:
            return and_(conds) if len(conds) > 1 else conds[0]

    # Search for populated fields
    chars = ["*", "+"]
    if use_emu_syntax:
        chars = [emu_escape(n) for n in chars]
    pattern = r"(^|\b|\s)(" + "|".join([re.escape(n) for n in chars]) + r")(\b|\s|$)"
    if re.search(pattern, val):
        conds.append(exists(True, col=col))
        val = re.sub(pattern, "", val).strip()
        if not val:
            return and_(conds) if len(conds) > 1 else conds[0]

    # Search by stem
    chars = ["~"]
    if use_emu_syntax:
        chars = [emu_escape(n) for n in chars]
    pattern = r"(^|\b|\s)(" + "|".join([re.escape(n) for n in chars]) + r")([-\w]+)"
    match = re.search(pattern, val)
    if match:
        conds.append(stemmed(match.group(3), col=col))
        val = re.sub(pattern, "", val).strip()
        if not val:
            return and_(conds) if len(conds) > 1 else conds[0]

    # Search phonetically
    chars = ["@"]
    if use_emu_syntax:
        chars = [emu_escape(n) for n in chars]
    pattern = r"(^|\b|\s)(" + "|".join([re.escape(n) for n in chars]) + r")([-\w]+)"
    match = re.search(pattern, val)
    if match:
        conds.append(phonetic(match.group(3), col=col))
        val = re.sub(pattern, "", val).strip()
        if not val:
            return and_(conds) if len(conds) > 1 else conds[0]

    # Search case- and diacritic-sensitively
    chars = ["=", "=="]
    if use_emu_syntax:
        chars = [emu_escape(n) for n in chars]
    pattern = r"(^|\b|\s)(" + "|".join([re.escape(n) for n in chars]) + r")([-\w]+)"
    if re.search(pattern, val):
        raise ValueError(
            "Case- and diacritic-sensitive searches are not supported by the API"
        )

    # Search for an exact word or phrase. Most numbers are handled in the Words and
    # Numbers section below, although this pattern should catch phrases containing
    # number, e.g., "Site 123".
    if use_emu_syntax:
        pattern = r'\\\^([^\d\W]+|\\"[^"]+\\")\\\$'
    else:
        pattern = r'\^([^\d\W]+|"[^"]+")\$'
    match = re.match(pattern + "$", val)
    if match:
        conds.append(exact(match.group(1).strip('\\"'), col=col))
        val = re.sub(pattern, "", val).strip()

    # Phrases
    if use_emu_syntax:
        pattern = rf"{ops}?(\\'(?:.*?)\\'|\\\"(?:.*?)\\\")"
    else:
        pattern = rf"{ops}?('(?:.*?)'|\"(?:.*?)\")"
    for op, val_ in re.findall(pattern, val):
        cond = phrase(val_.strip("\"'\\"), col=col)
        conds.append(cond if op.lstrip("\\") != "!" else not_(cond))
    val = re.sub(pattern, "", val).strip()
    if not val:
        return and_(conds) if len(conds) > 1 else conds[0]

    # Words and numbers
    pattern = f"{ops}?(.*)"
    ranges = {}
    for term in re.split(" +", val):
        op, term = re.findall(pattern, term)[0]
        op = op.lstrip("\\")
        if "<" in op or ">" in op:
            # Bounds are collected and combined into one range condition below
            ranges[op] = term
        else:
            if to_type != str or mode:
                cond = exact(to_type(term), col=col, mode=mode)
            else:
                cond = contains(to_type(term), col=col)
            conds.append(cond if op != "!" else not_(cond))

    # Ranges
    if ranges:
        mapping = {">=": "gte", ">": "gt", "<=": "lte", "<": "lt"}
        kwargs = {mapping[k]: to_type(v) for k, v in ranges.items()}
        kwargs["mode"] = mode
        conds.append(range_(col=col, **kwargs))

    return and_(conds) if len(conds) > 1 else conds[0]


def _is_attachment(key, val):
    """Tests if key-value pair is an attachment

    A pair is an attachment if the key is a reference field and the value
    is a string that is either numeric or looks like emu:/<name>/<name>/<digits>.
    """
    return bool(
        is_ref(key)
        and isinstance(val, str)
        and (val.isnumeric() or re.match(r"emu:/[a-z]+/[a-z]+/\d+$", val))
    )


def _parse_api(module: str, val: dict, api: EMuAPI, select=None, key=None, mapped=None):
    """Parses API response to remove field groupings

    Parameters
    ----------
    module : str
        the backend name of the EMu module the record belongs to
    val : dict
        the record or, during recursion, the value of a single key
    api : EMuAPI
        the API object, passed to attach() when resolving attachments
    select : optional
        the fields being selected. Narrowed to the entry for the current key
        when one exists. Presumably a nested mapping of field names; verify
        against the caller.
    key : str, optional
        the key that val belongs to. Used during recursion.
    mapped : dict, optional
        the dict to add parsed values to. Used during recursion.

    Returns
    -------
    dict
        the record with groups and nested groups flattened into columns
    """
    if mapped is None:
        mapped = {}

    if key and not key.endswith(("_grp", "_subgrp")):
        key = EMuAPI.schema.map_short_name(module, key)
        try:
            select = select[key]
        except (KeyError, TypeError):
            pass

    # Iterate dicts
    if isinstance(val, dict):
        for child_key, child_val in val.items():
            _parse_api(
                module, child_val, api, select=select, key=child_key, mapped=mapped
            )

    # Map tables. Groups are based on definitions in the schema.
    elif key.endswith("_grp"):
        keys = []
        for row in val:
            keys.extend(row)
        keys = set(keys)

        # Pivot the rows into one list per column, padding gaps with None
        grid = {}
        for row in val:
            for col_key in keys:
                grid.setdefault(col_key, []).append(row.get(col_key))

        for col_key, vals in grid.items():
            if any(vals):
                _parse_api(
                    module, vals, api, select=select, key=col_key, mapped=mapped
                )

    # Map nested tables
    elif key.endswith("_subgrp"):
        keys = []
        for row in val:
            if row:
                for inner_row in row:
                    keys.extend(inner_row)
        keys = set(keys)

        # Pivot into one list of lists per column with one inner list per row
        grid = {k: [] for k in keys}
        for row in val:
            for column in grid.values():
                column.append([])
            if row:
                for inner_row in row:
                    for col_key in keys:
                        grid[col_key][-1].append(inner_row.get(col_key))

        for col_key, vals in grid.items():
            if any(vals):
                _parse_api(
                    module, vals, api, select=select, key=col_key, mapped=mapped
                )

    # Simplify IRNs. Note that multimedia references use Ref fields and IRN-like text.
    # These are handled by the emu prefix check.
    # NOTE(review): the nesting of the branches from here to the end of the
    # function was reconstructed from context -- confirm against the repository
    elif val and is_ref(key):
        if isinstance(val, (list, tuple)):
            vals = []
            for item in val:
                if _is_attachment(key, item):
                    vals.append(attach(item, api, json.dumps(select)))
                else:
                    vals.append(item)
            mapped[key] = vals
        elif _is_attachment(key, val):
            mapped[key] = attach(val, api, json.dumps(select))
        elif isinstance(val, str):
            mapped[key] = val

    elif key == "irn" and not isinstance(val, int):
        # Keep only the trailing number of a slash-delimited identifier
        mapped[key] = int(val.split("/")[-1])

    else:
        mapped[key] = val

    return mapped