import asyncio
import datetime
import functools
import math
import operator
import time
import typing as t
import warnings
import anndata
from cellarium.cas import _io, data_preparation, exceptions, service, settings
# Default number of cells per chunk used by the annotate_* methods.
CHUNK_SIZE_ANNOTATE_DEFAULT = 1000
# Default number of cells per chunk used by the search_* methods
# (search_anndata rejects chunk sizes greater than 500).
CHUNK_SIZE_SEARCH_DEFAULT = 500
[docs]
class CASClient:
    """
    Service that is designed to communicate with the Cellarium Cloud Backend.

    On construction the client validates the API token and retrieves the application info, the list of
    available models and the feature schemas from the backend. Input data is split into chunks that are
    sent to the backend concurrently; a chunk request is retried when the backend answers with a 5XX HTTP
    error or a client error occurs.

    :param api_token: API token issued by the Cellarium team
    :param num_attempts_per_chunk: Number of attempts the client should make to annotate each chunk. |br|
        `Default:` ``3``
    """
def _print_models(self, models):
s = "Allowed model list in Cellarium CAS:\n"
for model in models:
model_name = model["model_name"]
model_schema = model["schema_name"]
embedding_dimension = model["embedding_dimension"]
if model["is_default_model"]:
model_name += " (default)"
s += f" - {model_name}\n Schema: {model_schema}\n Embedding dimension: {embedding_dimension}\n"
self._print(s)
def __init__(self, api_token: str, num_attempts_per_chunk: int = settings.NUM_ATTEMPTS_PER_CHUNK_DEFAULT) -> None:
    """
    Connect to the Cellarium Cloud backend, validate the API token and cache backend metadata.

    :param api_token: API token issued by the Cellarium team
    :param num_attempts_per_chunk: How many times each chunk request is attempted
    """
    api_service = service.CASAPIService(api_token=api_token)
    self.cas_api_service = api_service
    self._print("Connecting to the Cellarium Cloud backend...")
    api_service.validate_token()

    # General backend information
    application_info = api_service.get_application_info()
    models = api_service.get_model_list()
    self.model_objects_list = models
    self.allowed_models_list = [model["model_name"] for model in models]
    self._model_name_obj_map = {model["model_name"]: model for model in models}
    default_names = [model["model_name"] for model in models if model["is_default_model"]]
    self.default_model_name = default_names[0]

    self.feature_schemas = api_service.get_feature_schemas()
    self._feature_schemas_cache = {}
    self.num_attempts_per_chunk = num_attempts_per_chunk

    self._print(f"Authenticated in Cellarium Cloud v. {application_info['application_version']}")
    self._print_models(models)
