# Source code for pysradb.sraweb
"""Utilities to interact with SRA online"""
import concurrent.futures
import os
import sys
import time
import warnings
from collections import OrderedDict
from json.decoder import JSONDecodeError
from xml.parsers.expat import ExpatError
import numpy as np
import pandas as pd
import requests
import xmltodict
from .sradb import SRAdb
from xml.sax.saxutils import escape

warnings.simplefilter(action="ignore", category=FutureWarning)
def _order_first(df, column_order_list):
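    """Move the columns in `column_order_list` to the front of `df`.

    A minimal illustration (column names are hypothetical)::

        >>> df = pd.DataFrame({"b": [1], "a": [2]})
        >>> _order_first(df, ["a"]).columns.tolist()
        ['a', 'b']
    """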
columns = column_order_list + [
col for col in df.columns.tolist() if col not in column_order_list
]
    # reorder only when every requested column exists in the dataframe
    if len(set(columns).intersection(df.columns)) == len(columns):
df = df.loc[:, columns]
    # mask cells whose string form is an empty list ("[]")
    df = df.mask(df.applymap(str).eq("[]"))
df = df.fillna(pd.NA)
return df
def _retry_response(base_url, payload, key, max_retries=10):
    """Retry fetching esummary if the API rate limit is exceeded."""
    for index in range(max_retries):
        try:
            request = requests.get(base_url, params=OrderedDict(payload))
            response = request.json()
            # accessing the key raises KeyError if the result is missing,
            # which triggers another attempt
            _ = response[key]
            return response
        except KeyError:
            # back off for progressively longer intervals
            time.sleep(index + 1)
    raise RuntimeError("Failed to fetch esummary. API rate limit exceeded.")
def get_retmax(n_records, retmax=500):
    """Yield retstart offsets in steps of `retmax` until `n_records` is exhausted.
for i in range(0, n_records, retmax):
yield i
class SRAweb(SRAdb):
def __init__(self, api_key=None):
"""
        Initialize an SRAweb instance.
Parameters
----------
api_key: string
API key for ncbi eutils.
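
        Examples
        --------
        A minimal sketch (the key below is a placeholder)::

            >>> db = SRAweb()                    # anonymous: up to 3 requests/second
            >>> db = SRAweb(api_key="YOUR_KEY")  # keyed: up to 10 requests/second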
"""
self.base_url = dict()
self.base_url[
"esummary"
] = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi"
self.base_url[
"esearch"
] = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi"
self.base_url[
"efetch"
] = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"
self.ena_fastq_search_url = "https://www.ebi.ac.uk/ena/portal/api/filereport"
self.ena_params = [("result", "read_run"), ("fields", "fastq_ftp")]
self.esearch_params = {}
self.esearch_params["sra"] = [
("db", "sra"),
("usehistory", "n"),
("retmode", "json"),
]
self.esearch_params["geo"] = [
("db", "gds"),
("usehistory", "n"),
("retmode", "json"),
]
self.efetch_params = [
("db", "sra"),
("usehistory", "n"),
("retmode", "runinfo"),
]
        if api_key is not None:
            self.esearch_params["sra"].append(("api_key", str(api_key)))
            self.esearch_params["geo"].append(("api_key", str(api_key)))
            self.efetch_params.append(("api_key", str(api_key)))
            # NCBI allows up to 10 requests per second with an API key
            self.sleep_time = 1 / 10
        else:
            # and at most 3 requests per second without one
            self.sleep_time = 1 / 3
    @staticmethod
def format_xml(string):
"""Create a fake root to make 'string' a valid xml
Parameters
----------
string: str
Returns
--------
xml: str
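
        Examples
        --------
        >>> SRAweb.format_xml("  <RUN acc='SRRxxxxxx'/> ")
        "<root><RUN acc='SRRxxxxxx'/></root>"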
"""
# string = unescape(string.strip())
string = string.strip()
return "<root>" + string + "</root>"
    @staticmethod
def xml_to_json(xml):
"""Convert xml to json.
Parameters
----------
xml: str
Input XML
Returns
-------
xml_dict: dict
Parsed xml as dict
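
        Examples
        --------
        >>> dict(SRAweb.xml_to_json("<root><acc>SRRxxxxxx</acc></root>"))
        {'acc': 'SRRxxxxxx'}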
"""
try:
xmldict = xmltodict.parse(
xml, process_namespaces=True, dict_constructor=OrderedDict
)
json = xmldict["root"]
except ExpatError:
raise RuntimeError("Unable to parse xml: {}".format(xml))
return json
    def fetch_ena_fastq(self, srp):
"""Fetch FASTQ records from ENA (EXPERIMENTAL)
Parameters
----------
srp: string
            Study accession
Returns
-------
        srr_urls: pd.DataFrame
            DataFrame of run accessions with their ENA fastq HTTP and FTP urls
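
        Examples
        --------
        A sketch of the expected output shape (accession is illustrative and
        requires network access)::

            >>> db = SRAweb()
            >>> df = db.fetch_ena_fastq("SRPxxxxxx")
            >>> df.columns.tolist()  # single-end case
            ['run_accession', 'ena_fastq_http', 'ena_fastq_ftp']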
"""
payload = self.ena_params.copy()
payload += [("accession", srp)]
request = requests.get(self.ena_fastq_search_url, params=OrderedDict(payload))
request_text = request.text.strip()
urls = []
for line in request_text.split("\n"):
if "fastq_ftp" in line:
continue
line = line.strip()
line_split = line.split("\t")
            if len(line_split) != 2:
                continue
            srr, url = line_split
            http_url = "http://{}".format(url)
            # rewrite the FTP host into ENA's fasp (Aspera) endpoint
            ftp_url = url.replace("ftp.sra.ebi.ac.uk/", "era-fasp@fasp.sra.ebi.ac.uk:")
urls += [(srr, http_url, ftp_url)]
# Paired end case
def _handle_url_split(url_split):
url1_1 = pd.NA
url1_2 = pd.NA
for url_temp in url_split:
if "_1.fastq.gz" in url_temp:
url1_1 = url_temp
elif "_2.fastq.gz" in url_temp:
url1_2 = url_temp
return url1_1, url1_2
if ";" in request_text:
urls_expanded = []
for srr, url1, url2 in urls:
# strip _1, _2
srr = srr.split("_")[0]
if ";" in url1:
url1_split = url1.split(";")
if len(url1_split) == 2:
url1_1, url1_2 = url1_split
else:
# warnings.warn('ignoring extra urls found for paired end accession')
url1_1, url1_2 = _handle_url_split(url1_split)
url1_2 = "http://{}".format(url1_2)
url2_split = url2.split(";")
if len(url2_split) == 2:
url2_1, url2_2 = url2_split
else:
# warnings.warn('ignoring extra urls found for paired end accession')
url2_1, url2_2 = _handle_url_split(url2_split)
else:
url1_1 = url1
url2_1 = url2
url1_2 = ""
url2_2 = ""
urls_expanded.append((srr, url1_1, url1_2, url2_1, url2_2))
return pd.DataFrame(
urls_expanded,
columns=[
"run_accession",
"ena_fastq_http_1",
"ena_fastq_http_2",
"ena_fastq_ftp_1",
"ena_fastq_ftp_2",
],
).sort_values(by="run_accession")
else:
return pd.DataFrame(
urls, columns=["run_accession", "ena_fastq_http", "ena_fastq_ftp"]
).sort_values(by="run_accession")
    def create_esummary_params(self, esearchresult, db="sra"):
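        """Build query_key/WebEnv pagination params from an esearch result."""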
query_key = esearchresult["querykey"]
webenv = esearchresult["webenv"]
retstart = esearchresult["retstart"]
# TODO this should be adaptive to build
# upon using the 'count' result in esearch result,
# Currently only supports a max of 500 records.
# retmax = esearchresult["retmax"]
retmax = 500
return [
("query_key", query_key),
("WebEnv", webenv),
("retstart", retstart),
("retmax", retmax),
]
    def get_esummary_response(self, db, term, usehistory="y"):
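        """Run esearch for `term` and page through all matching esummary records."""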
assert db in ["sra", "geo"]
payload = self.esearch_params[db].copy()
if isinstance(term, list):
term = " OR ".join(term)
payload += [("term", term)]
request = requests.post(self.base_url["esearch"], data=OrderedDict(payload))
try:
esearch_response = request.json()
except JSONDecodeError:
            sys.stderr.write(
                "Unable to parse esearch response json: {}{}. Will retry once.".format(
                    request.text, os.linesep
                )
            )
retry_after = request.headers.get("Retry-After", 1)
time.sleep(int(retry_after))
request = requests.post(self.base_url["esearch"], data=OrderedDict(payload))
try:
esearch_response = request.json()
except JSONDecodeError:
                sys.stderr.write(
                    "Unable to parse esearch response json: {}{}. Aborting.".format(
                        request.text, os.linesep
                    )
                )
sys.exit(1)
        if "esummaryresult" in esearch_response:
            print("No result found")
            return
        if "error" in esearch_response:
            # API rate limit exceeded
esearch_response = _retry_response(
self.base_url["esearch"], payload, "esearchresult"
)
n_records = int(esearch_response["esearchresult"]["count"])
results = {}
for retstart in get_retmax(n_records):
payload = self.esearch_params[db].copy()
payload += self.create_esummary_params(esearch_response["esearchresult"])
payload = OrderedDict(payload)
payload["retstart"] = retstart
request = requests.get(
self.base_url["esummary"], params=OrderedDict(payload)
)
try:
response = request.json()
except JSONDecodeError:
time.sleep(1)
response = _retry_response(self.base_url["esummary"], payload, "result")
if "error" in response:
# API rate limite exceeded
response = _retry_response(self.base_url["esummary"], payload, "result")
if retstart == 0:
results = response["result"]
else:
result = response["result"]
for key, value in result.items():
if key in list(results.keys()):
results[key] += value
else:
results[key] = value
return results
    def get_efetch_response(self, db, term, usehistory="y"):
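        """Run esearch for `term`, then fetch the full experiment XML records via efetch."""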
assert db in ["sra", "geo"]
payload = self.esearch_params[db].copy()
if isinstance(term, list):
term = " OR ".join(term)
payload += [("term", term)]
request = requests.get(self.base_url["esearch"], params=OrderedDict(payload))
esearch_response = request.json()
if "esummaryresult" in esearch_response:
print("No result found")
return
if "error" in esearch_response:
# API rate limite exceeded
esearch_response = _retry_response(
self.base_url["esearch"], payload, "esearchresult"
)
n_records = int(esearch_response["esearchresult"]["count"])
results = {}
for retstart in get_retmax(n_records):
payload = self.efetch_params.copy()
payload += self.create_esummary_params(esearch_response["esearchresult"])
payload = OrderedDict(payload)
payload["retstart"] = retstart
request = requests.get(self.base_url["efetch"], params=OrderedDict(payload))
request_text = request.text.strip()
            try:
                request_json = request.json()
            except ValueError:
                # efetch normally returns XML; failure to parse JSON is expected
                request_json = {}
if "error" in request_json:
# print("Encountered: {}".format(request_json))
# print("Headers: {}".format(request.headers))
# Handle API-rate limit exceeding
                try:
                    retry_after = int(request.headers["Retry-After"])
                except KeyError:
                    if request_json["error"] == "error forwarding request":
                        sys.stderr.write("Encountered error while making request.\n")
                        sys.exit(1)
                    # no Retry-After header; fall back to a short pause
                    retry_after = 1
                time.sleep(retry_after)
# try again
request = requests.get(
self.base_url["efetch"], params=OrderedDict(payload)
)
request_text = request.text.strip()
try:
request_json = request.json()
if request_json["error"] == "error forwarding request":
sys.stderr.write("Encountered error while making request.\n")
return
                except ValueError:
                    request_json = {}
try:
xml_response = xmltodict.parse(
request_text, dict_constructor=OrderedDict
)
exp_response = xml_response.get("EXPERIMENT_PACKAGE_SET", {})
response = exp_response.get("EXPERIMENT_PACKAGE", {})
except ExpatError:
sys.stderr.write(
"Unable to parse xml: {}{}".format(request_text, os.linesep)
)
sys.exit(1)
if not response:
sys.stderr.write(
"Unable to parse xml response. Received: {}{}".format(
xml_response, os.linesep
)
)
sys.exit(1)
            if retstart == 0:
                results = response
                # a page containing a single record parses to a dict, not a list
                if not isinstance(results, list):
                    results = [results]
            else:
                result = response
                if not isinstance(result, list):
                    result = [result]
                for value in result:
                    results.append(value)
time.sleep(self.sleep_time)
return results
    def sra_metadata(
self,
srp,
sample_attribute=False,
detailed=False,
expand_sample_attributes=False,
output_read_lengths=False,
**kwargs
):
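        """Fetch SRA metadata for a study/experiment/run accession (or a list of accessions).

        A sketch of typical usage (accession is illustrative and requires
        network access)::

            >>> db = SRAweb()
            >>> df = db.sra_metadata("SRPxxxxxx", detailed=True)
            >>> df[["run_accession", "library_layout"]].head()
        """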
esummary_result = self.get_esummary_response("sra", srp)
try:
uids = esummary_result["uids"]
except KeyError:
print("No results found for {}".format(srp))
return None
exps_xml = OrderedDict()
runs_xml = OrderedDict()
exps_json = OrderedDict()
runs_json = OrderedDict()
for uid in uids:
exps_xml[uid] = self.format_xml(esummary_result[uid]["expxml"])
runs_xml[uid] = self.format_xml(esummary_result[uid]["runs"])
for uid in uids:
exps_json[uid] = self.xml_to_json(exps_xml[uid])
runs_json[uid] = self.xml_to_json(runs_xml[uid])
sra_record = []
for uid, run_json in runs_json.items():
exp_json = exps_json[uid]
exp_summary = exp_json["Summary"]
exp_title = exp_summary.get("Title", pd.NA)
exp_platform = exp_summary.get("Platform", {})
statistics = exp_summary.get("Statistics", {})
if isinstance(exp_platform, OrderedDict):
exp_platform_model = exp_platform.get("@instrument_model", pd.NA)
exp_platform_desc = exp_platform.get("#text", pd.NA)
else:
exp_platform_model = pd.NA
exp_platform_desc = pd.NA
exp_total_runs = statistics.get("@total_runs", pd.NA)
exp_total_spots = statistics.get("@total_spots", pd.NA)
exp_total_size = statistics.get("@total_size", pd.NA)
# experiment_accession
exp_ID = exp_json["Experiment"]["@acc"]
# experiment_title
exp_name = exp_json["Experiment"]["@name"]
exp_organism = exp_json.get("Organism", pd.NA)
exp_organism_name = pd.NA
exp_taxid = pd.NA
if isinstance(exp_organism, dict):
exp_organism_name = exp_organism.get("@ScientificName", pd.NA)
                exp_taxid = exp_organism.get("@taxid", pd.NA)
exp_instrument = list(exp_json["Instrument"].values())[0]
exp_sample = exp_json["Sample"]
# sample_accession
exp_sample_ID = exp_sample["@acc"]
# sample_title
exp_sample_name = exp_sample["@name"]
exp_library_descriptor = exp_json["Library_descriptor"]
# library_strategy
exp_library_strategy = exp_library_descriptor["LIBRARY_STRATEGY"]
if isinstance(exp_library_strategy, dict):
exp_library_strategy = exp_library_strategy["#text"]
# library_source
exp_library_source = exp_library_descriptor["LIBRARY_SOURCE"]
if isinstance(exp_library_source, dict):
exp_library_source = exp_library_source["#text"]
# library_selection
exp_library_selection = exp_library_descriptor["LIBRARY_SELECTION"]
if isinstance(exp_library_selection, dict):
exp_library_selection = exp_library_selection["#text"]
# library_name
            exp_library_name = exp_library_descriptor.get("LIBRARY_NAME", "")
            if isinstance(exp_library_name, dict):
                exp_library_name = exp_library_name["#text"]
# library_layout
exp_library_layout = list(exp_library_descriptor["LIBRARY_LAYOUT"].keys())[
0
]
experiment_record = OrderedDict()
experiment_record["study_accession"] = exp_json["Study"]["@acc"]
experiment_record["study_title"] = exp_json["Study"]["@name"]
experiment_record["experiment_accession"] = exp_ID
experiment_record["experiment_title"] = exp_name
experiment_record["experiment_desc"] = exp_title
experiment_record["organism_taxid"] = exp_taxid
experiment_record["organism_name"] = exp_organism_name
experiment_record["library_name"] = exp_library_name
experiment_record["library_strategy"] = exp_library_strategy
experiment_record["library_source"] = exp_library_source
experiment_record["library_selection"] = exp_library_selection
experiment_record["library_layout"] = exp_library_layout
experiment_record["sample_accession"] = exp_sample_ID
experiment_record["sample_title"] = exp_sample_name
experiment_record["instrument"] = exp_instrument
experiment_record["instrument_model"] = exp_platform_model
experiment_record["instrument_model_desc"] = exp_platform_desc
experiment_record["total_spots"] = exp_total_spots
experiment_record["total_size"] = exp_total_size
if not run_json:
# Sometimes the run_accession is not populated by NCBI:
# df2 = self.srx_to_srr(exp_ID)
# extra_fields = set(experiment_record.keys()).difference(df2.columns.tolist())
# for idx, row in df2.iterrows():
# for field in extra_fields:
# experiment_record[field] = row[field]
sra_record.append(experiment_record)
continue
runs = run_json["Run"]
if not isinstance(runs, list):
runs = [runs]
for run_record in runs:
run_accession = run_record["@acc"]
run_total_spots = run_record["@total_spots"]
run_total_bases = run_record["@total_bases"]
experiment_record["run_accession"] = run_accession
experiment_record["run_total_spots"] = run_total_spots
experiment_record["run_total_bases"] = run_total_bases
sra_record.append(experiment_record.copy())
        # TODO: the detailed call below does redundant operations;
        # the code above it could be done away with completely
metadata_df = pd.DataFrame(sra_record).drop_duplicates()
if "run_accession" in metadata_df.columns:
metadata_df = metadata_df.sort_values(by="run_accession")
metadata_df.columns = [x.lower().strip() for x in metadata_df.columns]
if not detailed:
return metadata_df
time.sleep(self.sleep_time)
efetch_result = self.get_efetch_response("sra", srp)
if not isinstance(efetch_result, list):
if efetch_result:
efetch_result = [efetch_result]
else:
return None
detailed_records = []
for record in efetch_result:
if "SAMPLE" in record.keys() and "SAMPLE_ATTRIBUTES" in record["SAMPLE"]:
sample_attributes = record["SAMPLE"]["SAMPLE_ATTRIBUTES"][
"SAMPLE_ATTRIBUTE"
]
else:
sample_attributes = []
if isinstance(sample_attributes, OrderedDict):
sample_attributes = [sample_attributes]
exp_record = record["EXPERIMENT"]
exp_attributes = exp_record.get("EXPERIMENT_ATTRIBUTES", {})
run_sets = record["RUN_SET"].get("RUN", [])
if not isinstance(run_sets, list):
run_sets = [run_sets]
for run_set in run_sets:
detailed_record = OrderedDict()
if not run_json:
# Add experiment accession if no run info found earlier
detailed_record["experiment_accession"] = exp_record["@accession"]
# detailed_record["experiment_title"] = exp_record["TITLE"]
for key, values in exp_attributes.items():
key = key.lower()
for value_x in values:
if not isinstance(value_x, dict):
continue
tag = value_x["TAG"].lower()
value = value_x["VALUE"]
detailed_record[tag] = value
lib_record = exp_record["DESIGN"]["LIBRARY_DESCRIPTOR"]
for key, value in lib_record.items():
key = key.lower()
if key == "library_layout":
value = list(value.keys())[0]
elif key == "library_construction_protocol":
continue
                    detailed_record[key] = value
detailed_record["run_accession"] = run_set["@accession"]
detailed_record["run_alias"] = run_set["@alias"]
sra_files = run_set.get("SRAFiles", {})
sra_files = sra_files.get("SRAFile", {})
if isinstance(sra_files, OrderedDict):
# detailed_record["sra_url"] = sra_files.get("@url", pd.NA)
if "Alternatives" in sra_files.keys():
alternatives = sra_files["Alternatives"]
if not isinstance(alternatives, list):
alternatives = [alternatives]
for alternative in alternatives:
org = alternative["@org"].lower()
for key in alternative.keys():
if key == "@org":
continue
detailed_record[
"{}_{}".format(org, key.replace("@", ""))
] = alternative[key]
else:
for sra_file in sra_files:
# Multiple download URLs
# Use the one where the download filename corresponds to the SRR
                        cluster = sra_file.get("@cluster", None)
                        if cluster is None:
                            continue
                        cluster = cluster.lower().strip()
for key in sra_file.keys():
if key == "@cluster":
continue
if key == "Alternatives":
# Example: SRP184142
alternatives = sra_file["Alternatives"]
if not isinstance(alternatives, list):
alternatives = [alternatives]
for alternative in alternatives:
org = alternative["@org"].lower()
for key in alternative.keys():
if key == "@org":
continue
detailed_record[
"{}_{}".format(org, key.replace("@", ""))
] = alternative[key]
else:
detailed_record[
"{}_{}".format(cluster, key.replace("@", ""))
] = sra_file[key]
expt_ref = run_set["EXPERIMENT_REF"]
detailed_record["experiment_alias"] = expt_ref.get("@refname", "")
# detailed_record["run_total_bases"] = run_set["@total_bases"]
# detailed_record["run_total_spots"] = run_set["@total_spots"]
for sample_attribute in sample_attributes:
dict_values = list(sample_attribute.values())
if len(dict_values) > 1:
detailed_record[dict_values[0]] = dict_values[1]
else:
# TODO: Investigate why these fields have just the key
# but no value
pass
detailed_records.append(detailed_record)
detailed_record_df = pd.DataFrame(detailed_records).drop_duplicates()
if (
"run_accession" in metadata_df.keys()
and "run_accession" in detailed_record_df.keys()
):
metadata_df = metadata_df.merge(
detailed_record_df, on="run_accession", how="outer"
)
elif "experiment_accession" in detailed_record_df.keys():
metadata_df = metadata_df.merge(
detailed_record_df, on="experiment_accession", how="outer"
)
metadata_df = metadata_df[metadata_df.columns.dropna()]
metadata_df = metadata_df.drop_duplicates()
metadata_df = metadata_df.replace(r"^\s*$", np.nan, regex=True)
ena_cols = [
"ena_fastq_http",
"ena_fastq_http_1",
"ena_fastq_http_2",
"ena_fastq_ftp",
"ena_fastq_ftp_1",
"ena_fastq_ftp_2",
]
empty_df = pd.DataFrame(columns=ena_cols)
metadata_df = pd.concat((metadata_df, empty_df), axis=0)
# metadata_df[ena_cols] = np.nan
if "run_accession" in metadata_df.columns:
metadata_df = metadata_df.set_index("run_accession")
# multithreading lookup on ENA, since a lot of time is spent waiting
# for its reply
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# load our function calls into a list of futures
futures = [
executor.submit(self.fetch_ena_fastq, srp)
for srp in metadata_df.study_accession.unique()
]
# now proceed synchronously
for future in concurrent.futures.as_completed(futures):
ena_results = future.result()
if ena_results.shape[0]:
ena_results = ena_results.set_index("run_accession")
metadata_df.update(ena_results)
metadata_df = metadata_df.reset_index()
metadata_df = metadata_df.fillna(pd.NA)
metadata_df.columns = [x.lower().strip() for x in metadata_df.columns]
if "run_accession" in metadata_df.columns:
return metadata_df.sort_values(by="run_accession")
return metadata_df
    def fetch_gds_results(self, gse, **kwargs):
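        """Fetch GEO DataSets (gds) records for the given accession(s) as a DataFrame."""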
result = self.get_esummary_response("geo", gse)
try:
uids = result["uids"]
except KeyError:
print("No results found for {} | Obtained result: {}".format(gse, result))
return None
gse_records = []
for uid in uids:
record = result[uid]
del record["uid"]
if record["extrelations"]:
extrelations = record["extrelations"]
for extrelation in extrelations:
keys = list(extrelation.keys())
values = list(extrelation.values())
assert sorted(keys) == sorted(
["relationtype", "targetobject", "targetftplink"]
)
assert len(values) == 3
record[extrelation["relationtype"]] = extrelation["targetobject"]
del record["extrelations"]
gse_records.append(record)
if not len(gse_records):
print("No results found for {}".format(gse))
return None
return pd.DataFrame(gse_records)
    def gse_to_gsm(self, gse, **kwargs):
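        """Get GSM for a GSE.

        A sketch (accession is illustrative and requires network access)::

            >>> db = SRAweb()
            >>> db.gse_to_gsm("GSExxxxx").columns.tolist()
            ['study_alias', 'experiment_alias', 'experiment_accession']
        """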
if isinstance(gse, str):
gse = [gse]
gse_df = self.fetch_gds_results(gse, **kwargs)
gse_df = gse_df.rename(
columns={
"accession": "experiment_alias",
"SRA": "experiment_accession",
"title": "experiment_title",
"summary": "sample_attribute",
}
)
# TODO: Fix for multiple GSEs?
gse_df["study_alias"] = ""
if len(gse) == 1:
study_alias = gse[0]
for index, row in gse_df.iterrows():
if row.entrytype == "GSE":
study_alias = row["experiment_accession"]
                # If a GSM is encountered, assign it the
                # previously encountered GSE
elif row.entrytype == "GSM":
gse_df.loc[index, "study_alias"] = study_alias
gse_df = gse_df[gse_df.entrytype == "GSM"]
        if kwargs.get("detailed") is True:
return gse_df
return gse_df[
["study_alias", "experiment_alias", "experiment_accession"]
].drop_duplicates()
    def gse_to_srp(self, gse, **kwargs):
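        """Get SRP for a GSE"""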
if isinstance(gse, str):
gse = [gse]
gse_df = self.fetch_gds_results(gse, **kwargs)
gse_df = gse_df.rename(
columns={"accession": "study_alias", "SRA": "study_accession"}
)
gse_df_subset = None
if "GSE" in gse_df.entrytype.unique():
gse_df_subset = gse_df[gse_df.entrytype == "GSE"]
common_gses = set(gse_df.study_alias.unique()).intersection(gse)
if len(common_gses) < len(gse):
gse_df_subset = None
if gse_df_subset is None:
            # sometimes SRX ids are returned instead of an entire project
            # see https://github.com/saketkc/pysradb/issues/186
            # GSE: GSE209835; SRP: SRP388275
gse_df_subset_gse = gse_df[gse_df.entrytype == "GSE"]
gse_of_interest = list(set(gse).difference(gse_df.study_alias.unique()))
gse_df_subset_other = gse_df[gse_df.entrytype != "GSE"]
srx = gse_df_subset_other.study_accession.tolist()
srp_df = self.srx_to_srp(srx)
srp_unique = list(
set(srp_df.study_accession.unique()).difference(
gse_df_subset_gse.study_accession.tolist()
)
)
new_gse_df = pd.DataFrame(
{"study_alias": gse_of_interest, "study_accession": srp_unique}
)
gse_df_subset = pd.concat([gse_df_subset_gse, new_gse_df])
gse_df_subset = gse_df_subset.loc[gse_df_subset.study_alias.isin(gse)]
return gse_df_subset[["study_alias", "study_accession"]].drop_duplicates()
    def gsm_to_srp(self, gsm, **kwargs):
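        """Get SRP for a GSM"""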
gsm_df = self.fetch_gds_results(gsm, **kwargs)
gsm_df = gsm_df[gsm_df.entrytype == "GSE"]
gsm_df = gsm_df.rename(
columns={"accession": "experiment_alias", "SRA": "study_accession"}
)
return gsm_df[["experiment_alias", "study_accession"]].drop_duplicates()
    def gsm_to_srr(self, gsm, **kwargs):
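        """Get SRR for a GSM"""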
gsm_df = self.fetch_gds_results(gsm, **kwargs)
gsm_df = gsm_df.rename(
columns={
"accession": "experiment_alias",
"SRA": "experiment_accession",
"title": "experiment_title",
"summary": "sample_attribute",
}
)
gsm_df = gsm_df[gsm_df.entrytype == "GSM"]
srr_df = self.srx_to_srr(gsm_df.experiment_accession.tolist())
gsm_df = gsm_df.merge(srr_df, on="experiment_accession")
return gsm_df[["experiment_alias", "run_accession"]]
    def gsm_to_srs(self, gsm, **kwargs):
"""Get SRS for a GSM"""
gsm_df = self.fetch_gds_results(gsm, **kwargs)
gsm_df = gsm_df[gsm_df.entrytype == "GSM"].rename(
columns={"SRA": "experiment_accession", "accession": "experiment_alias"}
)
srx = gsm_df.experiment_accession.tolist()
time.sleep(self.sleep_time)
srs_df = self.srx_to_srs(srx)
gsm_df = srs_df.merge(gsm_df, on="experiment_accession")[
["experiment_alias", "sample_accession"]
]
return gsm_df.drop_duplicates()
    def gsm_to_srx(self, gsm, **kwargs):
"""Get SRX for a GSM"""
if isinstance(gsm, str):
gsm = [gsm]
gsm_df = self.fetch_gds_results(gsm, **kwargs)
gsm_df = gsm_df[gsm_df.entrytype == "GSM"].rename(
columns={"SRA": "experiment_accession", "accession": "experiment_alias"}
)
gsm_df = gsm_df.loc[gsm_df["experiment_alias"].isin(gsm)]
return gsm_df[["experiment_alias", "experiment_accession"]].drop_duplicates()
    def gsm_to_gse(self, gsm, **kwargs):
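        """Get GSE for a GSM"""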
gsm_df = self.fetch_gds_results(gsm, **kwargs)
gsm_df = gsm_df[gsm_df.entrytype == "GSE"]
gsm_df = gsm_df.rename(
columns={"accession": "study_alias", "SRA": "study_accession"}
)
return gsm_df[["study_alias", "study_accession"]]
    def srp_to_gse(self, srp, **kwargs):
        """Get GSE for a SRP"""
        if isinstance(srp, str):
            srp = [srp]
        srp_df = self.fetch_gds_results(srp, **kwargs)
if srp_df is None:
srp_df = pd.DataFrame(
{"study_alias": [], "study_accession": [], "entrytype": []}
)
srp_df = srp_df.rename(
columns={"accession": "study_alias", "SRA": "study_accession"}
)
srp_df_gse = srp_df[srp_df.entrytype == "GSE"]
missing_srp = list(set(srp).difference(srp_df_gse.study_accession.tolist()))
srp_df_nongse = srp_df[srp_df.entrytype != "GSE"]
if srp_df_nongse.shape[0] >= 1:
srp_df_nongse = pd.DataFrame(
{
"study_accession": missing_srp,
"study_alias": [pd.NA] * len(missing_srp),
"entrytpe": ["GSE"] * len(missing_srp),
}
)
srp_df = pd.concat([srp_df_gse, srp_df_nongse])
return srp_df[["study_accession", "study_alias"]].drop_duplicates()
    def srp_to_srr(self, srp, **kwargs):
"""Get SRR for a SRP"""
srp_df = self.sra_metadata(srp, **kwargs)
return _order_first(srp_df, ["study_accession", "run_accession"])
    def srp_to_srs(self, srp, **kwargs):
"""Get SRS for a SRP"""
srp_df = self.sra_metadata(srp, **kwargs)
return _order_first(srp_df, ["study_accession", "sample_accession"])
    def srp_to_srx(self, srp, **kwargs):
"""Get SRX for a SRP"""
srp_df = self.sra_metadata(srp, **kwargs)
srp_df["study_accesssion"] = srp
return _order_first(srp_df, ["study_accession", "experiment_accession"])
    def srr_to_gsm(self, srr, **kwargs):
"""Get GSM for a SRR"""
if isinstance(srr, str):
srr = [srr]
srr_df = self.srr_to_srp(srr, detailed=True)
# remove NAs
        srp = [x for x in srr_df.study_accession.tolist() if x is not pd.NA]
gse_df = self.fetch_gds_results(srp, **kwargs)
gse_df = gse_df[gse_df.entrytype == "GSE"].rename(
columns={"SRA": "project_accession", "accession": "project_alias"}
)
gsm_df = self.gse_to_gsm(gse_df.project_alias.tolist(), detailed=True)
srr_cols = list(
set(srr_df.columns.tolist()).difference(gsm_df.columns.tolist())
) + ["experiment_accession"]
joined_df = gsm_df.merge(srr_df[srr_cols], on="experiment_accession")
df = _order_first(joined_df, ["run_accession", "experiment_alias"])
df = df.loc[df["run_accession"].isin(srr)]
return df
    def srr_to_srp(self, srr, **kwargs):
"""Get SRP for a SRR"""
if isinstance(srr, str):
srr = [srr]
srr_df = self.sra_metadata(srr, **kwargs)
        if kwargs.get("detailed") is True:
return srr_df
srr_df = srr_df.loc[srr_df["run_accession"].isin(srr)]
return _order_first(srr_df, ["run_accession", "study_accession"])
    def srr_to_srs(self, srr, **kwargs):
"""Get SRS for a SRR"""
if isinstance(srr, str):
srr = [srr]
srr_df = self.sra_metadata(srr, **kwargs)
srr_df = srr_df.loc[srr_df["run_accession"].isin(srr)]
return _order_first(srr_df, ["run_accession", "sample_accession"])
    def srr_to_srx(self, srr, **kwargs):
"""Get SRX for a SRR"""
if isinstance(srr, str):
srr = [srr]
        srr_df = self.sra_metadata(srr, **kwargs)
srr_df = srr_df.loc[srr_df["run_accession"].isin(srr)]
return _order_first(srr_df, ["run_accession", "experiment_accession"])
    def srs_to_gsm(self, srs, **kwargs):
"""Get GSM for a SRS"""
if isinstance(srs, str):
srs = [srs]
srx_df = self.srs_to_srx(srs)
time.sleep(self.sleep_time)
gsm_df = self.srx_to_gsm(srx_df.experiment_accession.tolist(), **kwargs)
srs_df = srx_df.merge(gsm_df, on="experiment_accession")
srs_df = srs_df.loc[srs_df["sample_accession"].isin(srs)]
return _order_first(srs_df, ["sample_accession", "experiment_alias"])
    def srx_to_gsm(self, srx, **kwargs):
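        """Get GSM for a SRX"""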
if isinstance(srx, str):
srx = [srx]
gsm_df = self.fetch_gds_results(srx, **kwargs)
gsm_df = gsm_df[gsm_df.entrytype == "GSM"].rename(
columns={"SRA": "experiment_accession", "accession": "experiment_alias"}
)
gsm_df = gsm_df.loc[gsm_df["experiment_accession"].isin(srx)]
return gsm_df[["experiment_accession", "experiment_alias"]].drop_duplicates()
    def srs_to_srx(self, srs, **kwargs):
"""Get SRX for a SRS"""
srs_df = self.sra_metadata(srs, **kwargs)
return _order_first(srs_df, ["sample_accession", "experiment_accession"])
    def srx_to_srp(self, srx, **kwargs):
"""Get SRP for a SRX"""
srx_df = self.sra_metadata(srx, **kwargs)
return _order_first(srx_df, ["experiment_accession", "study_accession"])
    def srx_to_srr(self, srx, **kwargs):
"""Get SRR for a SRX"""
srx_df = self.sra_metadata(srx, **kwargs)
return _order_first(srx_df, ["experiment_accession", "run_accession"])
    def srx_to_srs(self, srx, **kwargs):
"""Get SRS for a SRX"""
srx_df = self.sra_metadata(srx, **kwargs)
return _order_first(srx_df, ["experiment_accession", "sample_accession"])
    def search(self, *args, **kwargs):
raise NotImplementedError("Search not yet implemented for Web")