Source code for pysradb.sraweb
"""Utilities to interact with SRA online"""
import concurrent.futures
import os
import re
import sys
import time
import warnings
from collections import OrderedDict
from json.decoder import JSONDecodeError
from xml.parsers.expat import ExpatError
import numpy as np
import pandas as pd
import requests
import xmltodict
warnings.simplefilter(action="ignore", category=FutureWarning)
from xml.sax.saxutils import escape
def _make_hashable(obj):
"""Convert unhashable types to hashable ones for pandas operations"""
if isinstance(obj, (OrderedDict, dict)):
# Extract text content from XML parsed dict/OrderedDict
if "#text" in obj:
return obj["#text"] # Extract the actual text content
elif len(obj) == 1 and "@xmlns" in obj:
return pd.NA # Handle xmlns-only dicts as missing data
else:
# Fallback to string representation for other dict structures
return str(obj)
elif isinstance(obj, list):
# Convert list to tuple
return tuple(_make_hashable(item) for item in obj)
else:
return obj
def _order_first(df, column_order_list):
columns = column_order_list + [
col for col in df.columns.tolist() if col not in column_order_list
]
# check if all columns do exist in the dataframe
if len(set(columns).intersection(df.columns)) == len(columns):
df = df.loc[:, columns]
df = df.mask(df.map(str).eq("[]"))
# Filter out XML namespace artifacts
df = df.replace(regex=r"^@xmlns.*", value=pd.NA).infer_objects(copy=False)
df = df.fillna(pd.NA)
return df
def _retry_response(base_url, payload, key, max_retries=10):
"""Rerty fetching esummary if API rate limit exceeeds"""
for index, _ in enumerate(range(max_retries)):
try:
request = requests.get(base_url, params=OrderedDict(payload))
response = request.json()
results = response[key]
return response
except KeyError:
# sleep for increasing times
time.sleep(index + 1)
continue
raise RuntimeError("Failed to fetch esummary. API rate limit exceeded.")
[docs]
def get_retmax(n_records, retmax=500):
"""Get retstart and retmax till n_records are exhausted"""
for i in range(0, n_records, retmax):
yield i
[docs]
class SRAweb(object):
def __init__(self, api_key=None):
"""
Initialize SRAweb for API-based access to SRA data.
Parameters
----------
api_key: string
API key for ncbi eutils. Optional, but recommended for higher rate limits.
"""
self.base_url = dict()
self.base_url["esummary"] = (
"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi"
)
self.base_url["esearch"] = (
"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi"
)
self.base_url["efetch"] = (
"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"
)
self.ena_fastq_search_url = "https://www.ebi.ac.uk/ena/portal/api/filereport"
self.ena_params = [("result", "read_run"), ("fields", "fastq_ftp")]
self.esearch_params = {}
self.esearch_params["sra"] = [
("db", "sra"),
("usehistory", "n"),
("retmode", "json"),
]
self.esearch_params["geo"] = [
("db", "gds"),
("usehistory", "n"),
("retmode", "json"),
]
self.efetch_params = [
("db", "sra"),
("usehistory", "n"),
("retmode", "runinfo"),
]
if api_key is not None:
self.esearch_params["sra"].append(("api_key", str(api_key)))
self.esearch_params["geo"].append(("api_key", str(api_key)))
self.efetch_params.append(("api_key", str(api_key)))
self.sleep_time = 1 / 10
else:
self.sleep_time = 1 / 3
[docs]
@staticmethod
def format_xml(string):
"""Create a fake root to make 'string' a valid xml
Parameters
----------
string: str
Returns
--------
xml: str
"""
# string = unescape(string.strip())
string = string.strip()
return "<root>" + string + "</root>"
[docs]
@staticmethod
def xml_to_json(xml):
"""Convert xml to json.
Parameters
----------
xml: str
Input XML
Returns
-------
xml_dict: dict
Parsed xml as dict
"""
try:
xmldict = xmltodict.parse(
xml, process_namespaces=False, dict_constructor=OrderedDict
)
json = xmldict["root"]
except ExpatError:
raise RuntimeError("Unable to parse xml: {}".format(xml))
return json
[docs]
def bioproject_to_srp(self, bioproject):
"""Convert PRJNA BioProject ID to SRP accession
Parameters
----------
bioproject: str
BioProject ID (e.g., 'PRJNA810439')
Returns
-------
srp_accessions: list
List of SRP accessions found
"""
if not bioproject or pd.isna(bioproject):
return []
try:
import re
# Search SRA for records with this bioproject
search_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi"
search_params = {
"db": "sra",
"term": f"{bioproject}[BioProject]",
"retmode": "json",
"retmax": "50",
}
response = requests.get(search_url, params=search_params, timeout=30)
result = response.json()
sra_uids = result.get("esearchresult", {}).get("idlist", [])
if not sra_uids:
return []
# Get summaries to extract SRP accessions
summary_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi"
srp_set = set()
# Process in batches to avoid too many requests
for uid in sra_uids[:10]: # Limit to first 10
try:
summary_params = {"db": "sra", "id": uid, "retmode": "json"}
summary_response = requests.get(
summary_url, params=summary_params, timeout=30
)
summary_result = summary_response.json()
if uid in summary_result.get("result", {}):
record = summary_result["result"][uid]
expxml = record.get("expxml", "")
# Extract SRP using regex from the XML
srp_match = re.search(r'Study acc="(SRP\d+)"', expxml)
if srp_match:
srp_set.add(srp_match.group(1))
time.sleep(0.1) # Small delay between requests
except Exception:
continue
return sorted(list(srp_set))
except Exception as e:
return []
[docs]
def fetch_ena_fastq(self, srp):
"""Fetch FASTQ records from ENA (EXPERIMENTAL)
Parameters
----------
srp: string
Srudy accession
Returns
-------
srr_url: list
List of SRR fastq urls
"""
payload = self.ena_params.copy()
payload += [("accession", srp)]
request = requests.get(self.ena_fastq_search_url, params=OrderedDict(payload))
request_text = request.text.strip()
urls = []
for line in request_text.split("\n"):
if "fastq_ftp" in line:
continue
line = line.strip()
line_split = line.split("\t")
if len(line_split) != 2:
continue
url, srr = line.split("\t")
# sometimes this needs to be flipped
if "sra.ebi.ac.uk" in srr:
url, srr = srr, url
http_url = "http://{}".format(url)
ftp_url = url.replace("ftp.sra.ebi.ac.uk/", "era-fasp@fasp.sra.ebi.ac.uk:")
urls += [(srr, http_url, ftp_url)]
# Paired end case
def _handle_url_split(url_split):
url1_1 = pd.NA
url1_2 = pd.NA
for url_temp in url_split:
if "_1.fastq.gz" in url_temp:
url1_1 = url_temp
elif "_2.fastq.gz" in url_temp:
url1_2 = url_temp
return url1_1, url1_2
if ";" in request_text:
urls_expanded = []
for srr, url1, url2 in urls:
# strip _1, _2
srr = srr.split("_")[0]
if ";" in url1:
url1_split = url1.split(";")
if len(url1_split) == 2:
url1_1, url1_2 = url1_split
else:
# warnings.warn('ignoring extra urls found for paired end accession')
url1_1, url1_2 = _handle_url_split(url1_split)
url1_2 = "http://{}".format(url1_2)
url2_split = url2.split(";")
if len(url2_split) == 2:
url2_1, url2_2 = url2_split
else:
# warnings.warn('ignoring extra urls found for paired end accession')
url2_1, url2_2 = _handle_url_split(url2_split)
else:
url1_1 = url1
url2_1 = url2
url1_2 = ""
url2_2 = ""
urls_expanded.append((srr, url1_1, url1_2, url2_1, url2_2))
df = pd.DataFrame(
urls_expanded,
columns=[
"run_accession",
"ena_fastq_http_1",
"ena_fastq_http_2",
"ena_fastq_ftp_1",
"ena_fastq_ftp_2",
],
).sort_values(by="run_accession")
return df
else:
return pd.DataFrame(
urls, columns=["run_accession", "ena_fastq_http", "ena_fastq_ftp"]
).sort_values(by="run_accession")
[docs]
def create_esummary_params(self, esearchresult, db="sra"):
query_key = esearchresult["querykey"]
webenv = esearchresult["webenv"]
retstart = esearchresult["retstart"]
# TODO this should be adaptive to build
# upon using the 'count' result in esearch result,
# Currently only supports a max of 500 records.
# retmax = esearchresult["retmax"]
retmax = 500
return [
("query_key", query_key),
("WebEnv", webenv),
("retstart", retstart),
("retmax", retmax),
]
[docs]
def get_esummary_response(self, db, term, usehistory="y"):
assert db in ["sra", "geo"]
payload = self.esearch_params[db].copy()
if isinstance(term, list):
term = " OR ".join(term)
payload += [("term", term)]
request = requests.post(self.base_url["esearch"], data=OrderedDict(payload))
try:
esearch_response = request.json()
except JSONDecodeError:
sys.stderr.write(
"Unable to parse esummary response json: {}{}. Will retry once.".format(
request.text, os.linesep
)
)
retry_after = request.headers.get("Retry-After", 1)
time.sleep(int(retry_after))
request = requests.post(self.base_url["esearch"], data=OrderedDict(payload))
try:
esearch_response = request.json()
except JSONDecodeError as e:
error_msg = (
"Unable to parse esummary response json: {}{}. Aborting.".format(
request.text, os.linesep
)
)
sys.stderr.write(error_msg)
raise ValueError(error_msg) from e
# retry again
if "esummaryresult" in esearch_response:
print("No result found")
return
if "error" in esearch_response:
# API rate limite exceeded
esearch_response = _retry_response(
self.base_url["esearch"], payload, "esearchresult"
)
n_records = int(esearch_response["esearchresult"]["count"])
results = {}
for retstart in get_retmax(n_records):
payload = self.esearch_params[db].copy()
payload += self.create_esummary_params(esearch_response["esearchresult"])
payload = OrderedDict(payload)
payload["retstart"] = retstart
request = requests.get(
self.base_url["esummary"], params=OrderedDict(payload)
)
try:
response = request.json()
except JSONDecodeError:
time.sleep(1)
response = _retry_response(self.base_url["esummary"], payload, "result")
if "error" in response:
# API rate limite exceeded
response = _retry_response(self.base_url["esummary"], payload, "result")
if retstart == 0:
results = response["result"]
else:
result = response["result"]
for key, value in result.items():
if key in list(results.keys()):
results[key] += value
else:
results[key] = value
return results
[docs]
def get_efetch_response(self, db, term, usehistory="y"):
assert db in ["sra", "geo"]
payload = self.esearch_params[db].copy()
if isinstance(term, list):
term = " OR ".join(term)
payload += [("term", term)]
request = requests.get(self.base_url["esearch"], params=OrderedDict(payload))
esearch_response = request.json()
if "esummaryresult" in esearch_response:
print("No result found")
return
if "error" in esearch_response:
# API rate limite exceeded
esearch_response = _retry_response(
self.base_url["esearch"], payload, "esearchresult"
)
n_records = int(esearch_response["esearchresult"]["count"])
results = {}
for retstart in get_retmax(n_records):
payload = self.efetch_params.copy()
payload += self.create_esummary_params(esearch_response["esearchresult"])
payload = OrderedDict(payload)
payload["retstart"] = retstart
request = requests.get(self.base_url["efetch"], params=OrderedDict(payload))
request_text = request.text.strip()
try:
request_json = request.json()
except:
request_json = {} # eval(request_text)
if "error" in request_json:
# print("Encountered: {}".format(request_json))
# print("Headers: {}".format(request.headers))
# Handle API-rate limit exceeding
try:
retry_after = request.headers["Retry-After"]
except KeyError:
if request_json["error"] == "error forwarding request":
error_msg = "Encountered error while making request.\n"
sys.stderr.write(error_msg)
raise RuntimeError(error_msg.strip())
time.sleep(int(retry_after))
# try again
request = requests.get(
self.base_url["efetch"], params=OrderedDict(payload)
)
request_text = request.text.strip()
try:
request_json = request.json()
if request_json["error"] == "error forwarding request":
sys.stderr.write("Encountered error while making request.\n")
return
except:
request_json = {} # eval(request_text)
try:
xml_response = xmltodict.parse(
request_text, process_namespaces=False, dict_constructor=OrderedDict
)
exp_response = xml_response.get("EXPERIMENT_PACKAGE_SET", {})
response = exp_response.get("EXPERIMENT_PACKAGE", {})
except ExpatError as e:
error_msg = "Unable to parse xml: {}{}".format(request_text, os.linesep)
sys.stderr.write(error_msg)
raise ValueError(error_msg.strip()) from e
if not response:
error_msg = "Unable to parse xml response. Received: {}{}".format(
xml_response, os.linesep
)
sys.stderr.write(error_msg)
raise ValueError(error_msg.strip())
if retstart == 0:
results = response
else:
result = response
for value in result:
results.append(value)
time.sleep(self.sleep_time)
return results
[docs]
def sra_metadata(
self,
srp,
sample_attribute=False,
detailed=False,
expand_sample_attributes=False,
output_read_lengths=False,
include_pmids=False,
enrich=False,
enrich_backend="ollama/phi3",
**kwargs,
):
esummary_result = self.get_esummary_response("sra", srp)
try:
uids = esummary_result["uids"]
except KeyError:
return None
exps_xml = OrderedDict()
runs_xml = OrderedDict()
exps_json = OrderedDict()
runs_json = OrderedDict()
for uid in uids:
exps_xml[uid] = self.format_xml(esummary_result[uid]["expxml"])
runs_xml[uid] = self.format_xml(esummary_result[uid]["runs"])
for uid in uids:
exps_json[uid] = self.xml_to_json(exps_xml[uid])
runs_json[uid] = self.xml_to_json(runs_xml[uid])
sra_record = []
for uid, run_json in runs_json.items():
exp_json = exps_json[uid]
exp_summary = exp_json["Summary"]
exp_title = exp_summary.get("Title", pd.NA)
exp_platform = exp_summary.get("Platform", {})
statistics = exp_summary.get("Statistics", {})
if isinstance(exp_platform, OrderedDict):
exp_platform_model = exp_platform.get("@instrument_model", pd.NA)
exp_platform_desc = exp_platform.get("#text", pd.NA)
else:
exp_platform_model = pd.NA
exp_platform_desc = pd.NA
exp_total_runs = statistics.get("@total_runs", pd.NA)
exp_total_spots = statistics.get("@total_spots", pd.NA)
exp_total_size = statistics.get("@total_size", pd.NA)
# experiment_accession
exp_ID = exp_json["Experiment"]["@acc"]
# experiment_title
exp_name = exp_json["Experiment"]["@name"]
exp_organism = exp_json.get("Organism", pd.NA)
exp_organism_name = pd.NA
exp_taxid = pd.NA
if isinstance(exp_organism, dict):
exp_organism_name = exp_organism.get("@ScientificName", pd.NA)
exp_taxid = exp_organism["@taxid"]
exp_instrument = list(exp_json["Instrument"].values())[0]
exp_sample = exp_json["Sample"]
# sample_accession
exp_sample_ID = exp_sample["@acc"]
# sample_title
exp_sample_name = exp_sample["@name"]
exp_library_descriptor = exp_json["Library_descriptor"]
# library_strategy
exp_library_strategy = exp_library_descriptor["LIBRARY_STRATEGY"]
if isinstance(exp_library_strategy, dict):
exp_library_strategy = exp_library_strategy["#text"]
# library_source
exp_library_source = exp_library_descriptor["LIBRARY_SOURCE"]
if isinstance(exp_library_source, dict):
exp_library_source = exp_library_source["#text"]
# library_selection
exp_library_selection = exp_library_descriptor["LIBRARY_SELECTION"]
if isinstance(exp_library_selection, dict):
exp_library_selection = exp_library_selection["#text"]
# library_name
exp_library_name = exp_library_descriptor.get("LIBRARY_NAME", "")
if isinstance(exp_library_selection, dict):
exp_library_name = exp_library_name["#text"]
# library_layout
exp_library_layout = list(exp_library_descriptor["LIBRARY_LAYOUT"].keys())[
0
]
# biosample
exp_biosample = exp_json.get("Biosample", pd.NA)
# bioproject
exp_bioproject = exp_json.get("Bioproject", pd.NA)
experiment_record = OrderedDict()
experiment_record["study_accession"] = exp_json["Study"]["@acc"]
experiment_record["study_title"] = exp_json["Study"]["@name"]
experiment_record["experiment_accession"] = exp_ID
experiment_record["experiment_title"] = exp_name
experiment_record["experiment_desc"] = exp_title
experiment_record["organism_taxid"] = exp_taxid
experiment_record["organism_name"] = exp_organism_name
experiment_record["library_name"] = exp_library_name
experiment_record["library_strategy"] = exp_library_strategy
experiment_record["library_source"] = exp_library_source
experiment_record["library_selection"] = exp_library_selection
experiment_record["library_layout"] = exp_library_layout
experiment_record["sample_accession"] = exp_sample_ID
experiment_record["sample_title"] = exp_sample_name
experiment_record["biosample"] = exp_biosample
experiment_record["bioproject"] = exp_bioproject
experiment_record["instrument"] = exp_instrument
experiment_record["instrument_model"] = exp_platform_model
experiment_record["instrument_model_desc"] = exp_platform_desc
experiment_record["total_spots"] = exp_total_spots
experiment_record["total_size"] = exp_total_size
if not run_json:
# Sometimes the run_accession is not populated by NCBI:
# df2 = self.srx_to_srr(exp_ID)
# extra_fields = set(experiment_record.keys()).difference(df2.columns.tolist())
# for idx, row in df2.iterrows():
# for field in extra_fields:
# experiment_record[field] = row[field]
sra_record.append(experiment_record)
continue
runs = run_json["Run"]
if not isinstance(runs, list):
runs = [runs]
for run_record in runs:
run_accession = run_record["@acc"]
run_total_spots = run_record["@total_spots"]
run_total_bases = run_record["@total_bases"]
experiment_record["run_accession"] = run_accession
experiment_record["run_total_spots"] = run_total_spots
experiment_record["run_total_bases"] = run_total_bases
sra_record.append(experiment_record.copy())
# TODO: the detailed call below does redundant operations
# the code above this can be completeley done away with
# Convert any unhashable types to hashable ones before creating DataFrame
hashable_records = []
for record in sra_record:
hashable_record = {k: _make_hashable(v) for k, v in record.items()}
hashable_records.append(hashable_record)
metadata_df = pd.DataFrame(hashable_records).drop_duplicates()
if "run_accession" in metadata_df.columns:
metadata_df = metadata_df.sort_values(by="run_accession")
metadata_df.columns = [x.lower().strip() for x in metadata_df.columns]
# Filter out XML namespace artifacts and replace with NA
metadata_df = metadata_df.replace(
regex=r"^@xmlns.*", value=pd.NA
).infer_objects(copy=False)
if not detailed:
return metadata_df
time.sleep(self.sleep_time)
efetch_result = self.get_efetch_response("sra", srp)
if not isinstance(efetch_result, list):
if efetch_result:
efetch_result = [efetch_result]
else:
return None
detailed_records = []
for record in efetch_result:
if "SAMPLE" in record.keys() and "SAMPLE_ATTRIBUTES" in record["SAMPLE"]:
sample_attributes = record["SAMPLE"]["SAMPLE_ATTRIBUTES"][
"SAMPLE_ATTRIBUTE"
]
else:
sample_attributes = []
if isinstance(sample_attributes, OrderedDict):
sample_attributes = [sample_attributes]
exp_record = record["EXPERIMENT"]
exp_attributes = exp_record.get("EXPERIMENT_ATTRIBUTES", {})
run_sets = record["RUN_SET"].get("RUN", [])
if not isinstance(run_sets, list):
run_sets = [run_sets]
for run_set in run_sets:
detailed_record = OrderedDict()
if not run_json:
# Add experiment accession if no run info found earlier
detailed_record["experiment_accession"] = exp_record["@accession"]
# detailed_record["experiment_title"] = exp_record["TITLE"]
for key, values in exp_attributes.items():
key = key.lower()
for value_x in values:
if not isinstance(value_x, dict):
continue
tag = value_x["TAG"].lower()
value = value_x["VALUE"] if "VALUE" in value_x else None
detailed_record[tag] = value
lib_record = exp_record["DESIGN"]["LIBRARY_DESCRIPTOR"]
for key, value in lib_record.items():
key = key.lower()
if key == "library_layout":
value = list(value.keys())[0]
elif key == "library_construction_protocol":
continue
# detailed_record[key] = value
detailed_record["run_accession"] = run_set["@accession"]
detailed_record["run_alias"] = run_set["@alias"]
sra_files = run_set.get("SRAFiles", {})
sra_files = sra_files.get("SRAFile", {})
if isinstance(sra_files, OrderedDict):
# detailed_record["sra_url"] = sra_files.get("@url", pd.NA)
if "Alternatives" in sra_files.keys():
alternatives = sra_files["Alternatives"]
if not isinstance(alternatives, list):
alternatives = [alternatives]
for alternative in alternatives:
org = alternative["@org"].lower()
for key in alternative.keys():
if key == "@org":
continue
detailed_record[
"{}_{}".format(org, key.replace("@", ""))
] = alternative[key]
else:
for sra_file in sra_files:
# Multiple download URLs
# Use the one where the download filename corresponds to the SRR
cluster = sra_file.get("@cluster", None).lower().strip()
if cluster is None:
continue
for key in sra_file.keys():
if key == "@cluster":
continue
if key == "Alternatives":
# Example: SRP184142
alternatives = sra_file["Alternatives"]
if not isinstance(alternatives, list):
alternatives = [alternatives]
for alternative in alternatives:
org = alternative["@org"].lower()
for key in alternative.keys():
if key == "@org":
continue
detailed_record[
"{}_{}".format(org, key.replace("@", ""))
] = alternative[key]
else:
detailed_record[
"{}_{}".format(cluster, key.replace("@", ""))
] = sra_file[key]
expt_ref = run_set["EXPERIMENT_REF"]
detailed_record["experiment_alias"] = expt_ref.get("@refname", "")
# detailed_record["run_total_bases"] = run_set["@total_bases"]
# detailed_record["run_total_spots"] = run_set["@total_spots"]
for sample_attribute in sample_attributes:
dict_values = list(sample_attribute.values())
if len(dict_values) > 1:
detailed_record[dict_values[0]] = dict_values[1]
else:
# TODO: Investigate why these fields have just the key
# but no value
pass
detailed_records.append(detailed_record)
detailed_record_df = pd.DataFrame(detailed_records).drop_duplicates()
if (
"run_accession" in metadata_df.keys()
and "run_accession" in detailed_record_df.keys()
):
metadata_df = metadata_df.merge(
detailed_record_df, on="run_accession", how="outer"
)
elif "experiment_accession" in detailed_record_df.keys():
metadata_df = metadata_df.merge(
detailed_record_df, on="experiment_accession", how="outer"
)
metadata_df = metadata_df[metadata_df.columns.dropna()]
metadata_df = metadata_df.drop_duplicates()
metadata_df = metadata_df.replace(r"^\s*$", pd.NA, regex=True)
ena_cols = [
"ena_fastq_http",
"ena_fastq_http_1",
"ena_fastq_http_2",
"ena_fastq_ftp",
"ena_fastq_ftp_1",
"ena_fastq_ftp_2",
]
empty_df = pd.DataFrame(columns=ena_cols)
metadata_df = pd.concat((metadata_df, empty_df), axis=0)
if "run_accession" in metadata_df.columns:
metadata_df = metadata_df.set_index("run_accession")
# multithreading lookup on ENA, since a lot of time is spent waiting
# for its reply
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# load our function calls into a list of futures
futures = [
executor.submit(self.fetch_ena_fastq, srp)
for srp in metadata_df.study_accession.unique()
]
# now proceed synchronously
for future in concurrent.futures.as_completed(futures):
ena_results = future.result()
if ena_results.shape[0]:
ena_results = ena_results.set_index("run_accession")
metadata_df.update(ena_results)
metadata_df = metadata_df.reset_index()
metadata_df = metadata_df.fillna(pd.NA)
metadata_df.columns = [x.lower().strip() for x in metadata_df.columns]
# Add PMID column when detailed=True and include_pmids=True
if include_pmids:
try:
sra_accessions = [srp] if isinstance(srp, str) else srp
pmid_df = self.sra_to_pmid(sra_accessions)
if pmid_df is not None and not pmid_df.empty:
pmid_map = {}
for _, row in pmid_df.iterrows():
study_acc = row.get("sra_accession", None)
pmid = row.get("pmid")
if not pd.isna(pmid):
if study_acc not in pmid_map:
pmid_map[study_acc] = []
pmid_map[study_acc].append(str(pmid))
metadata_df["pmid"] = metadata_df.apply(
lambda row: ",".join(
pmid_map.get(row.get("study_accession", ""), [""])
),
axis=1,
)
metadata_df["pmid"] = metadata_df["pmid"].replace("", pd.NA)
else:
metadata_df["pmid"] = pd.NA
except Exception as e:
metadata_df["pmid"] = pd.NA
# Filter out XML namespace artifacts and replace with NA
metadata_df = metadata_df.replace(
regex=r"^@xmlns.*", value=pd.NA
).infer_objects(copy=False)
# Add GSE and GSM columns when detailed=True
if detailed:
try:
unique_srps = metadata_df["study_accession"].dropna().unique().tolist()
if unique_srps:
gse_df = self.srp_to_gse(unique_srps)
if gse_df is not None and not gse_df.empty:
srp_to_gse_map = {}
for _, row in gse_df.iterrows():
srp_acc = row.get("study_accession")
gse_acc = row.get("study_alias")
if not pd.isna(srp_acc) and not pd.isna(gse_acc):
if srp_acc not in srp_to_gse_map:
srp_to_gse_map[srp_acc] = []
srp_to_gse_map[srp_acc].append(gse_acc)
metadata_df["study_geo_accession"] = metadata_df[
"study_accession"
].map(
lambda x: (
",".join(srp_to_gse_map.get(x, []))
if x in srp_to_gse_map
else pd.NA
)
)
metadata_df["study_geo_accession"] = metadata_df[
"study_geo_accession"
].replace("", pd.NA)
else:
metadata_df["study_geo_accession"] = pd.NA
else:
metadata_df["study_geo_accession"] = pd.NA
unique_srxs = (
metadata_df["experiment_accession"].dropna().unique().tolist()
)
if unique_srxs:
gsm_response = self.fetch_gds_results(unique_srxs)
if gsm_response is not None and not gsm_response.empty:
srx_to_gsm_map = {}
for _, row in gsm_response.iterrows():
if row.get("entrytype") == "GSM":
gsm_acc = row.get("accession")
srx_acc = row.get("SRA")
if not pd.isna(srx_acc) and not pd.isna(gsm_acc):
srx_to_gsm_map[srx_acc] = gsm_acc
metadata_df["experiment_geo_accession"] = metadata_df[
"experiment_accession"
].map(lambda x: srx_to_gsm_map.get(x, pd.NA))
else:
metadata_df["experiment_geo_accession"] = pd.NA
else:
metadata_df["experiment_geo_accession"] = pd.NA
except Exception as e:
metadata_df["study_geo_accession"] = pd.NA
metadata_df["experiment_geo_accession"] = pd.NA
if enrich and not metadata_df.empty:
try:
from pysradb.metadata_enrichment import create_metadata_extractor
extractor = create_metadata_extractor(
method="llm", backend=enrich_backend
)
metadata_df = extractor.enrich_dataframe(
metadata_df, text_column=None, prefix="guessed_"
)
except Exception as e:
error_msg = str(e)
if "Ollama is not installed or not running" in error_msg:
print(f"Error: {error_msg}")
print(
"Metadata enrichment requires Ollama to be installed and running."
)
print(
"Please install Ollama from https://ollama.ai/ and follow these steps:"
)
print("1. Start Ollama server: ollama serve")
print("2. Pull a model: ollama pull phi3")
print("3. Try again or use a different enrichment backend")
raise
else:
print(f"Warning: Enrichment failed: {e}")
if "run_accession" in metadata_df.columns:
return metadata_df.sort_values(by="run_accession")
return metadata_df
[docs]
def fetch_gds_results(self, gse, **kwargs):
result = self.get_esummary_response("geo", gse)
try:
uids = result["uids"]
except KeyError:
print("No results found for {} | Obtained result: {}".format(gse, result))
return None
gse_records = []
for uid in uids:
record = result[uid]
del record["uid"]
extrelations = record.get("extrelations") or []
if extrelations:
for extrelation in extrelations:
keys = list(extrelation.keys())
values = list(extrelation.values())
assert sorted(keys) == sorted(
["relationtype", "targetobject", "targetftplink"]
)
assert len(values) == 3
record[extrelation["relationtype"]] = extrelation["targetobject"]
del record["extrelations"]
gse_records.append(record)
continue
# Fallback for records without extrelations (e.g., spatial transcriptomics datasets)
samples = record.get("samples") or []
if samples:
record["samples"] = samples
gse_records.append(record)
if not len(gse_records):
print("No results found for {}".format(gse))
return None
return pd.DataFrame(gse_records)
[docs]
def fetch_gsm_soft(self, gsm_ids):
"""
Fetch detailed GSM metadata in SOFT format.
Args:
gsm_ids: List of GSM accessions
Returns:
Dictionary mapping GSM accession to parsed SOFT metadata
"""
if isinstance(gsm_ids, str):
gsm_ids = [gsm_ids]
gsm_data = {}
for gsm in gsm_ids:
try:
url = f"https://www.ncbi.nlm.nih.gov/geo/query/acc.cgi?acc={gsm}&targ=self&form=text&view=full"
response = requests.get(url, timeout=30)
response.raise_for_status()
# Parse SOFT format
soft_text = response.text
metadata = {}
characteristics = []
for line in soft_text.split("\n"):
line = line.strip()
if line.startswith("!Sample_"):
if " = " in line:
key, value = line.split(" = ", 1)
key = key.replace("!Sample_", "").lower()
# Handle characteristics specially - they can appear multiple times
if "characteristics" in key:
characteristics.append(value)
else:
# Store in metadata dict
# Join multiple values with semicolon for consistency
if key not in metadata:
metadata[key] = value
elif isinstance(metadata[key], list):
metadata[key].append(value)
else:
# Convert to list if we have multiple values
metadata[key] = [metadata[key], value]
# Convert list values to semicolon-separated strings
for key, value in metadata.items():
if isinstance(value, list):
metadata[key] = "; ".join(str(v) for v in value)
# Process characteristics into a dict
char_dict = {}
for char in characteristics:
if ": " in char:
char_key, char_val = char.split(": ", 1)
char_dict[char_key.strip().lower()] = char_val.strip()
metadata["characteristics"] = char_dict
gsm_data[gsm] = metadata
except Exception as e:
print(f"Warning: Could not fetch SOFT data for {gsm}: {e}")
gsm_data[gsm] = {}
return gsm_data
[docs]
def geo_metadata(
self,
gse,
sample_attribute=False,
detailed=False,
expand_sample_attributes=False,
include_pmids=False,
enrich=False,
enrich_backend="ollama/phi3",
**kwargs,
):
if isinstance(gse, str):
gse = [gse]
if not gse:
return pd.DataFrame(
columns=[
"study_accession",
"study_title",
"study_summary",
"organism_name",
"platform_accession",
"platform_title",
"experiment_type",
"bioproject",
"submission_date",
"supplementary_files",
"series_ftp",
"sample_accession",
"sample_title",
]
)
geo_response = self.get_esummary_response("geo", gse)
if not geo_response:
return pd.DataFrame(
columns=[
"study_accession",
"study_title",
"study_summary",
"organism_name",
"platform_accession",
"platform_title",
"experiment_type",
"bioproject",
"submission_date",
"supplementary_files",
"series_ftp",
"sample_accession",
"sample_title",
]
)
gse_records = []
sample_accessions = []
for uid in geo_response.get("uids", []):
record = geo_response.get(uid, {})
if record.get("entrytype") != "GSE":
continue
gse_records.append(record)
for sample in record.get("samples") or []:
accession = sample.get("accession")
if accession and accession not in sample_accessions:
sample_accessions.append(accession)
sample_details = {}
gsm_soft_data = {}
if (sample_attribute or detailed) and sample_accessions:
sample_response = self.get_esummary_response("geo", sample_accessions)
if sample_response:
for uid in sample_response.get("uids", []):
entry = sample_response.get(uid, {})
if entry.get("entrytype") == "GSM":
sample_details[entry.get("accession")] = entry
# Fetch full SOFT metadata when detailed=True
if detailed:
gsm_soft_data = self.fetch_gsm_soft(sample_accessions)
# Get SRP mappings for all GSE IDs using gse_to_srp
gse_to_srp_map = {}
if gse and detailed:
try:
srp_df = self.gse_to_srp(gse)
if not srp_df.empty:
# Group by study_alias and join multiple SRPs with comma
for gse_id in gse:
srps = (
srp_df[srp_df["study_alias"] == gse_id]["study_accession"]
.dropna()
.tolist()
)
if srps:
gse_to_srp_map[gse_id] = ",".join(srps)
except Exception:
pass
# Get SRX mappings for all GSM samples
gsm_to_srx_map = {}
if sample_accessions and detailed:
try:
# Use fetch_gds_results to get SRX from GSM entries
gsm_response = self.fetch_gds_results(sample_accessions)
if gsm_response is not None and not gsm_response.empty:
for _, row in gsm_response.iterrows():
if row.get("entrytype") == "GSM":
gsm_acc = row.get("accession")
srx = row.get("SRA")
if gsm_acc and srx and not pd.isna(srx):
gsm_to_srx_map[gsm_acc] = srx
except Exception:
pass
rows = []
for record in gse_records:
study_accession = record.get("accession", pd.NA)
platform = record.get("gpl", pd.NA)
if (
isinstance(platform, str)
and platform.strip()
and not platform.upper().startswith("GPL")
and platform.replace(" ", "").isdigit()
):
platform_accession = f"GPL{platform.strip()}"
else:
platform_accession = platform if platform else pd.NA
base_row = OrderedDict(
[
("study_accession", study_accession),
("study_title", record.get("title", pd.NA)),
("study_summary", record.get("summary", pd.NA)),
("organism_name", record.get("taxon", pd.NA)),
("platform_accession", platform_accession),
("platform_title", record.get("platformtitle", pd.NA)),
("experiment_type", record.get("gdstype", pd.NA)),
("bioproject", record.get("bioproject", pd.NA)),
("submission_date", record.get("pdat", pd.NA)),
("supplementary_files", record.get("suppfile", pd.NA)),
("series_ftp", record.get("ftplink", pd.NA)),
("sample_accession", pd.NA),
("sample_title", pd.NA),
]
)
# Add SRP column when detailed=True
if detailed:
base_row["study_sra_accession"] = gse_to_srp_map.get(
study_accession, pd.NA
)
if sample_attribute:
base_row["sample_summary"] = pd.NA
if detailed:
base_row["sample_ftp"] = pd.NA
base_row["sample_supplementary"] = pd.NA
base_row["sample_geo2r"] = pd.NA
base_row["experiment_sra_accession"] = pd.NA
# Add SOFT metadata fields
base_row["sample_type"] = pd.NA
base_row["sample_source_name"] = pd.NA
base_row["sex"] = pd.NA
base_row["age"] = pd.NA
base_row["tissue"] = pd.NA
base_row["cell_type"] = pd.NA
base_row["disease"] = pd.NA
base_row["treatment"] = pd.NA
base_row["extract_protocol"] = pd.NA
base_row["label_protocol"] = pd.NA
samples = record.get("samples") or []
if samples:
for sample in samples:
row = base_row.copy()
sample_acc = sample.get("accession", pd.NA)
row["sample_accession"] = sample_acc
row["sample_title"] = sample.get("title", pd.NA)
sample_entry = sample_details.get(sample_acc, {})
if sample_attribute:
row["sample_summary"] = sample_entry.get("summary", pd.NA)
if detailed:
row["sample_ftp"] = sample_entry.get("ftplink", pd.NA)
row["sample_supplementary"] = sample_entry.get(
"suppfile", pd.NA
)
row["sample_geo2r"] = sample_entry.get("geo2r", pd.NA)
# Add SRX from gsm_to_srx_map
row["experiment_sra_accession"] = gsm_to_srx_map.get(
sample_acc, pd.NA
)
# Add SOFT metadata - extract ALL available fields
soft_data = gsm_soft_data.get(sample_acc, {})
if soft_data:
# Add all SOFT fields with sample_ prefix (characteristics handled separately)
for soft_key, soft_val in soft_data.items():
if soft_key != "characteristics":
col_name = f"sample_{soft_key}"
row[col_name] = soft_val
# Also extract commonly used fields to canonical column names for convenience
row["sample_type"] = soft_data.get("type_ch1", pd.NA)
row["sample_source_name"] = soft_data.get(
"source_name_ch1", pd.NA
)
row["extract_protocol"] = soft_data.get(
"extract_protocol_ch1", pd.NA
)
row["label_protocol"] = soft_data.get(
"label_protocol_ch1", pd.NA
)
# Process characteristics: add standard ones to canonical names, preserve all as-is
chars = soft_data.get("characteristics", {})
# Standard fields with canonical names (for backward compatibility)
row["sex"] = chars.get("sex", chars.get("gender", pd.NA))
row["age"] = chars.get("age", pd.NA)
row["tissue"] = chars.get(
"tissue",
chars.get(
"tissue type",
chars.get("structures", chars.get("organ", pd.NA)),
),
)
row["cell_type"] = chars.get(
"cell type",
chars.get("cell_type", chars.get("celltype", pd.NA)),
)
row["disease"] = chars.get(
"disease",
chars.get(
"disease state",
chars.get(
"disease_state",
chars.get("disease_status", pd.NA),
),
),
)
row["treatment"] = chars.get(
"treatment",
chars.get("compound", chars.get("drug", pd.NA)),
)
# Add ALL characteristics as-is (including custom ones)
for char_key, char_val in chars.items():
row[char_key] = char_val
rows.append(row)
else:
rows.append(base_row)
if not rows:
return pd.DataFrame(
columns=list(base_row.keys()) if "base_row" in locals() else None
)
metadata_df = pd.DataFrame(rows)
if include_pmids:
try:
pmid_df = self.gse_to_pmid(gse)
except Exception:
pmid_df = None
if pmid_df is not None and not pmid_df.empty:
pmid_map = {}
for _, row in pmid_df.iterrows():
gse_acc = row.get("gse_accession")
pmid = row.get("pmid")
if pd.isna(pmid):
continue
pmid_map.setdefault(gse_acc, []).append(str(pmid))
metadata_df["pmid"] = metadata_df["study_accession"].map(
lambda accession: (
",".join(pmid_map.get(accession, []))
if accession in pmid_map
else pd.NA
)
)
else:
metadata_df["pmid"] = pd.NA
metadata_df = metadata_df.replace("", pd.NA)
if "sample_accession" in metadata_df.columns:
metadata_df = metadata_df.sort_values(
by=["study_accession", "sample_accession"],
na_position="last",
)
metadata_df = metadata_df.reset_index(drop=True)
if detailed and not metadata_df.empty:
try:
gse_bioproject_map = {}
for _, row in metadata_df.iterrows():
gse_id = row.get("study_accession")
bioproject = row.get("bioproject")
if gse_id and str(gse_id).startswith("GSE"):
if bioproject and str(bioproject).startswith("PRJNA"):
gse_bioproject_map[gse_id] = bioproject
gse_srp_map = {}
all_fastq_data = []
for gse_id, bioproject in gse_bioproject_map.items():
try:
srp_list = self.bioproject_to_srp(bioproject)
if srp_list:
srp = srp_list[0] # Use the first SRP found
gse_srp_map[gse_id] = srp
# Fetch SRA metadata and fastq URLs for this SRP
try:
sra_metadata_df = self.sra_metadata(srp)
if not sra_metadata_df.empty:
fastq_df = self.fetch_ena_fastq(srp)
if not fastq_df.empty:
merged_df = sra_metadata_df.merge(
fastq_df, on="run_accession", how="left"
)
merged_df["gse_from_bioproject"] = gse_id
merged_df["srp_from_bioproject"] = srp
all_fastq_data.append(merged_df)
time.sleep(self.sleep_time)
except Exception:
pass
except Exception:
pass
# If we found fastq data, merge it with the main metadata via GSM->SRX->SRR mapping
if all_fastq_data and sample_accessions:
try:
# Combine all fastq data
combined_fastq = pd.concat(all_fastq_data, ignore_index=True)
# Map GSM to SRX for matching
gsm_to_srx_map = {}
for gsm_id in sample_accessions:
try:
srx_df = self.gsm_to_srx(gsm_id)
if (
not srx_df.empty
and "experiment_accession" in srx_df.columns
):
srx_list = (
srx_df["experiment_accession"].dropna().tolist()
)
if srx_list:
gsm_to_srx_map[gsm_id] = srx_list[0]
time.sleep(0.1)
except Exception:
pass
# Add fastq columns to metadata_df
fastq_cols = [
col
for col in combined_fastq.columns
if "fastq" in col.lower() or "ftp" in col.lower()
]
for col in fastq_cols:
if col not in metadata_df.columns:
metadata_df[col] = pd.NA
# Create a mapping from SRX->run_accession->fastq URLs
srx_to_fastq_map = {}
for _, row in combined_fastq.iterrows():
srx = row.get("experiment_accession", pd.NA)
run_acc = row.get("run_accession", pd.NA)
if not pd.isna(srx) and not pd.isna(run_acc):
if srx not in srx_to_fastq_map:
srx_to_fastq_map[srx] = {}
srx_to_fastq_map[srx][run_acc] = row
# Merge fastq URLs based on GSM->SRX->fastq mapping
for col in fastq_cols:
def get_fastq_url(row):
gsm = row.get("sample_accession", pd.NA)
if pd.isna(gsm):
return pd.NA
# Get SRX for this GSM
srx = gsm_to_srx_map.get(gsm, pd.NA)
if pd.isna(srx) or srx not in srx_to_fastq_map:
return pd.NA
# Get first run accession for this SRX
run_accs = list(srx_to_fastq_map[srx].keys())
if run_accs:
fastq_row = srx_to_fastq_map[srx][run_accs[0]]
return fastq_row.get(col, pd.NA)
return pd.NA
metadata_df[col] = metadata_df.apply(get_fastq_url, axis=1)
except Exception:
pass
except Exception:
pass
# Enrich metadata if requested
if enrich and not metadata_df.empty:
try:
from pysradb.metadata_enrichment import create_metadata_extractor
extractor = create_metadata_extractor(
method="llm", backend=enrich_backend
)
metadata_df = extractor.enrich_dataframe(
metadata_df, text_column=None, prefix="guessed_"
)
except Exception as e:
error_msg = str(e)
if "Ollama is not installed or not running" in error_msg:
print(f"Error: {error_msg}")
print(
"Metadata enrichment requires Ollama to be installed and running."
)
print(
"Please install Ollama from https://ollama.ai/ and follow these steps:"
)
print("1. Start Ollama server: ollama serve")
print("2. Pull a model: ollama pull phi3")
print("3. Try again or use a different enrichment backend")
raise
else:
print(f"Warning: Enrichment failed: {e}")
return metadata_df
[docs]
def metadata(self, accession, **kwargs):
"""
Unified method to fetch metadata for SRA or GEO accessions.
Automatically detects accession type and calls appropriate method.
Args:
accession: ``SRP``/``GSE`` accession(s) - can be string or list
kwargs: Additional parameters passed to ``sra_metadata()`` or ``geo_metadata()``
(e.g., ``detailed``, ``enrich``, ``enrich_backend``, ``sample_attribute``, etc.)
Returns:
DataFrame with metadata (enriched if enrich=True)
Examples:
>>> client = SRAweb()
>>> df = client.metadata("GSE286254", detailed=True, enrich=True)
>>> df = client.metadata("SRP253951", detailed=True, enrich=True)
>>> df = client.metadata(["GSE286254", "GSE147507"], enrich=True)
"""
if isinstance(accession, str):
accessions = [accession]
else:
accessions = accession
if not accessions:
return pd.DataFrame()
# Detect accession type from first accession
first_acc = accessions[0].upper()
if first_acc.startswith("GSE"):
return self.geo_metadata(accession, **kwargs)
elif first_acc.startswith("SRP"):
return self.sra_metadata(accession, **kwargs)
else:
raise ValueError(
f"Unsupported accession type: {first_acc}. "
"Supported types: GSE (GEO Series), SRP (SRA Project)"
)
[docs]
def gse_to_gsm(self, gse, **kwargs):
if isinstance(gse, str):
gse = [gse]
gse_df = self.fetch_gds_results(gse, **kwargs)
gse_df = gse_df.rename(
columns={
"accession": "experiment_alias",
"SRA": "experiment_accession",
"title": "experiment_title",
"summary": "sample_attribute",
}
)
# TODO: Fix for multiple GSEs?
gse_df["study_alias"] = ""
if len(gse) == 1:
study_alias = gse[0]
for index, row in gse_df.iterrows():
if row.entrytype == "GSE":
study_alias = row["experiment_accession"]
# If GSM is ecnountered, apply it the
# previously encountered GSE
elif row.entrytype == "GSM":
gse_df.loc[index, "study_alias"] = study_alias
gse_df = gse_df[gse_df.entrytype == "GSM"]
if kwargs and kwargs["detailed"] == True:
return gse_df
return gse_df[
["study_alias", "experiment_alias", "experiment_accession"]
].drop_duplicates()
[docs]
def gse_to_srp(self, gse, **kwargs):
if isinstance(gse, str):
gse = [gse]
if not gse: # Handle empty input
return pd.DataFrame(columns=["study_alias", "study_accession"])
gse_df = self.fetch_gds_results(gse, **kwargs)
if gse_df is None or gse_df.empty: # Handle case where no results found
return pd.DataFrame(columns=["study_alias", "study_accession"])
gse_df = gse_df.rename(
columns={"accession": "study_alias", "SRA": "study_accession"}
)
gse_df_subset = None
if "GSE" in gse_df.entrytype.unique():
gse_df_subset = gse_df[gse_df.entrytype == "GSE"]
common_gses = set(gse_df.study_alias.unique()).intersection(gse)
if len(common_gses) < len(gse):
gse_df_subset = None
# Check if GSE entries have valid SRA accessions
# If any study_accessions are NaN, fall back to extracting from GSM entries
elif gse_df_subset["study_accession"].isna().any():
gse_df_subset = None
if gse_df_subset is None:
# sometimes SRX ids are returned instead of an entire project
# see https://github.com/saketkc/pysradb/issues/186
# GSE: GSE209835; SRP =SRP388275
gse_df_subset_gse = gse_df[gse_df.entrytype == "GSE"]
# Include GSEs that are missing OR have NaN study_accessions
gses_without_srp = gse_df_subset_gse[
gse_df_subset_gse.study_accession.isna()
].study_alias.tolist()
gse_of_interest = (
list(set(gse).difference(gse_df.study_alias.unique()))
+ gses_without_srp
)
gse_df_subset_other = gse_df[gse_df.entrytype != "GSE"]
# Filter out NaN values from study_accession before converting to SRP
srx = gse_df_subset_other.study_accession.dropna().tolist()
srp_df = self.srx_to_srp(srx)
srp_unique = list(
set(srp_df.study_accession.unique()).difference(
gse_df_subset_gse.study_accession.tolist()
)
)
# Handle mismatched lengths between GSEs and SRPs
# Create all combinations of GSE-SRP pairs
gse_srp_pairs = []
for gse_id in gse_of_interest:
for srp_id in srp_unique:
gse_srp_pairs.append(
{"study_alias": gse_id, "study_accession": srp_id}
)
if gse_srp_pairs:
new_gse_df = pd.DataFrame(gse_srp_pairs)
else:
# If no pairs, create empty DataFrame with correct columns
new_gse_df = pd.DataFrame(columns=["study_alias", "study_accession"])
# Filter out GSE entries with NaN study_accession before concatenating
# as we've already created new pairs for them
gse_df_subset_gse_valid = gse_df_subset_gse[
gse_df_subset_gse.study_accession.notna()
]
gse_df_subset = pd.concat([gse_df_subset_gse_valid, new_gse_df])
gse_df_subset = gse_df_subset.loc[gse_df_subset.study_alias.isin(gse)]
return gse_df_subset[["study_alias", "study_accession"]].drop_duplicates()
[docs]
def gsm_to_srp(self, gsm, **kwargs):
gsm_df = self.fetch_gds_results(gsm, **kwargs)
gsm_df = gsm_df[gsm_df.entrytype == "GSE"]
gsm_df = gsm_df.rename(
columns={"accession": "experiment_alias", "SRA": "study_accession"}
)
return gsm_df[["experiment_alias", "study_accession"]].drop_duplicates()
[docs]
def gsm_to_srr(self, gsm, **kwargs):
gsm_df = self.fetch_gds_results(gsm, **kwargs)
gsm_df = gsm_df.rename(
columns={
"accession": "experiment_alias",
"SRA": "experiment_accession",
"title": "experiment_title",
"summary": "sample_attribute",
}
)
gsm_df = gsm_df[gsm_df.entrytype == "GSM"]
srr_df = self.srx_to_srr(gsm_df.experiment_accession.tolist())
gsm_df = gsm_df.merge(srr_df, on="experiment_accession")
return gsm_df[["experiment_alias", "run_accession"]]
[docs]
def gsm_to_srs(self, gsm, **kwargs):
"""Get SRS for a GSM"""
gsm_df = self.fetch_gds_results(gsm, **kwargs)
gsm_df = gsm_df[gsm_df.entrytype == "GSM"].rename(
columns={"SRA": "experiment_accession", "accession": "experiment_alias"}
)
srx = gsm_df.experiment_accession.tolist()
time.sleep(self.sleep_time)
srs_df = self.srx_to_srs(srx)
gsm_df = srs_df.merge(gsm_df, on="experiment_accession")[
["experiment_alias", "sample_accession"]
]
return gsm_df.drop_duplicates()
[docs]
def gsm_to_srx(self, gsm, **kwargs):
"""Get SRX for a GSM"""
if isinstance(gsm, str):
gsm = [gsm]
gsm_df = self.fetch_gds_results(gsm, **kwargs)
gsm_df = gsm_df[gsm_df.entrytype == "GSM"].rename(
columns={"SRA": "experiment_accession", "accession": "experiment_alias"}
)
gsm_df = gsm_df.loc[gsm_df["experiment_alias"].isin(gsm)]
return gsm_df[["experiment_alias", "experiment_accession"]].drop_duplicates()
[docs]
def gsm_to_gse(self, gsm, **kwargs):
if isinstance(gsm, str):
gsm = [gsm]
if not gsm: # Handle empty input
return pd.DataFrame(columns=["study_alias", "study_accession"])
gsm_df = self.fetch_gds_results(gsm, **kwargs)
if gsm_df is None or gsm_df.empty: # Handle case where no results found
return pd.DataFrame(columns=["study_alias", "study_accession"])
# For GSM queries, we need to extract GSE IDs from the 'gse' column
# The entrytype will be 'GSM', not 'GSE'
gsm_entries = gsm_df[gsm_df.entrytype == "GSM"]
if gsm_entries.empty:
return pd.DataFrame(columns=["study_alias", "study_accession"])
# Extract GSE IDs from the 'gse' column and create result rows
results = []
for _, row in gsm_entries.iterrows():
gsm_id = row["accession"]
gse_str = str(row.get("gse", ""))
if gse_str and gse_str != "nan":
# Handle multiple GSE IDs separated by semicolon
gse_ids = [
gse_id.strip() for gse_id in gse_str.split(";") if gse_id.strip()
]
for gse_id in gse_ids:
if gse_id.isdigit():
# Add GSE prefix if it's just a number
gse_id = f"GSE{gse_id}"
results.append(
{
"study_alias": gse_id,
"study_accession": row.get("SRA", pd.NA),
}
)
if results:
result_df = pd.DataFrame(results)
return result_df[["study_alias", "study_accession"]].drop_duplicates()
else:
return pd.DataFrame(columns=["study_alias", "study_accession"])
[docs]
def srp_to_gse(self, srp, **kwargs):
"""Get GSE for a SRP"""
srp_df = self.fetch_gds_results(srp, **kwargs)
if srp_df is None:
srp_df = pd.DataFrame(
{"study_alias": [], "study_accession": [], "entrytype": []}
)
srp_df = srp_df.rename(
columns={"accession": "study_alias", "SRA": "study_accession"}
)
srp_df_gse = srp_df[srp_df.entrytype == "GSE"]
missing_srp = list(set(srp).difference(srp_df_gse.study_accession.tolist()))
srp_df_nongse = srp_df[srp_df.entrytype != "GSE"]
if srp_df_nongse.shape[0] >= 1:
srp_df_nongse = pd.DataFrame(
{
"study_accession": missing_srp,
"study_alias": [pd.NA] * len(missing_srp),
"entrytpe": ["GSE"] * len(missing_srp),
}
)
srp_df = pd.concat([srp_df_gse, srp_df_nongse])
return srp_df[["study_accession", "study_alias"]].drop_duplicates()
[docs]
def srp_to_srr(self, srp, **kwargs):
"""Get SRR for a SRP"""
srp_df = self.sra_metadata(srp, **kwargs)
return _order_first(srp_df, ["study_accession", "run_accession"])
[docs]
def srp_to_srs(self, srp, **kwargs):
"""Get SRS for a SRP"""
srp_df = self.sra_metadata(srp, **kwargs)
return _order_first(srp_df, ["study_accession", "sample_accession"])
[docs]
def srp_to_srx(self, srp, **kwargs):
"""Get SRX for a SRP"""
srp_df = self.sra_metadata(srp, **kwargs)
srp_df["study_accesssion"] = srp
return _order_first(srp_df, ["study_accession", "experiment_accession"])
[docs]
def srr_to_gsm(self, srr, **kwargs):
"""Get GSM for a SRR"""
if isinstance(srr, str):
srr = [srr]
srr_df = self.srr_to_srp(srr, detailed=True)
# remove NAs
srp = [x for x in srr_df.study_accession.tolist() if not x is pd.NA]
gse_df = self.fetch_gds_results(srp, **kwargs)
gse_df = gse_df[gse_df.entrytype == "GSE"].rename(
columns={"SRA": "project_accession", "accession": "project_alias"}
)
gsm_df = self.gse_to_gsm(gse_df.project_alias.tolist(), detailed=True)
srr_cols = list(
set(srr_df.columns.tolist()).difference(gsm_df.columns.tolist())
) + ["experiment_accession"]
joined_df = gsm_df.merge(srr_df[srr_cols], on="experiment_accession")
df = _order_first(joined_df, ["run_accession", "experiment_alias"])
df = df.loc[df["run_accession"].isin(srr)]
return df
[docs]
def srr_to_srp(self, srr, **kwargs):
"""Get SRP for a SRR"""
if isinstance(srr, str):
srr = [srr]
srr_df = self.sra_metadata(srr, **kwargs)
if kwargs and kwargs["detailed"] == True:
return srr_df
srr_df = srr_df.loc[srr_df["run_accession"].isin(srr)]
return _order_first(srr_df, ["run_accession", "study_accession"])
[docs]
def srr_to_srs(self, srr, **kwargs):
"""Get SRS for a SRR"""
if isinstance(srr, str):
srr = [srr]
srr_df = self.sra_metadata(srr, **kwargs)
srr_df = srr_df.loc[srr_df["run_accession"].isin(srr)]
return _order_first(srr_df, ["run_accession", "sample_accession"])
[docs]
def srr_to_srx(self, srr, **kwargs):
"""Get SRX for a SRR"""
if isinstance(srr, str):
srr = [srr]
srr_df = self.sra_metadata(srr)
srr_df = srr_df.loc[srr_df["run_accession"].isin(srr)]
return _order_first(srr_df, ["run_accession", "experiment_accession"])
[docs]
def srs_to_gsm(self, srs, **kwargs):
"""Get GSM for a SRS"""
if isinstance(srs, str):
srs = [srs]
srx_df = self.srs_to_srx(srs)
time.sleep(self.sleep_time)
gsm_df = self.srx_to_gsm(srx_df.experiment_accession.tolist(), **kwargs)
srs_df = srx_df.merge(gsm_df, on="experiment_accession")
srs_df = srs_df.loc[srs_df["sample_accession"].isin(srs)]
return _order_first(srs_df, ["sample_accession", "experiment_alias"])
[docs]
def srx_to_gsm(self, srx, **kwargs):
if isinstance(srx, str):
srx = [srx]
gsm_df = self.fetch_gds_results(srx, **kwargs)
gsm_df = gsm_df[gsm_df.entrytype == "GSM"].rename(
columns={"SRA": "experiment_accession", "accession": "experiment_alias"}
)
gsm_df = gsm_df.loc[gsm_df["experiment_accession"].isin(srx)]
return gsm_df[["experiment_accession", "experiment_alias"]].drop_duplicates()
[docs]
def srs_to_srx(self, srs, **kwargs):
"""Get SRX for a SRS"""
srs_df = self.sra_metadata(srs, **kwargs)
return _order_first(srs_df, ["sample_accession", "experiment_accession"])
[docs]
def srx_to_srp(self, srx, **kwargs):
"""Get SRP for a SRX"""
srx_df = self.sra_metadata(srx, **kwargs)
return _order_first(srx_df, ["experiment_accession", "study_accession"])
[docs]
def srx_to_srr(self, srx, **kwargs):
"""Get SRR for a SRX"""
srx_df = self.sra_metadata(srx, **kwargs)
return _order_first(srx_df, ["experiment_accession", "run_accession"])
[docs]
def srx_to_srs(self, srx, **kwargs):
"""Get SRS for a SRX"""
srx_df = self.sra_metadata(srx, **kwargs)
return _order_first(srx_df, ["experiment_accession", "sample_accession"])
[docs]
def search(self, *args, **kwargs):
raise NotImplementedError("Search not yet implemented for Web")
[docs]
def fetch_bioproject_pmids(self, bioprojects):
"""Fetch PMIDs for given BioProject accessions
Parameters
----------
bioprojects: list or str
BioProject accession(s)
Returns
-------
bioproject_pmids: dict
Mapping of BioProject to list of PMIDs
"""
if isinstance(bioprojects, str):
bioprojects = [bioprojects]
bioproject_pmids = {}
for bioproject in bioprojects:
if pd.isna(bioproject) or not bioproject:
bioproject_pmids[bioproject] = []
continue
try:
payload = self.efetch_params.copy()
payload = [param for param in payload if param[0] != "retmode"]
payload += [
("db", "bioproject"),
("id", bioproject),
("retmode", "xml"),
]
request = requests.get(
self.base_url["efetch"], params=OrderedDict(payload)
)
xml_text = request.text.strip()
# Parse XML to extract Publication IDs
pmids = []
try:
xml_dict = xmltodict.parse(
xml_text, process_namespaces=False, dict_constructor=OrderedDict
)
# Navigate through the XML structure
if "RecordSet" in xml_dict:
records = xml_dict["RecordSet"].get("DocumentSummary", [])
if not isinstance(records, list):
records = [records]
for record in records:
project_descr = record.get("Project", {}).get(
"ProjectDescr", {}
)
publications = project_descr.get("Publication", [])
if not isinstance(publications, list):
publications = [publications]
for pub in publications:
pub_id = pub.get("@id", "")
if pub_id and pub_id.isdigit():
pmids.append(pub_id)
except ExpatError:
# XML parsing failed --> Look for PMID patterns in the raw text
pmid_pattern = r'id="(\d+)"'
matches = re.findall(pmid_pattern, xml_text)
pmids = [
match for match in matches if len(match) >= 7
] # PMIDs are typically 7+ digits
# If no PMIDs found in bioproject XML, try searching PMC by bioproject ID
if not pmids:
pmids = self._search_pmc_by_bioproject(bioproject)
bioproject_pmids[bioproject] = list(set(pmids)) # Remove duplicates
time.sleep(self.sleep_time)
except Exception as e:
warnings.warn(f"Failed to fetch PMIDs for BioProject {bioproject}: {e}")
bioproject_pmids[bioproject] = []
return bioproject_pmids
[docs]
def srp_to_pmid(self, srp_accessions):
"""Get PMIDs associated with SRP accessions
Parameters
----------
srp_accessions: list or str
SRP accession(s)
Returns
-------
srp_pmid_df: pandas.DataFrame
DataFrame with SRP accessions and associated PMIDs
"""
if isinstance(srp_accessions, str):
srp_accessions = [srp_accessions]
# Get metadata to extract BioProject information
metadata_df = self.sra_metadata(srp_accessions)
if metadata_df is None or metadata_df.empty:
return pd.DataFrame(columns=["srp_accession", "bioproject", "pmid"])
# Try to get PMIDs via BioProject first
unique_bioprojects = metadata_df["bioproject"].dropna().unique().tolist()
bioproject_pmids = self.fetch_bioproject_pmids(unique_bioprojects)
# If no BioProject PMIDs found, try fallback search
external_pmids = []
if not any(pmids for pmids in bioproject_pmids.values()):
external_pmids = self._search_fallback_pmids(srp_accessions)
# Build results - one row per unique SRP accession
results = []
for _, row in metadata_df.iterrows():
srp_acc = self._extract_sra_accession(row)
bioproject = row.get("bioproject", "")
# Get PMIDs (BioProject takes priority over external)
pmids = bioproject_pmids.get(bioproject, [])
if not pmids and external_pmids:
pmids = external_pmids
# Add result with smallest PMID (if any found)
smallest_pmid = self._get_smallest_pmid(pmids) if pmids else pd.NA
results.append(
{
"srp_accession": srp_acc,
"bioproject": bioproject,
"pmid": smallest_pmid,
}
)
return pd.DataFrame(results).drop_duplicates()
def _search_fallback_pmids(self, srp_accessions):
"""Search for PMIDs using fallback strategies (external sources + direct SRA search + GSE search)"""
try:
original_sleep = self.sleep_time
self.sleep_time = max(0.1, self.sleep_time * 0.5)
# Strategy 1: Search via external source identifiers
# Example: ERP018009
detailed_metadata = self.sra_metadata(
srp_accessions, detailed=True, include_pmids=False
)
if detailed_metadata is not None and not detailed_metadata.empty:
if external_sources := self.extract_external_sources(detailed_metadata):
pmids = self.search_pmc_for_external_sources([external_sources[0]])
if pmids:
return pmids
# Strategy 2: Search via GSE identifiers extracted from metadata
# Example: GSE253406 --> SRP484103
gse_pmids = self._search_gse_gsm_pmids(
detailed_metadata, srp_accessions
)
if gse_pmids:
return gse_pmids
# Strategy 3: Direct SRP ID search
# Example: SRP047086
pmids = self.search_pmc_for_external_sources(srp_accessions)
return pmids
except Exception as e:
return []
finally:
self.sleep_time = original_sleep
def _extract_sra_accession(self, row):
"""Extract SRA accession from metadata row"""
return row.get(
"study_accession",
row.get(
"run_accession",
row.get("experiment_accession", row.get("sample_accession", "")),
),
)
def _get_smallest_pmid(self, pmids):
"""Get the numerically smallest PMID from a list"""
if not pmids:
return pd.NA
# Convert to integers for proper numeric sorting
pmid_ints = []
for pmid in pmids:
try:
pmid_ints.append(int(pmid))
except ValueError:
pmid_ints.append(pmid) # Keep non-numeric as-is
return str(min(pmid_ints))
[docs]
def extract_external_sources(self, metadata_df):
"""Extract external source identifiers from SRA metadata
Parameters
----------
metadata_df: pandas.DataFrame
DataFrame containing SRA metadata
Returns
-------
external_sources: list
List of external source identifiers found
"""
external_sources = []
patterns = [
r"E-MTAB-\d+", # ArrayExpress
r"GSE\d+", # GEO Series
r"E-GEOD-\d+", # GEO in ArrayExpress
r"E-MEXP-\d+", # MEXP in ArrayExpress
r"E-TABM-\d+", # TABM in ArrayExpress
]
# Fields that commonly contain external source identifiers
source_fields = ["run_alias", "submitter id", "sample name", "experiment_alias"]
for field in source_fields:
if field in metadata_df.columns:
values = metadata_df[field].dropna().unique()
for value in values:
value_str = str(value)
for pattern in patterns:
matches = re.findall(pattern, value_str)
external_sources.extend(
match for match in matches if match not in external_sources
)
return external_sources
def _search_gse_gsm_pmids(self, metadata_df, sra_accessions):
"""Search for PMIDs using GSE identifiers from BioProject and SRP conversion
Parameters
----------
metadata_df: pandas.DataFrame
Detailed metadata DataFrame
sra_accessions: list
List of SRA accessions being searched
Returns
-------
pmids: list
List of PMIDs found via GSE search
"""
import time
gse_identifiers = []
# Strategy 1: BioProject to GSE conversion via NCBI search
if "bioproject" in metadata_df.columns:
unique_bioprojects = metadata_df["bioproject"].dropna().unique()
for bioproject in unique_bioprojects[
:3
]: # Limit to avoid too many requests
try:
gse_ids = self._bioproject_to_gse(bioproject)
gse_identifiers.extend(gse_ids)
time.sleep(self.sleep_time) # Rate limiting
except Exception:
pass
# Strategy 2: SRP to GSE conversion via NCBI ELink
for sra_acc in sra_accessions:
if sra_acc.startswith("SRP"):
try:
gse_ids = self._srp_to_gse_via_elink(sra_acc)
gse_identifiers.extend(gse_ids)
time.sleep(self.sleep_time) # Rate limiting
except Exception:
pass
# Strategy 3: Try existing pysradb SRP to GSE conversion
for sra_acc in sra_accessions:
if sra_acc.startswith("SRP"):
try:
gse_df = self.srp_to_gse(sra_acc)
if not gse_df.empty and "experiment_alias" in gse_df.columns:
gse_values = gse_df["experiment_alias"].dropna().astype(str)
for gse_val in gse_values:
if gse_val.startswith("GSE"):
gse_identifiers.append(gse_val)
except Exception:
pass
# Remove duplicates and search PMC for GSE identifiers
unique_gse_ids = list(set(gse_identifiers))
if unique_gse_ids:
pmids = self.search_pmc_for_external_sources(unique_gse_ids)
return pmids
return []
def _bioproject_to_gse(self, bioproject):
"""Convert BioProject ID to GSE ID via NCBI search
Parameters
----------
bioproject: str
BioProject ID (e.g., 'PRJNA1065472')
Returns
-------
gse_ids: list
List of GSE IDs found
"""
import requests
gse_ids = []
try:
search_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi"
search_params = {
"db": "gds",
"term": f"{bioproject}[BioProject]",
"retmode": "json",
"retmax": "10",
}
response = requests.get(search_url, params=search_params, timeout=30)
response.raise_for_status()
result = response.json()
geo_uids = result["esearchresult"]["idlist"]
if geo_uids:
# Get summary to find GSE IDs
summary_url = (
"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi"
)
summary_params = {
"db": "gds",
"id": ",".join(geo_uids),
"retmode": "json",
}
summary_response = requests.get(
summary_url, params=summary_params, timeout=30
)
summary_response.raise_for_status()
summary_result = summary_response.json()
for uid in geo_uids:
if uid in summary_result["result"]:
record = summary_result["result"][uid]
accession = record.get("accession", "")
if accession.startswith("GSE"):
gse_ids.append(accession)
except Exception:
pass
return gse_ids
def _srp_to_gse_via_elink(self, srp_id):
"""Convert SRP ID to GSE ID via NCBI ELink
Parameters
----------
srp_id: str
SRP ID (e.g., 'SRP484103')
Returns
-------
gse_ids: list
List of GSE IDs found
"""
import requests
gse_ids = []
try:
# First, search for the SRP in SRA database to get UIDs
search_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi"
search_params = {
"db": "sra",
"term": srp_id,
"retmode": "json",
"retmax": "5",
}
response = requests.get(search_url, params=search_params, timeout=30)
response.raise_for_status()
result = response.json()
sra_uids = result["esearchresult"]["idlist"]
if sra_uids:
# Use ELink to find related GEO records
elink_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/elink.fcgi"
elink_params = {
"dbfrom": "sra",
"db": "gds",
"id": sra_uids[0], # Use first UID
"retmode": "json",
}
elink_response = requests.get(
elink_url, params=elink_params, timeout=30
)
elink_response.raise_for_status()
elink_result = elink_response.json()
if "linksets" in elink_result:
for linkset in elink_result["linksets"]:
if "linksetdbs" in linkset:
for linksetdb in linkset["linksetdbs"]:
if linksetdb["dbto"] == "gds":
geo_uids = linksetdb["links"]
if geo_uids:
# Get summary to find GSE IDs
summary_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi"
summary_params = {
"db": "gds",
"id": ",".join(geo_uids),
"retmode": "json",
}
summary_response = requests.get(
summary_url,
params=summary_params,
timeout=30,
)
summary_response.raise_for_status()
summary_result = summary_response.json()
for uid in geo_uids:
if uid in summary_result["result"]:
record = summary_result["result"][uid]
accession = record.get("accession", "")
if accession.startswith("GSE"):
gse_ids.append(accession)
except Exception:
pass
return gse_ids
def _search_pmc_by_bioproject(self, bioproject_id):
"""Search PubMed Central for PMIDs using BioProject accession ID
This provides a fallback mechanism when the BioProject XML doesn't contain
publication metadata but the research has been published and is cited in PMC.
Parameters
----------
bioproject_id: str
BioProject accession ID (e.g., PRJEB39301, PRJNA123456)
Returns
-------
pmids: list
List of PMIDs found associated with the bioproject
"""
try:
search_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi"
search_params = {
"db": "pmc",
"term": bioproject_id,
"retmode": "json",
"retmax": "10",
}
response = requests.get(search_url, params=search_params, timeout=60)
response.raise_for_status()
result = response.json()
pmc_ids = result.get("esearchresult", {}).get("idlist", [])
if not pmc_ids:
return []
# Get primary PMIDs for each PMC article
summary_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi"
summary_params = {
"db": "pmc",
"id": ",".join(pmc_ids),
"retmode": "json",
}
summary_response = requests.get(
summary_url, params=summary_params, timeout=60
)
summary_result = summary_response.json()
pmids = []
# Extract primary PMID for each PMC article
for pmc_id in pmc_ids:
if pmc_id in summary_result.get("result", {}):
article = summary_result["result"][pmc_id]
articleids = article.get("articleids", [])
# Find the primary PMID
for aid in articleids:
if aid.get("idtype") == "pmid":
primary_pmid = aid.get("value")
if primary_pmid and primary_pmid not in pmids:
pmids.append(primary_pmid)
break
return pmids
except Exception as e:
# Silently fail and return empty list for fallback mechanism
return []
[docs]
def search_pmc_for_external_sources(self, external_sources):
"""Search PubMed Central for PMIDs using external source identifiers
Parameters
----------
external_sources: list
List of external source identifiers
Returns
-------
pmids: list
List of PMIDs found
"""
if not external_sources:
return []
all_pmids = []
for source in external_sources:
try:
search_url = (
"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi"
)
search_params = {
"db": "pmc",
"term": source,
"retmode": "json",
"retmax": "10",
}
response = requests.get(search_url, params=search_params, timeout=60)
response.raise_for_status()
result = response.json()
pmc_ids = result["esearchresult"]["idlist"]
if not pmc_ids:
continue
# Get primary PMIDs for each PMC article
summary_url = (
"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi"
)
summary_params = {
"db": "pmc",
"id": ",".join(pmc_ids),
"retmode": "json",
}
summary_response = requests.get(
summary_url, params=summary_params, timeout=60
)
summary_result = summary_response.json()
# Extract primary PMID for each PMC article
for pmc_id in pmc_ids:
if pmc_id in summary_result["result"]:
article = summary_result["result"][pmc_id]
articleids = article.get("articleids", [])
# Find the primary PMID
for aid in articleids:
if aid.get("idtype") == "pmid":
primary_pmid = aid.get("value")
if primary_pmid and primary_pmid not in all_pmids:
all_pmids.append(primary_pmid)
break
time.sleep(self.sleep_time) # Rate limiting
except Exception as e:
continue
return list(set(all_pmids)) # Remove duplicates
[docs]
def sra_to_pmid(self, sra_accessions):
"""Get PMIDs for SRA accessions (backward compatibility wrapper)
Parameters
----------
sra_accessions: list or str
SRA accession(s) - can be SRP, SRR, SRX, or SRS
Returns
-------
sra_pmid_df: pandas.DataFrame
DataFrame with SRA accessions and associated PMIDs
"""
# For SRP accessions, use the main method
if isinstance(sra_accessions, str):
if sra_accessions.startswith("SRP"):
return self.srp_to_pmid(sra_accessions)
elif isinstance(sra_accessions, list):
# If all are SRP accessions, use main method
if all(acc.startswith("SRP") for acc in sra_accessions):
return self.srp_to_pmid(sra_accessions)
# For other SRA accessions, convert to SRP first if possible
# This is a simplified implementation for backward compatibility
return self.srp_to_pmid(sra_accessions)
[docs]
def srr_to_pmid(self, srr):
"""Get PMIDs for Run Accessions (SRR)"""
return self.sra_to_pmid(srr)
[docs]
def srx_to_pmid(self, srx):
"""Get PMIDs for Experiment Accessions (SRX)"""
return self.sra_to_pmid(srx)
[docs]
def srs_to_pmid(self, srs):
"""Get PMIDs for Sample Accessions (SRS)"""
return self.sra_to_pmid(srs)
[docs]
def gse_to_pmid(self, gse_accessions):
"""Get PMIDs for GSE accessions by searching PubMed Central
Parameters
----------
gse_accessions: list or str
GSE accession(s)
Returns
-------
gse_pmid_df: pandas.DataFrame
DataFrame with GSE accessions and associated PMIDs
"""
if isinstance(gse_accessions, str):
gse_accessions = [gse_accessions]
results = []
for gse_acc in gse_accessions:
pmids = self.search_pmc_for_external_sources([gse_acc])
smallest_pmid = self._get_smallest_pmid(pmids) if pmids else pd.NA
results.append(
{
"gse_accession": gse_acc,
"pmid": smallest_pmid,
}
)
return pd.DataFrame(results)
[docs]
def doi_to_pmid(self, dois):
"""Convert DOI(s) to PMID(s)
Parameters
----------
dois: list or str
DOI(s)
Returns
-------
doi_pmid_mapping: dict
Mapping of DOI to PMID
"""
if isinstance(dois, str):
dois = [dois]
doi_pmid_mapping = {}
for doi in dois:
try:
search_url = self.base_url["esearch"]
search_params = {
"db": "pubmed",
"term": f"{doi}[DOI]",
"retmode": "json",
}
response = requests.get(search_url, params=search_params, timeout=60)
response.raise_for_status()
result = response.json()
id_list = result.get("esearchresult", {}).get("idlist", [])
if id_list:
doi_pmid_mapping[doi] = id_list[0]
else:
doi_pmid_mapping[doi] = None
time.sleep(self.sleep_time)
except requests.RequestException as e:
warnings.warn(f"Network error while getting PMID for DOI {doi}: {e}")
doi_pmid_mapping[doi] = None
except ValueError as e:
warnings.warn(
f"Value error while processing response for DOI {doi}: {e}"
)
doi_pmid_mapping[doi] = None
return doi_pmid_mapping
[docs]
def pmid_to_pmc(self, pmids):
"""Convert PMID(s) to PMC ID(s)
Parameters
----------
pmids: list or str
PMID(s)
Returns
-------
pmid_pmc_mapping: dict
Mapping of PMID to PMC ID
"""
if isinstance(pmids, str):
pmids = [pmids]
pmid_pmc_mapping = {}
for pmid in pmids:
try:
summary_url = self.base_url["esummary"]
summary_params = {
"db": "pubmed",
"id": pmid,
"retmode": "json",
}
response = requests.get(summary_url, params=summary_params, timeout=60)
response.raise_for_status()
result = response.json()
# Extract PMC ID from articleids
if str(pmid) in result.get("result", {}):
article = result["result"][str(pmid)]
articleids = article.get("articleids", [])
for aid in articleids:
if aid.get("idtype") == "pmc":
pmc_id = aid.get("value")
pmid_pmc_mapping[pmid] = pmc_id
break
time.sleep(self.sleep_time)
except Exception as e:
warnings.warn(f"Failed to get PMC ID for PMID {pmid}: {e}")
pmid_pmc_mapping[pmid] = None
return pmid_pmc_mapping
[docs]
def fetch_pmc_fulltext(self, pmc_id):
"""Fetch full text from PMC article
Parameters
----------
pmc_id: str
PMC ID (can be with or without 'PMC' prefix)
Returns
-------
fulltext: str
Full text of the article, or None if unavailable
"""
# Ensure PMC ID has the PMC prefix
if not pmc_id.startswith("PMC"):
pmc_id = f"PMC{pmc_id}"
try:
fetch_url = self.base_url["efetch"]
fetch_params = {"db": "pmc", "id": pmc_id, "retmode": "xml"}
response = requests.get(fetch_url, params=fetch_params, timeout=60)
response.raise_for_status()
time.sleep(self.sleep_time)
return response.text
except Exception as e:
warnings.warn(f"Failed to fetch full text for {pmc_id}: {e}")
return None
[docs]
def extract_identifiers_from_text(self, text):
"""Extract GSE, PRJNA, SRP, and other identifiers from text
Parameters
----------
text: str
Text to search for identifiers
Returns
-------
identifiers: dict
Dictionary with lists of found identifiers by type
"""
if not text:
return {
"gse": [],
"prjna": [],
"srp": [],
"srr": [],
"srx": [],
"srs": [],
}
# Define patterns for different identifier types
patterns = {
"gse": r"GSE\d+",
"prjna": r"PRJNA\d+",
"srp": r"SRP\d+",
"srr": r"SRR\d+",
"srx": r"SRX\d+",
"srs": r"SRS\d+",
}
identifiers = {}
for id_type, pattern in patterns.items():
matches = re.findall(pattern, text, re.IGNORECASE)
# Convert to uppercase and remove duplicates
identifiers[id_type] = sorted(list(set([m.upper() for m in matches])))
return identifiers
[docs]
def pmc_to_identifiers(self, pmc_ids, convert_missing=True):
"""Extract database identifiers from PMC articles
Parameters
----------
pmc_ids: list or str
PMC ID(s) (can be with or without 'PMC' prefix)
convert_missing: bool
If True, automatically convert GSEāSRP when one is found but not the other
Default: True
Returns
-------
results_df: pandas.DataFrame
DataFrame with PMC IDs and extracted identifiers
"""
if isinstance(pmc_ids, str):
pmc_ids = [pmc_ids]
results = []
for pmc_id in pmc_ids:
# Fetch full text
fulltext = self.fetch_pmc_fulltext(pmc_id)
if fulltext:
# Extract identifiers
identifiers = self.extract_identifiers_from_text(fulltext)
if convert_missing:
# If we found GSE IDs but no SRP IDs, convert GSE to SRP
if identifiers["gse"] and not identifiers["srp"]:
try:
for gse_id in identifiers["gse"]:
gse_srp_df = self.gse_to_srp(gse_id)
if (
not gse_srp_df.empty
and "study_accession" in gse_srp_df.columns
):
srp_values = (
gse_srp_df["study_accession"].dropna().tolist()
)
identifiers["srp"].extend(
[str(x) for x in srp_values if not pd.isna(x)]
)
identifiers["srp"] = sorted(list(set(identifiers["srp"])))
time.sleep(self.sleep_time)
except Exception:
pass
# If we found SRP IDs but no GSE IDs, convert SRP to GSE
elif identifiers["srp"] and not identifiers["gse"]:
try:
for srp_id in identifiers["srp"]:
srp_gse_df = self.srp_to_gse(srp_id)
if (
not srp_gse_df.empty
and "study_alias" in srp_gse_df.columns
):
gse_values = (
srp_gse_df["study_alias"].dropna().tolist()
)
identifiers["gse"].extend(
[str(x) for x in gse_values if not pd.isna(x)]
)
identifiers["gse"] = sorted(
list(set(identifiers["gse"]))
) # Remove duplicates
time.sleep(self.sleep_time)
except Exception:
pass # If conversion fails, just keep what we found
# Extract PRJNA from SRP metadata if we have SRP IDs
if identifiers["srp"] and not identifiers["prjna"]:
try:
for srp_id in identifiers["srp"]:
srp_metadata = self.sra_metadata(srp_id)
if srp_metadata is not None and not srp_metadata.empty:
if "bioproject" in srp_metadata.columns:
bioproject_values = (
srp_metadata["bioproject"]
.dropna()
.unique()
.tolist()
)
identifiers["prjna"].extend(
[
str(x)
for x in bioproject_values
if not pd.isna(x)
]
)
identifiers["prjna"] = sorted(list(set(identifiers["prjna"])))
time.sleep(self.sleep_time)
except Exception:
pass
results.append(
{
"pmc_id": (
pmc_id if pmc_id.startswith("PMC") else f"PMC{pmc_id}"
),
"gse_ids": (
",".join(identifiers["gse"])
if identifiers["gse"]
else pd.NA
),
"prjna_ids": (
",".join(identifiers["prjna"])
if identifiers["prjna"]
else pd.NA
),
"srp_ids": (
",".join(identifiers["srp"])
if identifiers["srp"]
else pd.NA
),
"srr_ids": (
",".join(identifiers["srr"])
if identifiers["srr"]
else pd.NA
),
"srx_ids": (
",".join(identifiers["srx"])
if identifiers["srx"]
else pd.NA
),
"srs_ids": (
",".join(identifiers["srs"])
if identifiers["srs"]
else pd.NA
),
}
)
else:
results.append(
{
"pmc_id": (
pmc_id if pmc_id.startswith("PMC") else f"PMC{pmc_id}"
),
"gse_ids": pd.NA,
"prjna_ids": pd.NA,
"srp_ids": pd.NA,
"srr_ids": pd.NA,
"srx_ids": pd.NA,
"srs_ids": pd.NA,
}
)
return pd.DataFrame(results)
[docs]
def pmid_to_identifiers(self, pmids):
"""Extract database identifiers from PubMed articles via PMC
Parameters
----------
pmids: list or str
PMID(s)
Returns
-------
results_df: pandas.DataFrame
DataFrame with PMIDs, PMC IDs, and extracted identifiers
"""
if isinstance(pmids, str):
pmids = [pmids]
# First convert PMIDs to PMC IDs
pmid_pmc_mapping = self.pmid_to_pmc(pmids)
results = []
for pmid, pmc_id in pmid_pmc_mapping.items():
if pmc_id:
# Get identifiers from PMC
pmc_results = self.pmc_to_identifiers([pmc_id])
if not pmc_results.empty:
result = pmc_results.iloc[0].to_dict()
result["pmid"] = pmid
# Reorder columns to have pmid first
result = {
"pmid": result["pmid"],
"pmc_id": result["pmc_id"],
"gse_ids": result["gse_ids"],
"prjna_ids": result["prjna_ids"],
"srp_ids": result["srp_ids"],
"srr_ids": result["srr_ids"],
"srx_ids": result["srx_ids"],
"srs_ids": result["srs_ids"],
}
results.append(result)
else:
results.append(
{
"pmid": pmid,
"pmc_id": pmc_id,
"gse_ids": pd.NA,
"prjna_ids": pd.NA,
"srp_ids": pd.NA,
"srr_ids": pd.NA,
"srx_ids": pd.NA,
"srs_ids": pd.NA,
}
)
else:
# No PMC ID available
results.append(
{
"pmid": pmid,
"pmc_id": pd.NA,
"gse_ids": pd.NA,
"prjna_ids": pd.NA,
"srp_ids": pd.NA,
"srr_ids": pd.NA,
"srx_ids": pd.NA,
"srs_ids": pd.NA,
}
)
return pd.DataFrame(results)
[docs]
def pmid_to_gse(self, pmids):
"""Get GSE identifiers from PMID(s)
Parameters
----------
pmids: list or str
PMID(s)
Returns
-------
results_df: pandas.DataFrame
DataFrame with PMIDs and GSE identifiers
"""
full_results = self.pmid_to_identifiers(pmids)
return full_results[["pmid", "pmc_id", "gse_ids"]]
[docs]
def pmid_to_srp(self, pmids):
"""Get SRP identifiers from PMID(s)
Parameters
----------
pmids: list or str
PMID(s)
Returns
-------
results_df: pandas.DataFrame
DataFrame with PMIDs and SRP identifiers
"""
full_results = self.pmid_to_identifiers(pmids)
return full_results[["pmid", "pmc_id", "srp_ids"]]
[docs]
def doi_to_identifiers(self, dois):
"""Extract database identifiers from articles via DOI
Parameters
----------
dois: list or str
DOI(s)
Returns
-------
results_df: pandas.DataFrame
DataFrame with DOIs, PMIDs, PMC IDs, and extracted identifiers
"""
if isinstance(dois, str):
dois = [dois]
doi_pmid_mapping = self.doi_to_pmid(dois)
results = []
for doi, pmid in doi_pmid_mapping.items():
if pmid:
pmid_results = self.pmid_to_identifiers([pmid])
if not pmid_results.empty:
result = pmid_results.iloc[0].to_dict()
result["doi"] = doi
result = {
"doi": result["doi"],
"pmid": result["pmid"],
"pmc_id": result["pmc_id"],
"gse_ids": result["gse_ids"],
"prjna_ids": result["prjna_ids"],
"srp_ids": result["srp_ids"],
"srr_ids": result["srr_ids"],
"srx_ids": result["srx_ids"],
"srs_ids": result["srs_ids"],
}
results.append(result)
else:
results.append(
{
"doi": doi,
"pmid": pmid,
"pmc_id": pd.NA,
"gse_ids": pd.NA,
"prjna_ids": pd.NA,
"srp_ids": pd.NA,
"srr_ids": pd.NA,
"srx_ids": pd.NA,
"srs_ids": pd.NA,
}
)
else:
# No PMID available
results.append(
{
"doi": doi,
"pmid": pd.NA,
"pmc_id": pd.NA,
"gse_ids": pd.NA,
"prjna_ids": pd.NA,
"srp_ids": pd.NA,
"srr_ids": pd.NA,
"srx_ids": pd.NA,
"srs_ids": pd.NA,
}
)
return pd.DataFrame(results)
[docs]
def doi_to_gse(self, dois):
"""Get GSE identifiers from DOI(s)
Parameters
----------
dois: list or str
DOI(s)
Returns
-------
results_df: pandas.DataFrame
DataFrame with DOIs and GSE identifiers
"""
full_results = self.doi_to_identifiers(dois)
return full_results[["doi", "pmid", "pmc_id", "gse_ids"]]
[docs]
def doi_to_srp(self, dois):
"""Get SRP identifiers from DOI(s)
Parameters
----------
dois: list or str
DOI(s)
Returns
-------
results_df: pandas.DataFrame
DataFrame with DOIs and SRP identifiers
"""
full_results = self.doi_to_identifiers(dois)
return full_results[["doi", "pmid", "pmc_id", "srp_ids"]]