@staticmethod
def _get_number_of_chunks(adata, chunk_size):
return math.ceil(len(adata) / chunk_size)
@staticmethod
def _get_timestamp() -> str:
return datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3]
def _print(self, str_to_print: str) -> None:
print(f"* [{self._get_timestamp()}] {str_to_print}")
def __get_async_sharded_request_callback_task(
    self,
    results: t.List,
    chunk_index: int,
    chunk_start_i: int,
    chunk_end_i: int,
    semaphore: asyncio.Semaphore,
    service_request_callback: t.Callable,
) -> t.Callable:
    """
    Build a coroutine function that submits one chunk to the backend and stores the response in ``results``.

    Error handling inside the returned coroutine:

    * ``exceptions.HTTPError5XX`` or ``exceptions.HTTPClientError``: print the error and, if attempts remain,
      wait with exponential backoff (starting at ``settings.START_RETRY_DELAY``, doubled after each failure
      and capped at ``settings.MAX_RETRY_DELAY``) before resubmitting. After the last allowed attempt a
      failure message is printed instead, without waiting.
    * ``exceptions.HTTPError401``: print a message to check the token and stop.
    * Any other exception: print its class name and message and stop.

    If the chunk never succeeds, ``results[chunk_index]`` keeps whatever value it had before.

    :param results: Results list whose ``chunk_index`` slot receives the response from the server
    :param chunk_index: Zero-based index of the chunk (printed 1-based, e.g. Chunk 1, Chunk 2)
    :param chunk_start_i: Index pointing to the main adata file start position of the current chunk
    :param chunk_end_i: Index pointing to the main adata file end position of the current chunk
    :param semaphore: Semaphore object to limit the number of concurrent requests at a time
    :param service_request_callback: Callback function to execute (should be one of the async methods of the
        :class:`CASAPIService`)
    :return: A coroutine function; its keyword arguments are forwarded to ``service_request_callback``
    """
    chunk_label = f"#{chunk_index + 1:2.0f} ({chunk_start_i:5.0f}, {chunk_end_i:5.0f})"

    async def sharded_request_task(**callback_kwargs):
        # The semaphore bounds how many chunks are in flight; it stays held during backoff sleeps.
        async with semaphore:
            max_attempts = self.num_attempts_per_chunk
            retry_delay = settings.START_RETRY_DELAY
            for attempt in range(1, max_attempts + 1):
                try:
                    results[chunk_index] = await service_request_callback(**callback_kwargs)
                except (exceptions.HTTPError5XX, exceptions.HTTPClientError) as e:
                    self._print(str(e))
                    if attempt >= max_attempts:
                        # No attempts left: don't sleep or announce a resubmission that will never happen.
                        self._print(f"Failed to process cell chunk {chunk_label} after {max_attempts} attempts")
                        break
                    self._print(f"Resubmitting chunk {chunk_label} to CAS ...")
                    await asyncio.sleep(retry_delay)
                    retry_delay = min(retry_delay * 2, settings.MAX_RETRY_DELAY)
                except exceptions.HTTPError401:
                    self._print("Unauthorized token. Please check your API token or request a new one.")
                    break
                except Exception as e:
                    self._print(f"Unexpected error: {e.__class__.__name__}; Message: {str(e)}")
                    break
                else:
                    self._print(f"Received the result for cell chunk {chunk_label} ...")
                    break

    return sharded_request_task
def __async_sharded_request(
    self,
    adata: anndata.AnnData,
    chunk_size: int,
    request_callback: t.Callable,
    request_callback_kwargs: t.Dict[str, t.Any],
) -> t.List[t.Dict[str, t.Any]]:
    """
    Split ``adata`` into row chunks, send them to the backend concurrently and concatenate the responses.

    At most ``settings.MAX_NUM_REQUESTS_AT_A_TIME`` chunk requests run at the same time. A chunk that could
    not be processed contributes no items to the returned list.

    :param adata: Input data; sliced row-wise and serialized with ``_io.adata_to_bytes``
    :param chunk_size: Number of cells per chunk (the last chunk may be smaller); must be positive
    :param request_callback: Async service method called once per chunk with ``adata_bytes`` plus
        ``request_callback_kwargs``
    :param request_callback_kwargs: Extra keyword arguments forwarded to ``request_callback``
    :return: Flat list of response items from all chunks, in chunk order
    :raises ValueError: If ``chunk_size`` is not positive
    """
    if chunk_size <= 0:
        raise ValueError(f"Chunk size must be a positive number, got {chunk_size}.")

    async def sharded_request():
        # Created inside the coroutine so it is used only on the event loop started by asyncio.run below.
        semaphore = asyncio.Semaphore(settings.MAX_NUM_REQUESTS_AT_A_TIME)
        number_of_chunks = self._get_number_of_chunks(adata, chunk_size=chunk_size)
        # One slot per chunk, so each task writes its own response regardless of completion order.
        results = [[] for _ in range(number_of_chunks)]
        tasks = []
        for chunk_index in range(number_of_chunks):
            chunk_start_i = chunk_index * chunk_size
            chunk = adata[chunk_start_i : chunk_start_i + chunk_size, :]
            chunk_end_i = chunk_start_i + len(chunk)
            self._print(
                f"Submitting cell chunk #{chunk_index + 1:2.0f} ({chunk_start_i:5.0f}, {chunk_end_i:5.0f}) "
                f"to CAS ..."
            )
            chunk_bytes = _io.adata_to_bytes(adata=chunk)
            async_sharded_request_task = self.__get_async_sharded_request_callback_task(
                results=results,
                chunk_index=chunk_index,
                chunk_start_i=chunk_start_i,
                chunk_end_i=chunk_end_i,
                semaphore=semaphore,
                service_request_callback=request_callback,
            )
            tasks.append(async_sharded_request_task(adata_bytes=chunk_bytes, **request_callback_kwargs))
        # asyncio.gather accepts coroutines and completes immediately when there are none, whereas
        # asyncio.wait rejects bare coroutines (Python 3.11+) and raises ValueError for an empty list.
        await asyncio.gather(*tasks)
        return functools.reduce(operator.iconcat, results, [])

    return asyncio.run(sharded_request())
