43 changes: 34 additions & 9 deletions main/como/combine_distributions.py
@@ -18,7 +18,8 @@
_OutputCombinedSourceFilepath,
_SourceWeights,
)
from como.utils import LogLevel, log_and_raise_error, _num_columns, get_missing_gene_data
from como.pipelines.identifier import convert
from como.utils import LogLevel, get_missing_gene_data, log_and_raise_error, num_columns


def _combine_z_distribution_for_batch(
@@ -55,9 +56,13 @@ def _combine_z_distribution_for_batch(
f"source: '{source.value}', "
f"context: '{context_name}'"
)
if _num_columns(matrix) < 2:
logger.trace(f"A single sample exists for batch '{batch.batch_num}'. Returning as-is because no additional combining can be done")
return matrix
if num_columns(matrix) < 2:
logger.trace(
f"A single sample exists for batch '{batch.batch_num}'. Returning as-is because no additional combining can be done"
)
with_batch_num = matrix.copy()
with_batch_num.columns = [batch.batch_num]
return with_batch_num

values = matrix.values
weighted_matrix = np.clip(
@@ -116,7 +121,7 @@ def _combine_z_distribution_for_source(
A pandas dataframe of the weighted z-distributions
"""
# If only one batch column exists, return as-is (rename like R pass-through).
if _num_columns(merged_source_data) <= 1:
if num_columns(merged_source_data) <= 1:
logger.warning("A single batch exists for this source; returning matrix as-is.")
out_df = merged_source_data.copy()
out_df.columns = ["combine_z"]
@@ -203,8 +208,10 @@ def _combine_z_distribution_for_context(
z_matrices.append(matrix)

z_matrix = pd.concat(z_matrices, axis="columns", join="outer", ignore_index=False)
if _num_columns(z_matrix) <= 1:
logger.trace(f"Only 1 source exists for '{context}', returning dataframe as-is becuase no data exists to combine")
if num_columns(z_matrix) <= 1:
logger.trace(
f"Only 1 source exists for '{context}', returning dataframe as-is becuase no data exists to combine"
)
z_matrix.columns = ["combine_z"]
return z_matrix

@@ -275,8 +282,26 @@ async def _begin_combining_distributions(
batch_results: list[pd.DataFrame] = []
for batch in batch_names[source.value]:
batch: _BatchEntry
matrix_subset = cast(pd.DataFrame, matrix[[GeneIdentifier.ensembl_gene_id.value, *batch.sample_names]])
matrix_subset = matrix_subset.set_index(keys=[GeneIdentifier.ensembl_gene_id.value], drop=True)
if isinstance(matrix, pd.DataFrame):
matrix_subset = matrix[[GeneIdentifier.entrez_gene_id.value, *batch.sample_names]]
matrix_subset = matrix_subset.set_index(keys=[GeneIdentifier.entrez_gene_id.value], drop=True)
matrix_subset = matrix_subset.drop(columns=["gene_symbol", "ensembl_gene_id"], errors="ignore")
elif isinstance(matrix, sc.AnnData):
conversion = convert(ids=matrix.var_names.tolist(), taxon=taxon)
conversion.reset_index(drop=False, inplace=True)
matrix: pd.DataFrame = matrix.to_df().T
matrix.reset_index(inplace=True, drop=False, names=["gene_symbol"])
matrix: pd.DataFrame = matrix.merge(
conversion, left_on="gene_symbol", right_on="gene_symbol", how="left"
)
matrix_subset: pd.DataFrame = matrix[[GeneIdentifier.entrez_gene_id.value, *batch.sample_names]]
matrix_subset: pd.DataFrame = matrix_subset.set_index(
keys=[GeneIdentifier.entrez_gene_id.value], drop=True
)
else:
raise TypeError(
f"Unexpected matrix type for source '{source.value}'; expected 'pandas.DataFrame' or 'scanpy.AnnData': {type(matrix)}"
)

output_fp = output_filepaths[source.value].parent / f"{source.value}_batch{batch.batch_num}_combined_z_distribution_{context_name}.csv"
batch_results.append(
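For reference, a condensed sketch of the new AnnData branch in _begin_combining_distributions — a hypothetical helper, not code from the PR. It assumes convert (added in como/pipelines/identifier.py below) returns a DataFrame with gene_symbol and entrez_gene_id columns:

import pandas as pd
import scanpy as sc

from como.pipelines.identifier import convert

def anndata_batch_subset(matrix: sc.AnnData, sample_names: list[str], taxon: int) -> pd.DataFrame:
    # Map the AnnData var_names (gene symbols) to Entrez gene IDs
    conversion = convert(ids=matrix.var_names.tolist(), taxon=taxon)
    # Transpose so genes become rows and samples become columns, matching the DataFrame path
    expression = matrix.to_df().T.reset_index(drop=False, names=["gene_symbol"])
    expression = expression.merge(conversion, on="gene_symbol", how="left")
    # Index by Entrez gene ID and keep only this batch's sample columns
    return expression[["entrez_gene_id", *sample_names]].set_index("entrez_gene_id")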
23 changes: 10 additions & 13 deletions main/como/merge_xomics.py
@@ -1,14 +1,12 @@
from __future__ import annotations

import asyncio
import re
import sys
from pathlib import Path
from typing import TextIO, cast
from typing import TextIO

import numpy as np
import pandas as pd
from fast_bioservices.biothings.mygene import MyGene
from loguru import logger

from como.combine_distributions import _begin_combining_distributions
@@ -24,7 +22,7 @@
_SourceWeights,
)
from como.project import Config
from como.utils import log_and_raise_error, read_file, set_up_logging, get_missing_gene_data, return_placeholder_data
from como.utils import get_missing_gene_data, log_and_raise_error, read_file, return_placeholder_data, set_up_logging


class _MergedHeaderNames:
@@ -348,17 +346,16 @@ async def _update_missing_data(input_matrices: _InputMatrices, taxon_id: int) ->
"proteomics": [input_matrices.proteomics],
}
logger.trace(f"Gathering missing data for data sources: {','.join(key for key in matrix_keys if key is not None)}")
# ruff: disable[E501]
# fmt: off
results = await asyncio.gather(
*[
# Using 'is not None' is required because the truth value of a DataFrame is ambiguous
get_missing_gene_data(values=input_matrices.trna, taxon_id=taxon_id) if input_matrices.trna is not None else asyncio.sleep(0),
get_missing_gene_data(values=input_matrices.mrna, taxon_id=taxon_id) if input_matrices.mrna is not None else asyncio.sleep(0),
get_missing_gene_data(values=input_matrices.scrna, taxon_id=taxon_id) if input_matrices.scrna is not None else asyncio.sleep(0),
get_missing_gene_data(values=input_matrices.proteomics, taxon_id=taxon_id) if input_matrices.proteomics is not None else asyncio.sleep(0),
]
results: tuple[pd.DataFrame | None, ...] = (
get_missing_gene_data(values=input_matrices.trna, taxon_id=taxon_id) if input_matrices.trna is not None else None,
get_missing_gene_data(values=input_matrices.mrna, taxon_id=taxon_id) if input_matrices.mrna is not None else None,
get_missing_gene_data(values=input_matrices.scrna, taxon_id=taxon_id) if input_matrices.scrna is not None else None,
get_missing_gene_data(values=input_matrices.proteomics, taxon_id=taxon_id) if input_matrices.proteomics is not None else None,
)
# fmt: on
# ruff: enable[E501]
for i, key in enumerate(matrix_keys):
matrix_keys[key].append(results[i])

@@ -591,7 +588,7 @@ async def merge_xomics( # noqa: C901
log_location: str | TextIO = sys.stderr,
):
"""Merge expression tables of multiple sources (RNA-seq, proteomics) into one."""
_set_up_logging(level=log_level, location=log_location)
set_up_logging(level=log_level, location=log_location)
logger.info(f"Starting to merge all omics data for context: '{context_name}'")

# fmt: off
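The deleted comment above ("the truth value of a DataFrame is ambiguous") is why every optional matrix is guarded with 'is not None' rather than a plain truth test; a standalone illustration, not code from the PR:

import pandas as pd

df = pd.DataFrame({"a": [1, 2]})
try:
    if df:  # a DataFrame is neither True nor False
        pass
except ValueError as err:
    print(err)  # "The truth value of a DataFrame is ambiguous. ..."

# The unambiguous presence check used throughout this PR:
if df is not None:
    print(df.shape)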
100 changes: 100 additions & 0 deletions main/como/pipelines/identifier.py
@@ -0,0 +1,100 @@
from collections.abc import Sequence
from typing import Literal

import pandas as pd
from bioservices.mygeneinfo import MyGeneInfo

__all__ = [
"convert",
"determine_gene_type",
]

T_MG_SCOPE = Literal["entrezgene", "ensembl.gene", "symbol"]
T_MG_TRANSLATE = Literal["entrez_gene_id", "ensembl_gene_id", "gene_symbol"]
T_MG_RETURN = list[dict[T_MG_TRANSLATE, str]]


def _get_conversion(info: MyGeneInfo, values: list[str], scope: T_MG_SCOPE, fields: str, taxon: str) -> T_MG_RETURN:
value_str = ",".join(map(str, values))
results = info.get_queries(query=value_str, dotfield=True, scopes=scope, fields=fields, species=taxon)
if not isinstance(results, list):
raise TypeError(f"Expected results to be a list, but got {type(results)}")
if not isinstance(results[0], dict):
raise TypeError(f"Expected each result to be a dict, but got {type(results[0])}")

data: T_MG_RETURN = []
for result in results:
ensembl = result.get("query" if scope == "ensembl.gene" else "ensembl.gene")
entrez = result.get("query" if scope == "entrezgene" else "entrezgene")
symbol = result.get("query" if scope == "symbol" else "symbol")
data.append({"ensembl_gene_id": ensembl, "entrez_gene_id": entrez, "gene_symbol": symbol})
return data


def convert(ids: int | str | Sequence[int] | Sequence[str] | Sequence[int | str], taxon: int | str, cache: bool = True):
"""Convert between genomic identifiers.

This function will convert between the following components:
- Entrez Gene ID
- Ensembl Gene ID
- Gene Symbol

:param ids: IDs to be converted
:param taxon: Taxonomic identifier
:param cache: Should local caching be used for queries
:return: DataFrame with columns "entrez_gene_id", "ensembl_gene_id", and "gene_symbol"
"""
my_geneinfo = MyGeneInfo(cache=cache)
chunk_size = 1000
id_list = list(map(str, [ids] if isinstance(ids, (int, str)) else ids))
chunks = list(range(0, len(id_list), chunk_size))

data_type = determine_gene_type(id_list)
if not all(v == data_type[id_list[0]] for v in data_type.values()):
raise ValueError("All items in ids must be of the same type (Entrez, Ensembl, or symbols).")

scope = next(iter(data_type.values()))
fields = ",".join({"ensembl.gene", "entrezgene", "symbol"} - {scope})
taxon_str = str(taxon)
return pd.DataFrame(
[
row
for i in chunks
for row in _get_conversion(
info=my_geneinfo,
values=id_list[i : i + chunk_size],
scope=scope,
fields=fields,
taxon=taxon_str,
)
]
)


def determine_gene_type(items: str | list[str], /) -> dict[str, T_MG_SCOPE]:
"""Determine the genomic data type.

:param items: A string or list of strings representing gene identifiers.
The function will determine whether each identifier is an Entrez Gene ID,
Ensembl Gene ID, or a gene symbol based on its format.

:return: A dictionary mapping each input item to its determined type, which can be one of:
- "entrez_gene_id": If the item consists solely of digits.
- "ensembl_gene_id": If the item starts with "ENS" and is
followed by a specific format (length greater than 11 and the last 11 characters are digits).
- "gene_symbol": If the item does not match the above criteria, it is assumed to be a gene symbol.
"""
items = [items] if isinstance(items, str) else items

determine: dict[str, Literal["entrezgene", "ensembl.gene", "symbol"]] = {}
for i in items:
i_str = str(i).split(".")[0] if isinstance(i, float) else str(i)
if i_str.isdigit():
determine[i_str] = "entrezgene"
elif i_str.startswith("ENS") and (len(i_str) > 11 and all(i.isdigit() for i in i_str[-11:])):
determine[i_str] = "ensembl.gene"
else:
determine[i_str] = "symbol"

return determine
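
A usage sketch for the new module — identifier values are illustrative, and convert requires the MyGeneInfo service to be reachable:

from como.pipelines.identifier import convert, determine_gene_type

# Types are inferred from format alone; no lookup is performed
print(determine_gene_type(["7157", "ENSG00000141510", "TP53"]))
# {'7157': 'entrezgene', 'ENSG00000141510': 'ensembl.gene', 'TP53': 'symbol'}

# All inputs must share one identifier type; mixing them raises ValueError
mapping = convert(ids=["ENSG00000141510", "ENSG00000012048"], taxon=9606)
print(mapping.columns.tolist())  # ['ensembl_gene_id', 'entrez_gene_id', 'gene_symbol']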
30 changes: 19 additions & 11 deletions main/como/proteomics/Crux.py
@@ -1,5 +1,6 @@
# ruff: noqa

from typing import Type
import asyncio
import multiprocessing
import os
@@ -8,10 +9,11 @@
from multiprocessing.sharedctypes import Synchronized
from pathlib import Path

from bioservices.biodbnet import BioDBNet
import numpy as np
import pandas as pd
import tqdm
from fast_bioservices import BioDBNet


from como.proteomics.FileInformation import FileInformation, clear_print

@@ -128,7 +130,9 @@ def mzml_to_sqt(self) -> None:
)

# Replace all "comet.*" in output directory with the name of the file being processed
comet_files = [str(file) for file in os.listdir(file_information.sqt_base_path) if str(file).startswith("comet.")]
comet_files = [
str(file) for file in os.listdir(file_information.sqt_base_path) if str(file).startswith("comet.")
]
for file_name in comet_files:
# Determine the old file path
old_file_path: Path = Path(file_information.sqt_base_path, file_name)
@@ -240,15 +244,19 @@ def collect_uniprot_ids_and_ion_intensity(self) -> None:

# Assign the file_information intensity dataframe to the gathered values
self._file_information[i].intensity_df = pd.DataFrame(average_intensities_dict)
self._file_information[i].intensity_df = self._file_information[i].intensity_df.groupby("uniprot", as_index=False).mean()
self._file_information[i].intensity_df = (
self._file_information[i].intensity_df.groupby("uniprot", as_index=False).mean()
)

async def _convert_uniprot_wrapper(self) -> None:
"""This function is a multiprocessing wrapper around the convert_ids function"""
values = [self.async_convert_uniprot(self._file_information[i]) for i in range(len(self._file_information))]

# Create a progress bar of results
# From: https://stackoverflow.com/a/61041328/
progress_bar = tqdm.tqdm(desc="Starting UniProt to Gene Symbol conversion... ", total=len(self._file_information))
progress_bar = tqdm.tqdm(
desc="Starting UniProt to Gene Symbol conversion... ", total=len(self._file_information)
)
for i, result in enumerate(asyncio.as_completed(values)):
await result # Get result from asyncio.as_completed
progress_bar.set_description(f"Working on {i + 1} of {len(self._file_information)}")
@@ -271,13 +279,11 @@ async def async_convert_uniprot(self, file_information: FileInformation) -> None
loop = asyncio.get_event_loop()
# async with self._semaphore:
# gene_symbols: pd.DataFrame = await loop.run_in_executor(None, self._biodbnet.db2db, "UniProt Accession", "Gene Symbol", input_values)
gene_symbols: pd.DataFrame = await loop.run_in_executor(
None,
self._biodbnet.db2db,
input_values,
"UniProt Accession",
"Gene Symbol",
gene_symbols = self._biodbnet.db2db(
input_db="UniProt Accession", output_db="Gene Symbol", input_values=input_values
)
if not isinstance(gene_symbols, pd.DataFrame):
raise TypeError(f"Expected gene_symbols to be a DataFrame, but got {type(gene_symbols)}")

# The index is UniProt IDs. Create a new column of these values
gene_symbols["uniprot"] = gene_symbols.index
@@ -372,7 +378,9 @@ def split_abundance_values(self) -> None:
# Create a new dataframe to split the S# columns from
split_frame: pd.DataFrame = dataframe.copy()
# Get the current S{i} columns in the split frame
abundance_columns: list[str] = [column for column in split_frame.columns if re.match(rf"{cell_type}_S{i}R\d+", column)]
abundance_columns: list[str] = [
column for column in split_frame.columns if re.match(rf"{cell_type}_S{i}R\d+", column)
]
take_columns: list[str] = ["symbol"] + abundance_columns
average_intensity_name: str = f"{cell_type}_S{i}"

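If the UniProt conversion ever needs to move back off the event loop, note that run_in_executor forwards only positional arguments, which is awkward for keyword-style APIs like db2db. A sketch using functools.partial to bind the keywords — assuming db2db keeps the keyword signature shown above:

import asyncio
import functools

async def db2db_off_loop(biodbnet, input_values: list[str]):
    loop = asyncio.get_event_loop()
    # run_in_executor passes extra args positionally, so bind keywords first
    call = functools.partial(
        biodbnet.db2db,
        input_db="UniProt Accession",
        output_db="Gene Symbol",
        input_values=input_values,
    )
    return await loop.run_in_executor(None, call)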
19 changes: 11 additions & 8 deletions main/como/proteomics_gen.py
@@ -71,15 +71,18 @@ async def load_gene_symbol_map(gene_symbols: list[str], entrez_map: Path | None
biodbnet.services.settings.TIMEOUT = 60
for i in range(0, len(gene_symbols), step_size):
# Operations: Goes from gene_symbols=["A", "B;C", "D"] -> gene.split=[["A"], ["B", "C"], ["D"]] -> chain.from_iterable["A", "B", "C", "D"]
chunk: list[str] = list(itertools.chain.from_iterable(gene.split(";") for gene in gene_symbols[i : i + step_size]))
dataframes.append(
biodbnet.db2db(
input_values=chunk,
input_db=Input.GENE_SYMBOL.value,
output_db=[Output.GENE_ID.value, Output.ENSEMBL_GENE_ID.value],
taxon=9606,
)
chunk: list[str] = list(
itertools.chain.from_iterable(gene.split(";") for gene in gene_symbols[i : i + step_size])
)
result = biodbnet.db2db(
input_db="Gene Symbol",
output_db=["Gene ID", "Ensembl Gene ID"],
input_values=chunk,
taxon=9606,
)
if not isinstance(result, pd.DataFrame):
raise TypeError(f"Got {type(result)}, expected pd.DataFrame")
dataframes.append(result)
df = pd.concat(dataframes, axis="columns")
print(df)
df.loc[df["gene_id"].isna(), ["gene_id"]] = np.nan
9 changes: 3 additions & 6 deletions main/como/rnaseq_gen.py
@@ -10,7 +10,6 @@
from typing import NamedTuple, TextIO, cast

import anndata as ad
import boolean
import numpy as np
import numpy.typing as npt
import pandas as pd
@@ -19,13 +18,13 @@
import sklearn.neighbors
from anndata.compat import XDataArray
from anndata.experimental.backed import Dataset2D
from fast_bioservices.pipeline import ensembl_to_gene_id_and_symbol, gene_symbol_to_ensembl_and_gene_id
from loguru import logger
from scipy import sparse
from zfpkm import zFPKM, zfpkm_plot

from como.data_types import FilteringTechnique, LogLevel, RNAType
from como.migrations import gene_info_migrations
from como.pipelines.identifier import convert
from como.project import Config
from como.utils import log_and_raise_error, read_file, set_up_logging

@@ -173,17 +172,15 @@ async def _build_matrix_results(
raise TypeError("AnnData.var is expected to be a pandas.DataFrame")

matrix.var = matrix.var.reset_index(drop=False, names=["gene_symbol"])
conversion = await gene_symbol_to_ensembl_and_gene_id(symbols=matrix.var["gene_symbol"].tolist(), taxon=taxon)
conversion = convert(ids=matrix.var["gene_symbol"].tolist(), taxon=taxon)
else:
if "ensembl_gene_id" not in matrix.columns:
log_and_raise_error(
message="'ensembl_gene_id' column not found in the provided DataFrame.",
error=ValueError,
level=LogLevel.CRITICAL,
)
conversion: pd.DataFrame = await ensembl_to_gene_id_and_symbol(
ids=matrix["ensembl_gene_id"].tolist(), taxon=taxon
)
conversion: pd.DataFrame = convert(ids=matrix["ensembl_gene_id"].tolist(), taxon=taxon)
# If the entrez gene id column is empty, it is indicative that the incorrect taxon id was provided
if conversion["entrez_gene_id"].eq("-").all():
logger.critical(