"""
Asynchronous Python Client for generic Biothings API services
"""
import asyncio
import logging
import platform
import warnings
from copy import copy
from pathlib import Path
from typing import Any, AsyncGenerator, Awaitable, Callable, Dict, Iterable, List, Optional, Tuple, Type, Union, cast
import httpx
from biothings_client.__version__ import __version__
from biothings_client._dependencies import _CACHING, _CACHING_NOT_SUPPORTED, _PANDAS
from biothings_client.cache.httpx.transport import ForcedCacheAsyncTransport
from biothings_client.client.exceptions import CachingNotSupportedError, OptionalDependencyImportError
from biothings_client.client.settings import (
COMMON_ALIASES,
COMMON_KWARGS,
MYCHEM_ALIASES,
MYCHEM_KWARGS,
MYDISEASE_ALIASES,
MYDISEASE_KWARGS,
MYGENE_ALIASES,
MYGENE_KWARGS,
MYGENESET_ALIASES,
MYGENESET_KWARGS,
MYTAXON_ALIASES,
MYTAXON_KWARGS,
MYVARIANT_ALIASES,
MYVARIANT_KWARGS,
ClientClassKwargs,
ClientSettings,
)
from biothings_client.mixins.gene import MyGeneClientMixin
from biothings_client.mixins.variant import MyVariantClientMixin
from biothings_client.utils.copy import copy_func
from biothings_client.utils.iteration import concatenate_list, iter_n, list_itemcnt
if _PANDAS:
import pandas
else:
pandas: Any = None # type: ignore[no-redef]
if _CACHING:
import hishel # type: ignore
import hishel.httpx # type: ignore
from biothings_client.cache.storage.sqlite3 import BiothingsClientAsyncSqliteStorage
# IMPORTANT
# In order to cache our POST requests we have to override hishel's
# speceficiation for SAFE_METHODS
# If we don't, then the IdleClient state will immediately force a CacheMiss
# and we won't actually leverage a cached POST request, even if our policy
# supports POST requests
# >>> SAFE_METHODS = frozenset({'TRACE', 'HEAD', 'GET', 'OPTIONS'})
# We also have to create a new instance due to the frozenset usage
OVERRIDE_SAFE_METHODS = frozenset({"TRACE", "HEAD", "GET", "OPTIONS", "POST"})
hishel._core._spec.SAFE_METHODS = OVERRIDE_SAFE_METHODS # type: ignore[attr-defined]
else:
hishel: Any = None # type: ignore[no-redef]
BiothingsClientAsyncSqliteStorage: Any = None # type: ignore[no-redef]
logger = logging.getLogger("biothings.client")
logger.setLevel(logging.INFO)
JsonDict = Dict[str, Any]
JsonList = List[JsonDict]
ResponsePayload = Any
PROXY_MOUNT = Dict[str, Optional[httpx.AsyncHTTPTransport]]
# Future work:
# Consider use "verbose" settings to control default logging output level
# by doing this instead of using branching throughout the application,
# the business logic can be more concise and more readable.
class AsyncBiothingClient:
"""
async http client class for accessing the biothings web services
"""
_annotation_endpoint: str
_default_cache_file: str
_default_url: str
_delay: Union[int, float]
_docstring_obj: Dict[str, str]
_entity: str
_max_query: int
_metadata_endpoint: str
_metadata_fields_endpoint: str
_optionally_plural_object_type: str
_pkg_user_agent_header: str
_query_endpoint: str
_scroll_size: int
_step: int
_top_level_jsonld_uris: List[str]
clear_cache: Callable[..., Awaitable[None]]
delete_cache: Callable[..., Awaitable[None]]
set_caching: Callable[..., Awaitable[None]]
stop_caching: Callable[..., Awaitable[None]]
def __init__(self, url: Optional[str] = None) -> None:
if url is None:
url = self._default_url
self.url: str = url
if self.url[-1] == "/":
self.url = self.url[:-1]
self.max_query: int = self._max_query
# delay and step attributes are for batch queries.
self.delay: Union[int, float] = self._delay # delay is ignored when requests made from cache.
self.step: int = self._step
self.scroll_size: int = self._scroll_size
# raise httpx.HTTPError for status_code > 400
# > but not for 404 on getvariant
# > set to False to suppress the exceptions.
self.raise_for_status: bool = True
self.default_user_agent = (
"{package_header}/{client_version} (" "python:{python_version} " "httpx:{httpx_version}" ")"
).format(
**{
"package_header": self._pkg_user_agent_header,
"client_version": __version__,
"python_version": platform.python_version(),
"httpx_version": httpx.__version__,
}
)
self.http_client: Optional[httpx.AsyncClient] = None
self.http_client_setup: bool = False
self.http_cache_client_setup: bool = False
self.cache_storage: Any = None
self.caching_enabled: bool = False
async def _set_http_client(self, cache_db: Optional[Union[str, Path]] = None) -> None:
"""Setter for determining what http client we build based on if caching is enabled."""
if self.caching_enabled:
await self._build_cache_http_client(cache_db)
else:
await self._build_http_client()
async def _build_http_client(self) -> None:
"""
Builds the async client instance for usage through the lifetime
of the biothings_client
This modifies the state of the BiothingsClient instance
to set the values for the http_client property
For the moment, we have disabled timeouts. This matches our prior
behavior we had with the requests library, which by default did not
specify a timeout when making a request. In the future this should
be modified to prevent indefinite hanging with potentially bad network
connections
"""
if not self.http_client_setup:
self.http_client = httpx.AsyncClient(timeout=None)
self.http_client_setup = True
self.http_cache_client_setup = False
async def _build_cache_http_client(self, cache_db: Optional[Union[str, Path]] = None) -> None:
"""
Builds the client instance used for caching biothings requests.
We rebuild the client whenever we enable to caching to ensure
that we don't create a database files unless the user explicitly
wants to leverage request caching
This modifies the state of the BiothingsClient instance
to set the values for the http_client property and the cache_storage property
For the moment, we have disabled timeouts. This matches our prior
behavior we had with the requests library, which by default did not
specify a timeout when making a request. In the future this should
be modified to prevent indefinite hanging with potentially bad network
connections
Inputs:
:param cache_db: pathlike object to the local sqlite3 cache database file
Outputs:
:return: None
"""
if not self.http_cache_client_setup:
assert hishel is not None # noqa: S101
assert BiothingsClientAsyncSqliteStorage is not None # noqa: S101
if cache_db is None:
cache_db = self._default_cache_file
assert cache_db is not None # noqa: S101
cache_db = Path(cache_db).resolve().absolute()
self.cache_storage = BiothingsClientAsyncSqliteStorage(database_path=cache_db)
# We have to apply the SpecificationPolicy for both the SyncCacheTransport
# and the SyncCacheClient
cache_options = hishel.CacheOptions(
supported_methods=["GET", "HEAD", "POST"],
shared=False,
allow_stale=False,
)
cache_policy = hishel.SpecificationPolicy(cache_options=cache_options)
http_transport = ForcedCacheAsyncTransport()
cache_transport = hishel.httpx.AsyncCacheTransport(
next_transport=http_transport,
storage=self.cache_storage,
policy=cache_policy,
)
# Have to manually build the proxy mounts as httpx will not auto-discover
# proxies if we provide our own HTTPTransport to the Client constructor
proxy_mounts = self._build_caching_proxy_mounts()
self.http_client = hishel.httpx.AsyncCacheClient(
policy=cache_policy,
transport=cache_transport,
storage=self.cache_storage,
mounts=proxy_mounts,
timeout=None,
)
self.http_client_setup = False
self.http_cache_client_setup = True
def _build_caching_proxy_mounts(self) -> PROXY_MOUNT:
"""
Builds the proxy mounts for case where we have a CacheTransport.
Autodiscovery of proxies only works when don't provide a transport
to the client so this method acts as a replacement for that
"""
proxy_map = httpx._utils.get_environment_proxies() # type: ignore[attr-defined]
proxy_mounts: PROXY_MOUNT = {}
for key, proxy in proxy_map.items():
proxy_transport = None
if proxy is not None:
proxy_transport = httpx.AsyncHTTPTransport(
verify=True,
cert=None,
trust_env=True,
http1=True,
http2=False,
limits=httpx._config.DEFAULT_LIMITS,
proxy=proxy,
)
proxy_mounts[key] = proxy_transport
proxy_mounts = dict(sorted(proxy_mounts.items()))
return proxy_mounts
def use_http(self) -> None:
if self.url:
self.url = self.url.replace("https://", "http://")
def use_https(self) -> None:
if self.url:
self.url = self.url.replace("http://", "https://")
@staticmethod
async def _dataframe(obj: Union[JsonDict, JsonList], dataframe: int, df_index: bool = True) -> Any:
"""
Converts object to DataFrame (pandas)
"""
if _PANDAS:
assert pandas is not None # noqa: S101
if dataframe not in [1, 2]:
raise ValueError("dataframe must be either 1 (using json_normalize) or 2 (using DataFrame.from_dict")
if isinstance(obj, dict) and "hits" in obj:
if dataframe == 1:
df = pandas.json_normalize(obj["hits"])
else:
df = pandas.DataFrame.from_dict(obj)
else:
if dataframe == 1:
df = pandas.json_normalize(obj)
else:
df = pandas.DataFrame.from_dict(cast(Dict[Any, Any], obj))
if df_index:
df = df.set_index("query")
return df
else:
dataframe_library_error = OptionalDependencyImportError(
optional_function_access="enable dataframe conversion",
optional_group="dataframe",
libraries=["pandas"],
)
raise dataframe_library_error
async def _get(
self,
url: str,
params: Optional[JsonDict] = None,
none_on_404: bool = False,
verbose: bool = True,
) -> Tuple[bool, ResponsePayload]:
"""
Wrapper around the httpx.get method
"""
await self._set_http_client()
assert self.http_client is not None # noqa: S101
if params is None:
params = {}
debug = params.pop("debug", False)
return_raw = params.pop("return_raw", False)
headers = {"user-agent": self.default_user_agent}
response = await self.http_client.get(
url=url,
params=params,
headers=headers,
extensions={"cache_disabled": not self.caching_enabled},
)
response_extensions = response.extensions
from_cache = response_extensions.get("hishel_from_cache", False)
if from_cache:
logger.debug("Cached response %s from %s", response, url)
get_response: Tuple[bool, ResponsePayload] = (from_cache, response)
if response.is_success:
if debug or return_raw:
get_response = (from_cache, response)
else:
get_response = (from_cache, response.json())
else:
if none_on_404 and response.status_code == 404:
get_response = (from_cache, None)
elif self.raise_for_status:
response.raise_for_status() # raise httpx._exceptions.HTTPStatusError
return get_response
async def _post(self, url: str, params: Optional[JsonDict], verbose: bool = True) -> Tuple[bool, ResponsePayload]:
"""
Wrapper around the httpx.post method
"""
await self._set_http_client()
assert self.http_client is not None # noqa: S101
if params is None:
params = {}
return_raw = params.pop("return_raw", False)
headers = {"user-agent": self.default_user_agent}
response = await self.http_client.post(
url=url,
data=params,
headers=headers,
extensions={"cache_disabled": not self.caching_enabled},
)
response_extensions = response.extensions
from_cache = response_extensions.get("hishel_from_cache", False)
if from_cache:
logger.debug("Cached response %s from %s", response, url)
post_response: Tuple[bool, ResponsePayload] = (from_cache, response)
if response.is_success:
if return_raw:
post_response = (from_cache, response)
else:
response.read()
post_response = (from_cache, response.json())
else:
if self.raise_for_status:
response.raise_for_status()
else:
post_response = (from_cache, response)
return post_response
async def _handle_common_kwargs(self, kwargs: JsonDict) -> JsonDict:
# handle these common parameters accept field names as the value
for kw in ["fields", "always_list", "allow_null"]:
if kw in kwargs:
kwargs[kw] = concatenate_list(kwargs[kw], quoted=False)
return kwargs
async def _repeated_query(
self,
query_fn: Callable[..., Awaitable[Tuple[bool, Any]]],
query_li: Iterable[Any],
verbose: bool = True,
**fn_kwargs: Any,
) -> AsyncGenerator[Any, None]:
"""
Run query_fn for input query_li in a batch (self.step).
return a generator of query_result in each batch.
input query_li can be a list/tuple/iterable
"""
step = min(self.step, self.max_query)
i = 0
for batch, cnt in iter_n(query_li, step, with_cnt=True):
if verbose:
logger.info("querying {0}-{1}...".format(i + 1, cnt))
i = cnt
from_cache, query_result = await query_fn(batch, **fn_kwargs)
yield query_result
if not from_cache and self.delay:
await asyncio.sleep(self.delay)
async def _metadata(self, verbose: bool = True, **kwargs: Any) -> JsonDict:
"""
Return a dictionary of Biothing metadata.
"""
_url = self.url + self._metadata_endpoint
_, ret = await self._get(_url, params=kwargs, verbose=verbose)
return ret
async def _set_caching(self, cache_db: Optional[Union[str, Path]] = None, **kwargs: Any) -> None:
"""
Enable the client caching and creates a local cache database
for all future requests
If caching is already enabled then we no-opt
Inputs:
:param cache_db: pathlike object to the local sqlite3 cache database file
Outputs:
:return: None
"""
if _CACHING_NOT_SUPPORTED:
raise CachingNotSupportedError("Caching is only supported for Python 3.8+")
if _CACHING:
if not self.caching_enabled:
try:
self.caching_enabled = True
self.http_client_setup = False
await self._build_cache_http_client()
logger.debug("Reset the HTTP client to leverage caching %s", self.http_client)
logger.info(
(
"Enabled client caching: %s\nFuture queries will be cached in [%s]",
self,
self.cache_storage.database_path,
)
)
except Exception as gen_exc:
logger.exception(gen_exc)
logger.error("Unable to enable caching")
raise gen_exc
else:
logger.warning("Caching already enabled. Skipping for now ...")
else:
caching_library_error = OptionalDependencyImportError(
optional_function_access="enable biothings-client caching",
optional_group="caching",
libraries=["anysqlite", "hishel"],
)
raise caching_library_error
async def _stop_caching(self) -> None:
"""
Disable client caching. The local cache database will be maintained,
but we will disable cache access when sending requests
If caching is already disabled then we no-opt
Inputs:
:param None
Outputs:
:return: None
"""
if _CACHING_NOT_SUPPORTED:
raise CachingNotSupportedError("Caching is only supported for Python 3.8+")
if _CACHING:
if self.caching_enabled:
self.caching_enabled = False
self.http_client_setup = False
await self._build_http_client()
logger.debug("Reset the HTTP client to disable caching %s", self.http_client)
logger.info("Disabled client caching: %s", self)
else:
logger.warning("Caching already disabled. Skipping for now ...")
else:
caching_library_error = OptionalDependencyImportError(
optional_function_access="disable biothings-client caching",
optional_group="caching",
libraries=["anysqlite", "hishel"],
)
raise caching_library_error
async def _clear_cache(self) -> None:
"""
Clear the globally installed cache. Caching will stil be enabled,
but the data stored in the cache stored will be dropped
Inputs:
:param None
Outputs:
:return: None
"""
if _CACHING_NOT_SUPPORTED:
raise CachingNotSupportedError("Caching is only supported for Python 3.8+")
if _CACHING:
if self.caching_enabled:
try:
await self.cache_storage.hard_cleanup()
except Exception as gen_exc:
logger.exception(gen_exc)
logger.error("Error attempting to clear the local cache database")
raise gen_exc
else:
logger.warning("Caching already disabled. No local cache database to clear. Skipping for now ...")
else:
caching_library_error = OptionalDependencyImportError(
optional_function_access="clear biothings-client cache",
optional_group="caching",
libraries=["anysqlite", "hishel"],
)
raise caching_library_error
async def _delete_cache(self) -> None:
"""
Disable caching, close the storage connection, and delete the local cache database file.
If caching is not currently active but a cache file was previously created,
the file will still be removed.
Inputs:
:param None
Outputs:
:return: None
"""
if _CACHING_NOT_SUPPORTED:
raise CachingNotSupportedError("Caching is only supported for Python 3.8+")
if _CACHING:
if self.cache_storage is not None:
cache_db = self.cache_storage.database_path
if self.caching_enabled:
await self._stop_caching()
await self.cache_storage.close()
self.cache_storage = None
cache_db.unlink(missing_ok=True)
logger.info("Deleted cache file: %s", cache_db)
else:
logger.warning("No cache storage found. Skipping delete ...")
else:
caching_library_error = OptionalDependencyImportError(
optional_function_access="delete biothings-client cache",
optional_group="caching",
libraries=["anysqlite", "hishel"],
)
raise caching_library_error
async def _get_fields(self, search_term: Optional[str] = None, verbose: bool = True) -> JsonDict:
"""
Wrapper for /metadata/fields
**search_term** is a case insensitive string to search for in available field names.
If not provided, all available fields will be returned.
.. Hint:: This is useful to find out the field names you need to pass to **fields** parameter of other methods.
"""
_url = self.url + self._metadata_fields_endpoint
if search_term:
params = {"search": search_term}
else:
params = {}
_, ret = await self._get(_url, params=params, verbose=verbose)
for k, v in ret.items():
del k
# Get rid of the notes column information
if "notes" in v:
del v["notes"]
return ret
async def _getannotation(
self,
_id: Any,
fields: Optional[Union[str, Iterable[Any]]] = None,
**kwargs: Any,
) -> Any:
"""
Return the object given id.
This is a wrapper for GET query of the biothings annotation service.
:param _id: an entity id.
:param fields: fields to return, a list or a comma-separated string.
If not provided or **fields="all"**, all available fields
are returned.
:return: an entity object as a dictionary, or None if _id is not found.
"""
verbose = kwargs.pop("verbose", True)
if fields:
kwargs["fields"] = fields
kwargs = await self._handle_common_kwargs(kwargs)
_url = self.url + self._annotation_endpoint + str(_id)
_, ret = await self._get(_url, kwargs, none_on_404=True, verbose=verbose)
return ret
async def _getannotations_inner(
self, ids: Iterable[Any], verbose: bool = True, **kwargs: Any
) -> Tuple[bool, ResponsePayload]:
id_collection = concatenate_list(ids)
_kwargs = {"ids": id_collection}
_kwargs.update(kwargs)
_url = self.url + self._annotation_endpoint
return await self._post(_url, _kwargs, verbose=verbose)
async def _annotations_generator(
self,
query_fn: Callable[..., Awaitable[Tuple[bool, Iterable[JsonDict]]]],
ids: Iterable[Any],
verbose: bool = True,
**kwargs: Any,
) -> AsyncGenerator[JsonDict, None]:
"""
Function to yield a batch of hits one at a time.
"""
async for hits in self._repeated_query(query_fn, ids, verbose=verbose):
for hit in hits:
yield hit
async def _getannotations(
self,
ids: Union[str, Iterable[Any]],
fields: Optional[Union[str, Iterable[Any]]] = None,
**kwargs: Any,
) -> Any:
"""
Return the list of annotation objects for the given list of ids.
This is a wrapper for POST query of the biothings annotation service.
:param ids: a list/tuple/iterable or a string of ids.
:param fields: fields to return, a list or a comma-separated string.
If not provided or **fields="all"**, all available fields
are returned.
:param as_generator: if True, will yield the results in a generator.
:param as_dataframe: if True or 1 or 2, return object as DataFrame (requires Pandas).
True or 1: using json_normalize
2 : using DataFrame.from_dict
otherwise: return original json
:param df_index: if True (default), index returned DataFrame by 'query',
otherwise, index by number. Only applicable if as_dataframe=True.
:return: a list of objects or a pandas DataFrame object (when **as_dataframe** is True)
.. Hint:: A large list of more than 1000 input ids will be sent to the backend
web service in batches (1000 at a time), and then the results will be
concatenated together. So, from the user-end, it's exactly the same as
passing a shorter list. You don't need to worry about saturating our
backend servers.
.. Hint:: If you need to pass a very large list of input ids, you can pass a generator
instead of a full list, which is more memory efficient.
"""
if isinstance(ids, str):
ids = ids.split(",") if ids else []
if not (isinstance(ids, (list, tuple, Iterable))):
raise ValueError('input "ids" must be a list, tuple or iterable.')
if fields:
kwargs["fields"] = fields
kwargs = await self._handle_common_kwargs(kwargs)
verbose = kwargs.pop("verbose", True)
dataframe = kwargs.pop("as_dataframe", None)
df_index = kwargs.pop("df_index", True)
generator = kwargs.pop("as_generator", False)
if dataframe in [True, 1]:
dataframe = 1
elif dataframe != 2:
dataframe = None
return_raw = kwargs.get("return_raw", False)
if return_raw:
dataframe = None
async def query_fn(ids: Iterable[Any]) -> Tuple[bool, ResponsePayload]:
return await self._getannotations_inner(ids, verbose=verbose, **kwargs)
if generator:
return self._annotations_generator(query_fn, ids, verbose=verbose, **kwargs)
out = []
async for hits in self._repeated_query(query_fn, ids, verbose=verbose):
if return_raw:
out.append(hits) # hits is the raw response text
else:
out.extend(hits)
if return_raw and len(out) == 1:
out = out[0]
if dataframe:
out = await self._dataframe(out, dataframe, df_index=df_index)
return out
async def _query(self, q: str, **kwargs: Any) -> Any:
"""
Return the query result.
This is a wrapper for GET query of biothings query service.
:param q: a query string.
:param fields: fields to return, a list or a comma-separated string.
If not provided or **fields="all"**, all available fields
are returned.
:param size: the maximum number of results to return (with a cap
of 1000 at the moment). Default: 10.
:param skip: the number of results to skip. Default: 0.
:param sort: Prefix with "-" for descending order, otherwise in ascending order.
Default: sort by matching scores in decending order.
:param as_dataframe: if True or 1 or 2, return object as DataFrame (requires Pandas).
True or 1: using json_normalize
2 : using DataFrame.from_dict
otherwise: return original json
:param fetch_all: if True, return a generator to all query results (unsorted). This can provide a very fast
return of all hits from a large query.
Server requests are done in blocks of 1000 and yielded individually. Each 1000 block of
results must be yielded within 1 minute, otherwise the request will expire at server side.
:return: a dictionary with returned variant hits or a pandas DataFrame object (when **as_dataframe** is True)
or a generator of all hits (when **fetch_all** is True)
.. Hint:: By default, **query** method returns the first 10 hits if the matched hits are >10.
If the total number of hits are less than 1000, you can increase the value for
**size** parameter. For a query that returns more than 1000 hits, you can pass
"fetch_all=True" to return a `generator <http://www.learnpython.org/en/Generators>`_
of all matching hits (internally, those hits are requested from the server in blocks of 1000).
"""
_url = self.url + self._query_endpoint
verbose = kwargs.pop("verbose", True)
kwargs = await self._handle_common_kwargs(kwargs)
kwargs.update({"q": q})
fetch_all = kwargs.get("fetch_all")
if fetch_all in [True, 1]:
if kwargs.get("as_dataframe", None) in [True, 1]:
warnings.warn(
"Ignored 'as_dataframe' because 'fetch_all' is specified. "
"Too many documents to return as a Dataframe."
)
return self._fetch_all(url=_url, verbose=verbose, **kwargs)
dataframe = kwargs.pop("as_dataframe", None)
if dataframe in [True, 1]:
dataframe = 1
elif dataframe != 2:
dataframe = None
_, out = await self._get(_url, kwargs, verbose=verbose)
if dataframe:
out = await self._dataframe(out, dataframe, df_index=False)
return out
async def _fetch_all(self, url: str, verbose: bool = True, **kwargs: Any) -> AsyncGenerator[JsonDict, None]:
"""
Function that returns a generator to results. Assumes that 'q' is in kwargs.
Implicitly disables caching to ensure we actually hit the endpoint rather than
pulling from local cache
"""
logger.warning("fetch_all implicitly disables HTTP request caching")
restore_caching = False
if self.caching_enabled:
restore_caching = True
try:
await self._stop_caching()
except OptionalDependencyImportError as optional_import_error:
logger.exception(optional_import_error)
logger.debug("No cache to disable for fetch all. Continuing ...")
except Exception as gen_exc:
logger.exception(gen_exc)
logger.error("Unknown error occured while attempting to disable caching")
raise gen_exc
try:
_, response = await self._get(url, params=kwargs, verbose=verbose)
if verbose:
logger.info("Fetching {0} {1} . . .".format(response["total"], self._optionally_plural_object_type))
for key in ["q", "fetch_all"]:
kwargs.pop(key)
while not response.get("error", "").startswith("No results to return"):
if "error" in response:
logger.error(response["error"])
break
if "_warning" in response and verbose:
logger.warning(response["_warning"])
for hit in response["hits"]:
yield hit
kwargs.update({"scroll_id": response["_scroll_id"]})
_, response = await self._get(url, params=kwargs, verbose=verbose)
except Exception as gen_exc:
logger.exception(gen_exc)
raise gen_exc
finally:
if restore_caching:
logger.debug("re-enabling the client HTTP caching")
try:
await self._set_caching()
except OptionalDependencyImportError as optional_import_error:
logger.exception(optional_import_error)
logger.debug("No cache to disable for fetch all. Continuing ...")
except Exception as gen_exc:
logger.exception(gen_exc)
logger.error("Unknown error occured while attempting to disable caching")
raise gen_exc
async def _querymany_inner(
self, qterms: Iterable[Any], verbose: bool = True, **kwargs: Any
) -> Tuple[bool, ResponsePayload]:
query_term_collection = concatenate_list(qterms)
_kwargs = {"q": query_term_collection}
_kwargs.update(kwargs)
_url = self.url + self._query_endpoint
return await self._post(_url, params=_kwargs, verbose=verbose)
async def _querymany( # noqa: MC0001
self,
qterms: Union[str, Iterable[Any]],
scopes: Optional[Union[str, Iterable[Any]]] = None,
**kwargs: Any,
) -> Any:
"""
Return the batch query result.
This is a wrapper for POST query of "/query" service.
:param qterms: a list/tuple/iterable of query terms, or a string of comma-separated query terms.
:param scopes: specify the type (or types) of identifiers passed to **qterms**,
either a list or a comma-separated fields to specify type of input qterms.
:param fields: fields to return, a list or a comma-separated string.
If not provided or **fields="all"**, all available fields
are returned.
:param returnall: if True, return a dict of all related data, including dup. and missing qterms
:param verbose: if True (default), print out information about dup and missing qterms
:param as_dataframe: if True or 1 or 2, return object as DataFrame (requires Pandas).
True or 1: using json_normalize
2 : using DataFrame.from_dict
otherwise: return original json
:param df_index: if True (default), index returned DataFrame by 'query',
otherwise, index by number. Only applicable if as_dataframe=True.
:return: a list of matching objects or a pandas DataFrame object.
.. Hint:: Passing a large list of ids (>1000) to :py:meth:`querymany` is perfectly fine.
.. Hint:: If you need to pass a very large list of input qterms, you can pass a generator
instead of a full list, which is more memory efficient.
"""
if isinstance(qterms, str):
qterms = qterms.split(",") if qterms else []
if not (isinstance(qterms, (list, tuple, Iterable))):
raise ValueError('input "qterms" must be a list, tuple or iterable.')
if scopes:
kwargs["scopes"] = concatenate_list(scopes, quoted=False)
kwargs = await self._handle_common_kwargs(kwargs)
returnall = kwargs.pop("returnall", False)
verbose = kwargs.pop("verbose", True)
dataframe = kwargs.pop("as_dataframe", None)
if dataframe in [True, 1]:
dataframe = 1
elif dataframe != 2:
dataframe = None
df_index = kwargs.pop("df_index", True)
return_raw = kwargs.get("return_raw", False)
if return_raw:
dataframe = None
out = []
li_missing = []
li_dup = []
li_query = []
async def query_fn(qterms: Iterable[Any]) -> Tuple[bool, ResponsePayload]:
return await self._querymany_inner(qterms, verbose=verbose, **kwargs)
async for hits in self._repeated_query(query_fn, qterms, verbose=verbose):
if return_raw:
out.append(hits) # hits is the raw response text
else:
out.extend(hits)
for hit in hits:
if hit.get("notfound", False):
li_missing.append(hit["query"])
else:
li_query.append(hit["query"])
if verbose:
logger.info("Finished.")
if return_raw:
if len(out) == 1:
out = out[0]
return out
# check dup hits
if li_query:
li_dup = [(query, cnt) for query, cnt in list_itemcnt(li_query) if cnt > 1]
del li_query
li_dup_df = None
li_missing_df = None
if dataframe:
assert pandas is not None # noqa: S101
out = await self._dataframe(out, dataframe, df_index=df_index)
li_dup_df = pandas.DataFrame.from_records(li_dup, columns=["query", "duplicate hits"])
li_missing_df = pandas.DataFrame(li_missing, columns=["query"])
if verbose:
if li_dup:
logger.warning("{0} input query terms found dup hits:".format(len(li_dup)) + "\t" + str(li_dup)[:100])
if li_missing:
logger.warning(
"{0} input query terms found no hit:".format(len(li_missing)) + "\t" + str(li_missing)[:100]
)
if returnall:
if dataframe:
return {"out": out, "dup": li_dup_df, "missing": li_missing_df}
else:
return {"out": out, "dup": li_dup, "missing": li_missing}
else:
if verbose and (li_dup or li_missing):
logger.info('Pass "returnall=True" to return complete lists of duplicate or missing query terms.')
return out
[docs]
def get_async_client(
biothing_type: Optional[str] = None,
instance: bool = True,
*args: Any,
**kwargs: Any,
) -> Union[AsyncBiothingClient, Type[AsyncBiothingClient]]:
"""
Function to return a new python asynchronous client for a Biothings API service.
:param biothing_type: the type of biothing client, currently one of: 'gene', 'variant', 'taxon', 'chem', 'disease', 'geneset'
:param instance: if True, return an instance of the derived client,
if False, return the class of the derived client
All other args/kwargs are passed to the derived client instantiation (if applicable)
"""
if not biothing_type:
url = kwargs.get("url", False)
if not url:
raise RuntimeError("No biothings_type or url specified.")
try:
url += "metadata" if url.endswith("/") else "/metadata"
res = httpx.get(url)
dic = res.json()
biothing_type = dic.get("biothing_type")
if isinstance(biothing_type, list):
if len(biothing_type) == 1:
# if a list with only one item, use that item
biothing_type = biothing_type[0]
else:
raise RuntimeError("Biothing_type in metadata url is not unique.")
if not isinstance(biothing_type, str):
raise RuntimeError("Biothing_type in metadata url is not a valid string.")
except httpx.RequestError as request_error:
raise RuntimeError("Cannot access metadata url to determine biothing_type.") from request_error
else:
biothing_type = biothing_type.lower()
if biothing_type not in ASYNC_CLIENT_SETTINGS and not kwargs.get("url", False):
raise TypeError(
f"No client named '{biothing_type}', currently available clients are: {list(ASYNC_CLIENT_SETTINGS.keys())}"
)
_settings = (
ASYNC_CLIENT_SETTINGS[biothing_type]
if biothing_type in ASYNC_CLIENT_SETTINGS
else generate_async_settings(biothing_type, cast(Optional[str], kwargs.get("url")))
)
_class = type(
_settings["class_name"],
tuple([_settings["base_class"]] + _settings["mixins"]),
dict(_settings["class_kwargs"]),
)
for src_attr, target_attr in _settings["attr_aliases"].items():
if getattr(_class, src_attr, False):
setattr(
_class,
target_attr,
copy_func(getattr(_class, src_attr), name=target_attr),
)
for _name, _docstring in _settings["class_kwargs"].get("_docstring_obj", {}).items():
_func = getattr(_class, _name, None)
if _func:
try:
_func.__doc__ = _docstring
except AttributeError:
_func.__func__.__doc__ = _docstring
_client = cast(
Union[AsyncBiothingClient, Type[AsyncBiothingClient]],
_class(*args, **kwargs) if instance else _class,
)
return _client
# ***********************************************
# * Client settings
# *
# * This object contains the client-specific settings necessary to
# * instantiate a new biothings client. The currently supported
# * clients are the keys of this object.
# *
# * class - the client Class name
# * class_kwargs - keyword arguments passed to Class on creation
# * function_aliases - client specific function aliases in Class
# * ancestors - a list of classes that Class inherits from
# ***********************************************
ASYNC_CLIENT_SETTINGS: Dict[str, ClientSettings] = {
"gene": {
"class_name": "AsyncMyGeneInfo",
"class_kwargs": MYGENE_KWARGS,
"attr_aliases": MYGENE_ALIASES,
"base_class": AsyncBiothingClient,
"mixins": [MyGeneClientMixin],
},
"variant": {
"class_name": "AsyncMyVariantInfo",
"class_kwargs": MYVARIANT_KWARGS,
"attr_aliases": MYVARIANT_ALIASES,
"base_class": AsyncBiothingClient,
"mixins": [MyVariantClientMixin],
},
"taxon": {
"class_name": "AsyncMyTaxonInfo",
"class_kwargs": MYTAXON_KWARGS,
"attr_aliases": MYTAXON_ALIASES,
"base_class": AsyncBiothingClient,
"mixins": [],
},
"drug": {
"class_name": "AsyncMyChemInfo",
"class_kwargs": MYCHEM_KWARGS,
"attr_aliases": MYCHEM_ALIASES,
"base_class": AsyncBiothingClient,
"mixins": [],
},
"chem": {
"class_name": "AsyncMyChemInfo",
"class_kwargs": MYCHEM_KWARGS,
"attr_aliases": MYCHEM_ALIASES,
"base_class": AsyncBiothingClient,
"mixins": [],
},
"compound": {
"class_name": "AsyncMyChemInfo",
"class_kwargs": MYCHEM_KWARGS,
"attr_aliases": MYCHEM_ALIASES,
"base_class": AsyncBiothingClient,
"mixins": [],
},
"disease": {
"class_name": "AsyncMyDiseaseInfo",
"class_kwargs": MYDISEASE_KWARGS,
"attr_aliases": MYDISEASE_ALIASES,
"base_class": AsyncBiothingClient,
"mixins": [],
},
"geneset": {
"class_name": "AsyncMyGenesetInfo",
"class_kwargs": MYGENESET_KWARGS,
"attr_aliases": MYGENESET_ALIASES,
"base_class": AsyncBiothingClient,
"mixins": [],
},
}
def generate_async_settings(biothing_type: str, url: Optional[str]) -> ClientSettings:
"""
Tries to generate a settings dictionary for a client that isn't explicitly listed in
{CLIENT_SETTTINGS, ASYNC_CLIENT_SETTINGS}
:param biothing_type: The biothing type to target when generating settings
:param url: The web url specified in the settings via `default_url`
:return: Returns a dictionary mapping of the client settings
:rtype: dict
"""
def _pluralize(s: str, optional: bool = True) -> str:
_append = "({})" if optional else "{}"
return s + _append.format("es") if s.endswith("s") else s + _append.format("s")
_kwargs: ClientClassKwargs = copy(COMMON_KWARGS)
_aliases = copy(COMMON_ALIASES)
_kwargs.update(
{
"_default_url": url or "",
"_annotation_endpoint": "/" + biothing_type.lower() + "/",
"_optionally_plural_object_type": _pluralize(biothing_type.lower()),
"_default_cache_file": "my" + biothing_type.lower() + "_cache",
}
)
_aliases.update(
{
"_getannotation": "get" + biothing_type.lower(),
"_getannotations": "get" + _pluralize(biothing_type.lower(), optional=False),
}
)
return {
"class_name": "Async" + "My" + biothing_type.title() + "Info",
"class_kwargs": _kwargs,
"mixins": [],
"attr_aliases": _aliases,
"base_class": AsyncBiothingClient,
}