def __postprocess_sharded_response(
    self, query_response: t.List[t.Dict[str, t.Any]], adata: anndata.AnnData, query_item_list_key: str
) -> t.List[t.Dict[str, t.Any]]:
    """
    Reorder backend results so they line up one-to-one with the cells in ``adata.obs.index``.

    Cells that have no entry in ``query_response`` get a placeholder item with an empty list under
    ``query_item_list_key``; their count is printed.

    :param query_response: Items returned by the backend, each identified by its ``query_cell_id``
    :param adata: :class:`anndata.AnnData` instance whose cell order the output follows
    :param query_item_list_key: Key holding the per-cell list (e.g. annotation matches, search result items)
    :return: One dictionary per cell of ``adata``, in input order
    """
    response_by_cell_id = {item["query_cell_id"]: item for item in query_response}
    ordered = []
    missing = 0
    for cell_id in adata.obs.index:
        if cell_id in response_by_cell_id:
            ordered.append(response_by_cell_id[cell_id])
        else:
            ordered.append({"query_cell_id": cell_id, query_item_list_key: []})
            missing += 1
    if missing:
        self._print(f"{missing} cells were not processed by CAS")
    return ordered
def __postprocess_annotations(
    self, query_response: t.List[t.Dict[str, t.Any]], adata: anndata.AnnData
) -> t.List[t.Dict[str, t.Any]]:
    """
    Align annotation results with the cell order of ``adata``; per-cell lists live under ``"matches"``.
    """
    list_key = "matches"
    return self.__postprocess_sharded_response(query_response, adata, query_item_list_key=list_key)
def __postprocess_nearest_neighbor_search_response(
    self, query_response: t.List[t.Dict[str, t.Any]], adata: anndata.AnnData
) -> t.List[t.Dict[str, t.Any]]:
    """
    Align nearest neighbor search results with the cell order of ``adata``; per-cell lists live under
    ``"neighbors"``.
    """
    list_key = "neighbors"
    return self.__postprocess_sharded_response(query_response, adata, query_item_list_key=list_key)
@staticmethod
def __postprocess_query_cells_by_ids_response(
query_response: t.List[t.Dict[str, t.Any]]
) -> t.List[t.Dict[str, t.Any]]:
"""
Postprocess query cells by ids response by removing None values from cell metadata items in response
:param query_response: List of dictionaries with annotations for each of the cells from input adata
Returns:
"""
processed_response = []
for cell_metadata_item in query_response:
cell_metadata_item_processed = {}
for feature_name, value in cell_metadata_item.items():
if value is None:
continue
cell_metadata_item_processed[feature_name] = value
processed_response.append(cell_metadata_item_processed)
return processed_response
def __prepare_input_for_sharded_request(
    self,
    adata: anndata.AnnData,
    cas_model_name: str = "default",
    count_matrix_name: str = "X",
    feature_ids_column_name: str = "index",
    feature_names_column_name: t.Optional[str] = None,
) -> anndata.AnnData:
    """
    Prepare input data for a sharded request.

    Validates the model name, resolves the ``"default"`` keyword to the backend's default model, prints a short
    summary, then validates and sanitizes ``adata`` via :meth:`validate_and_sanitize_input_data`.

    :param adata: :class:`anndata.AnnData` instance to annotate
    :param cas_model_name: Model name to use for annotation. |br|
        `Allowed Values:` Model name from the :attr:`allowed_models_list` list or ``"default"``
        keyword, which refers to the default selected model in the Cellarium backend. |br|
        `Default:` ``"default"``
    :param count_matrix_name: Where to obtain a feature expression count matrix from. |br|
        `Allowed Values:` Choice of either ``"X"`` or ``"raw.X"`` in order to use ``adata.X`` or ``adata.raw.X``
        |br|
        `Default:` ``"X"``
    :param feature_ids_column_name: Column name where to obtain Ensembl feature ids. |br|
        `Allowed Values:` A value from ``adata.var.columns`` or ``"index"`` keyword, which refers to index
        column. |br|
        `Default:` ``"index"``
    :param feature_names_column_name: Column name where to obtain feature names (symbols).
        feature names wouldn't be mapped if value is ``None`` |br|
        `Allowed Values:` A value from ``adata.var.columns`` or ``"index"`` keyword, which refers to index
        column. |br|
        `Default:` ``None``
    :return: The data returned by :meth:`validate_and_sanitize_input_data`, which callers use in place of
        ``adata``
    :raises ValueError: If ``cas_model_name`` is neither an allowed model name nor ``"default"``
    """
    # Shared validator keeps the error message identical across all entry points.
    self.validate_model_name(cas_model_name)
    if cas_model_name == "default":
        cas_model_name = self.default_model_name
    self._print(f"Cellarium CAS (Model ID: {cas_model_name})")
    self._print(f"Total number of input cells: {len(adata)}")
    return self.validate_and_sanitize_input_data(
        adata=adata,
        cas_model_name=cas_model_name,
        count_matrix_name=count_matrix_name,
        feature_ids_column_name=feature_ids_column_name,
        feature_names_column_name=feature_names_column_name,
    )
[docs]
def annotate_anndata(
    self,
    adata: "anndata.AnnData",
    chunk_size: int = CHUNK_SIZE_ANNOTATE_DEFAULT,
    cas_model_name: str = "default",
    count_matrix_name: str = "X",
    feature_ids_column_name: str = "index",
    feature_names_column_name: t.Optional[str] = None,
    include_dev_metadata: bool = False,
) -> t.List[t.Dict[str, t.Any]]:
    """
    Send an instance of :class:`anndata.AnnData` to the Cellarium Cloud backend for annotations. The function
    splits the ``adata`` into smaller chunks and asynchronously sends them to the backend API service. Each chunk is
    of equal size, except for the last one, which may be smaller. The backend processes these chunks in parallel.

    :param adata: :class:`anndata.AnnData` instance to annotate
    :param chunk_size: Number of cells per chunk
    :param cas_model_name: Model name to use for annotation. |br|
        `Allowed Values:` Model name from the :attr:`allowed_models_list` list or ``"default"``
        keyword, which refers to the default selected model in the Cellarium backend. |br|
        `Default:` ``"default"``
    :param count_matrix_name: Where to obtain a feature expression count matrix from. |br|
        `Allowed Values:` Choice of either ``"X"`` or ``"raw.X"`` in order to use ``adata.X`` or ``adata.raw.X``
        |br|
        `Default:` ``"X"``
    :param feature_ids_column_name: Column name where to obtain Ensembl feature ids. |br|
        `Allowed Values:` A value from ``adata.var.columns`` or ``"index"`` keyword, which refers to index
        column. |br|
        `Default:` ``"index"``
    :param feature_names_column_name: Column name where to obtain feature names (symbols).
        feature names wouldn't be mapped if value is ``None`` |br|
        `Allowed Values:` A value from ``adata.var.columns`` or ``"index"`` keyword, which refers to index
        column. |br|
        `Default:` ``None``
    :param include_dev_metadata: Boolean indicating whether to include a breakdown of the number of cells
        by dataset
    :return: One dictionary per cell of the (sanitized) input, in input order. Cells CAS did not process get
        an empty ``matches`` list.
    :raises ValueError: If ``cas_model_name`` is neither an allowed model name nor ``"default"``
    """
    cas_model_name = self.default_model_name if cas_model_name == "default" else cas_model_name
    # perf_counter is monotonic, so the reported duration is unaffected by system clock adjustments.
    start = time.perf_counter()
    adata = self.__prepare_input_for_sharded_request(
        adata=adata,
        cas_model_name=cas_model_name,
        count_matrix_name=count_matrix_name,
        feature_ids_column_name=feature_ids_column_name,
        feature_names_column_name=feature_names_column_name,
    )
    results = self.__async_sharded_request(
        adata=adata,
        chunk_size=chunk_size,
        request_callback=self.cas_api_service.async_annotate_anndata_chunk,
        request_callback_kwargs={
            "model_name": cas_model_name,
            "include_dev_metadata": include_dev_metadata,
        },
    )
    result = self.__postprocess_annotations(results, adata)
    self._print(f"Total wall clock time: {time.perf_counter() - start:10.4f} seconds")
    self._print("Finished!")
    return result
[docs]
def annotate_anndata_file(
    self,
    filepath: str,
    chunk_size=CHUNK_SIZE_ANNOTATE_DEFAULT,
    cas_model_name: str = "default",
    count_matrix_name: str = "X",
    feature_ids_column_name: str = "index",
    feature_names_column_name: t.Optional[str] = None,
    include_dev_metadata: bool = False,
) -> t.List[t.Dict[str, t.Any]]:
    """
    Load an 'h5ad' file as :class:`anndata.AnnData` and annotate it with :meth:`annotate_anndata`.

    :param filepath: Filepath of the local :class:`anndata.AnnData` matrix
    :param chunk_size: Number of cells per chunk
    :param cas_model_name: Model name to use for annotation. |br|
        `Allowed Values:` Model name from the :attr:`allowed_models_list` list or ``"default"``
        keyword, which refers to the default selected model in the Cellarium backend. |br|
        `Default:` ``"default"``
    :param count_matrix_name: Where to obtain a feature expression count matrix from. |br|
        `Allowed Values:` Choice of either ``"X"`` or ``"raw.X"`` in order to use ``adata.X`` or ``adata.raw.X``
        |br|
        `Default:` ``"X"``
    :param feature_ids_column_name: Column name where to obtain Ensembl feature ids. |br|
        `Allowed Values:` A value from ``adata.var.columns`` or ``"index"`` keyword, which refers to index
        column. |br|
        `Default:` ``"index"``
    :param feature_names_column_name: Column name where to obtain feature names (symbols).
        feature names wouldn't be mapped if value is ``None`` |br|
        `Allowed Values:` A value from ``adata.var.columns`` or ``"index"`` keyword, which refers to index
        column. |br|
        `Default:` ``None``
    :param include_dev_metadata: Boolean indicating whether to include a breakdown of the number of cells
        per dataset
    :return: The result of :meth:`annotate_anndata` for the loaded data
    """
    # Warnings raised while reading are suppressed; catch_warnings restores the filters afterwards.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        loaded = anndata.read_h5ad(filename=filepath)
    forwarded_options = {
        "chunk_size": chunk_size,
        "cas_model_name": cas_model_name,
        "count_matrix_name": count_matrix_name,
        "feature_ids_column_name": feature_ids_column_name,
        "feature_names_column_name": feature_names_column_name,
        "include_dev_metadata": include_dev_metadata,
    }
    return self.annotate_anndata(adata=loaded, **forwarded_options)
[docs]
def annotate_10x_h5_file(
    self,
    filepath: str,
    chunk_size: int = CHUNK_SIZE_ANNOTATE_DEFAULT,
    cas_model_name: str = "default",
    count_matrix_name: str = "X",
    feature_ids_column_name: str = "index",
    feature_names_column_name: t.Optional[str] = None,
    include_dev_metadata: bool = False,
) -> t.List[t.Dict[str, t.Any]]:
    """
    Load a 10x 'h5' matrix with ``_io.read_10x_h5`` and annotate it with :meth:`annotate_anndata`.

    :param filepath: Filepath of the local 'h5' matrix
    :param chunk_size: Number of cells per chunk
    :param cas_model_name: Model name to use for annotation. |br|
        `Allowed Values:` Model name from the :attr:`allowed_models_list` list or ``"default"``
        keyword, which refers to the default selected model in the Cellarium backend. |br|
        `Default:` ``"default"``
    :param count_matrix_name: Where to obtain a feature expression count matrix from. |br|
        `Allowed Values:` Choice of either ``"X"`` or ``"raw.X"`` in order to use ``adata.X`` or ``adata.raw.X``
        |br|
        `Default:` ``"X"``
    :param feature_ids_column_name: Column name where to obtain Ensembl feature ids. |br|
        `Allowed Values:` A value from ``adata.var.columns`` or ``"index"`` keyword, which refers to index
        column. |br|
        `Default:` ``"index"``
    :param feature_names_column_name: Column name where to obtain feature names (symbols).
        feature names wouldn't be mapped if value is ``None`` |br|
        `Allowed Values:` A value from ``adata.var.columns`` or ``"index"`` keyword, which refers to index
        column. |br|
        `Default:` ``None``
    :param include_dev_metadata: Boolean indicating whether to include a breakdown of the number of cells by dataset
    :return: The result of :meth:`annotate_anndata` for the loaded matrix
    """
    forwarded_options = {
        "chunk_size": chunk_size,
        "cas_model_name": cas_model_name,
        "count_matrix_name": count_matrix_name,
        "feature_ids_column_name": feature_ids_column_name,
        "feature_names_column_name": feature_names_column_name,
        "include_dev_metadata": include_dev_metadata,
    }
    return self.annotate_anndata(adata=_io.read_10x_h5(filepath), **forwarded_options)
[docs]
def search_anndata(
    self,
    adata: anndata.AnnData,
    chunk_size=CHUNK_SIZE_SEARCH_DEFAULT,
    cas_model_name: str = "default",
    count_matrix_name: str = "X",
    feature_ids_column_name: str = "index",
    feature_names_column_name: t.Optional[str] = None,
) -> t.List[t.Dict[str, t.Any]]:
    """
    Send an instance of :class:`anndata.AnnData` to the Cellarium Cloud backend for nearest neighbor search. The
    function splits the ``adata`` into smaller chunks and asynchronously sends them to the backend API service.
    Each chunk is of equal size, except for the last one, which may be smaller. The backend processes
    these chunks in parallel.

    :param adata: :class:`anndata.AnnData` instance to search with
    :param chunk_size: Number of cells per chunk; values greater than 500 are rejected
    :param cas_model_name: Model name to use for the search. |br|
        `Allowed Values:` Model name from the :attr:`allowed_models_list` list or ``"default"``
        keyword, which refers to the default selected model in the Cellarium backend. |br|
        `Default:` ``"default"``
    :param count_matrix_name: Where to obtain a feature expression count matrix from. |br|
        `Allowed Values:` Choice of either ``"X"`` or ``"raw.X"`` in order to use ``adata.X`` or ``adata.raw.X``
        |br|
        `Default:` ``"X"``
    :param feature_ids_column_name: Column name where to obtain Ensembl feature ids. |br|
        `Allowed Values:` A value from ``adata.var.columns`` or ``"index"`` keyword, which refers to index
        column. |br|
        `Default:` ``"index"``
    :param feature_names_column_name: Column name where to obtain feature names (symbols).
        feature names wouldn't be mapped if value is ``None`` |br|
        `Allowed Values:` A value from ``adata.var.columns`` or ``"index"`` keyword, which refers to index
        column. |br|
        `Default:` ``None``
    :return: One dictionary of nearest neighbor search results per cell of the (sanitized) input, in input
        order. Cells CAS did not process get an empty ``neighbors`` list.
    :raises ValueError: If ``chunk_size`` is greater than 500, or if ``cas_model_name`` is neither an allowed
        model name nor ``"default"``
    """
    # Larger search chunks are not supported by this client yet.
    if chunk_size > 500:
        raise ValueError("Chunk size greater than 500 not supported yet.")
    cas_model_name = self.default_model_name if cas_model_name == "default" else cas_model_name
    # perf_counter is monotonic, so the reported duration is unaffected by system clock adjustments.
    start = time.perf_counter()
    adata = self.__prepare_input_for_sharded_request(
        adata=adata,
        cas_model_name=cas_model_name,
        count_matrix_name=count_matrix_name,
        feature_ids_column_name=feature_ids_column_name,
        feature_names_column_name=feature_names_column_name,
    )
    results = self.__async_sharded_request(
        adata=adata,
        chunk_size=chunk_size,
        request_callback=self.cas_api_service.async_nearest_neighbor_search,
        request_callback_kwargs={"model_name": cas_model_name},
    )
    result = self.__postprocess_nearest_neighbor_search_response(results, adata)
    self._print(f"Total wall clock time: {time.perf_counter() - start:10.4f} seconds")
    self._print("Finished!")
    return result
[docs]
def search_10x_h5_file(
    self,
    filepath: str,
    chunk_size: int = CHUNK_SIZE_SEARCH_DEFAULT,
    cas_model_name: str = "default",
    count_matrix_name: str = "X",
    feature_ids_column_name: str = "index",
    feature_names_column_name: t.Optional[str] = None,
) -> t.List[t.Dict[str, t.Any]]:
    """
    Load a 10x 'h5' matrix with ``_io.read_10x_h5`` and run :meth:`search_anndata` on it.

    :param filepath: Filepath of the local 'h5' matrix
    :param chunk_size: Number of cells per chunk
    :param cas_model_name: Model name to use for the search. |br|
        `Allowed Values:` Model name from the :attr:`allowed_models_list` list or ``"default"``
        keyword, which refers to the default selected model in the Cellarium backend. |br|
        `Default:` ``"default"``
    :param count_matrix_name: Where to obtain a feature expression count matrix from. |br|
        `Allowed Values:` Choice of either ``"X"`` or ``"raw.X"`` in order to use ``adata.X`` or ``adata.raw.X``
        |br|
        `Default:` ``"X"``
    :param feature_ids_column_name: Column name where to obtain Ensembl feature ids. |br|
        `Allowed Values:` A value from ``adata.var.columns`` or ``"index"`` keyword, which refers to index
        column. |br|
        `Default:` ``"index"``
    :param feature_names_column_name: Column name where to obtain feature names (symbols).
        feature names wouldn't be mapped if value is ``None`` |br|
        `Allowed Values:` A value from ``adata.var.columns`` or ``"index"`` keyword, which refers to index
        column. |br|
        `Default:` ``None``
    :return: The result of :meth:`search_anndata` for the loaded matrix
    """
    forwarded_options = {
        "chunk_size": chunk_size,
        "cas_model_name": cas_model_name,
        "count_matrix_name": count_matrix_name,
        "feature_ids_column_name": feature_ids_column_name,
        "feature_names_column_name": feature_names_column_name,
    }
    return self.search_anndata(adata=_io.read_10x_h5(filepath), **forwarded_options)
[docs]
def query_cells_by_ids(
    self,
    cell_ids: t.List[int],
    model_name: str = "default",
    metadata_feature_names: t.Optional[t.List[str]] = None,
) -> t.List[t.Dict[str, t.Any]]:
    """
    Query cell metadata from Cellarium CAS by cell ids.

    :param cell_ids: List of cell ids to query
    :param model_name: Model name whose cells are queried. |br|
        `Allowed Values:` Model name from the :attr:`allowed_models_list` list or ``"default"``
        keyword, which refers to the default selected model in the Cellarium backend. |br|
        `Default:` ``"default"``
    :param metadata_feature_names: List of metadata feature names to include in the response. |br|
        `Default:` ``None``
    :return: List of cell metadata dictionaries; fields whose value is ``None`` are omitted
    :raises ValueError: If ``model_name`` is neither an allowed model name nor ``"default"``
    """
    self.validate_model_name(model_name)
    # The backend expects a concrete model name, so resolve the "default" keyword locally.
    if model_name == "default":
        model_name = self.default_model_name
    results = self.cas_api_service.query_cells_by_ids(
        cell_ids=cell_ids,
        model_name=model_name,
        metadata_feature_names=metadata_feature_names,
    )
    return self.__postprocess_query_cells_by_ids_response(query_response=results)
[docs]
def validate_model_name(self, model_name: str) -> None:
    """
    Ensure ``model_name`` can be used with this client.

    :param model_name: Model name to check; accepted values are the entries of :attr:`allowed_models_list`
        and the ``"default"`` keyword
    :raises ValueError: If ``model_name`` is not an accepted value
    """
    if model_name in self.allowed_models_list or model_name == "default":
        return
    raise ValueError(
        f"Model name '{model_name}' is not in the list of allowed models. "
        f"Please use one of the following: {self.allowed_models_list} or 'default'"
    )