imported uniprot preprocessing scripts

This commit is contained in:
Sandro La Bruzzo 2023-09-21 15:08:48 +02:00
parent a9712c63d1
commit 7decdade98
6 changed files with 121 additions and 10 deletions

3
.gitignore vendored
View File

@ -12,6 +12,9 @@ __pycache__/
build/
download
metadata
uniprot_metadata
uniprot_sprot.dat.gz
dump*.gz
develop-eggs/
dist/
downloads/

27
main.py
View File

@ -1,14 +1,23 @@
from pdb.pdb_download import PDBDownloader
from pdb.pdb_metadata_extractor import MetadataExctractor
from pdb.pdb_validator import validate
from uniprot.download import UniprotSwissDownloader
from uniprot.metadata import MetadataExctractor as ME
if __name__ == '__main__':
p = PDBDownloader()
p.get_file_to_downloads(max_item=4)
m = MetadataExctractor()
m.extract_metadata()
error, valid, total, error_record = validate()
print(error)
print(f"Valid {valid}/{total}")
print(f"Error {error_record}/{total}")
u = UniprotSwissDownloader()
#u.download()
k = ME()
k.extract_metadata()
# p = PDBDownloader()
# p.get_file_to_downloads(max_item=4)
# m = MetadataExctractor()
# m.extract_metadata()
# error, valid, total, error_record = validate()
# print(error)
# print(f"Valid {valid}/{total}")
# print(f"Error {error_record}/{total}")

View File

@ -19,7 +19,7 @@ class MetadataExctractor:
p['classification'] = l[10:49].strip().capitalize()
p['pdb'] = l[62:].strip()
p['deposition_date'] = d =datetime.datetime.strptime(l[50:59],"%d-%b-%y" ).strftime("%Y-%m-%d")
p['deposition_date'] = datetime.datetime.strptime(l[50:59],"%d-%b-%y" ).strftime("%Y-%m-%d")
elif l.startswith("REMARK"):
break
elif l.startswith("TITLE"):

0
uniprot/__init__.py Normal file
View File

24
uniprot/download.py Normal file
View File

@ -0,0 +1,24 @@
import requests
import sys
class UniprotSwissDownloader():
def __init__(self, url = "https://ftp.ebi.ac.uk/pub/databases/uniprot/current_release/knowledgebase/complete/uniprot_sprot.dat.gz") -> None:
self.url = url
def download(self):
r= requests.get(self.url, stream=True)
total_length = r.headers.get('content-length')
with open("uniprot_sprot.dat.gz", 'wb') as f:
if total_length is None: # no content length header
f.write(r.content)
else:
dl = 0
total_length = int(total_length)
for data in r.iter_content(chunk_size=4096):
dl += len(data)
f.write(data)
done = int(50 * dl / total_length)
sys.stdout.write("\rDownloading[%s%s]" % ('=' * done, ' ' * (50-done)) )
sys.stdout.flush()

75
uniprot/metadata.py Normal file
View File

@ -0,0 +1,75 @@
import gzip
import json
import datetime
import os
import shutil
import re
regex = r"[OPQ][0-9][A-Z0-9]{3}[0-9]|[A-NR-Z][0-9]([A-Z][A-Z0-9]{2}[0-9]){1,2}"
class MetadataExctractor:
def __init__(self, path="uniprot_sprot.dat.gz") -> None:
print(f"Open Path {path}")
self.current_file = gzip.open(path)
def get_metadata(self) :
p = {}
for line in self.current_file:
l = line.decode("utf-8").strip()
if l.startswith("AC"):
p['pid'] =l[4:-1]
elif l.startswith("DT "):
cd ={}
k = l[4:-1].split(',')
cd['date'] =datetime.datetime.strptime(k[0].strip(),"%d-%b-%Y" ).strftime("%Y-%m-%d")
cd['date_info'] = k[1].strip()
dates = p.get("dates", [])
dates.append(cd)
p['dates']= dates
elif l.startswith("DE "):
k = l[4:-1]
if 'RecName: Full=' in k:
p['title'] = k[k.find('=')+1:]
elif l.startswith("OS "):
p['organism_species'] =l[4:-1].strip()
elif l.startswith("OC "):
s = l[4:-1].strip().split(';')
subjects = p.get('subjects', [])
for sub in s:
subjects.append(sub.strip())
p['subjects']= subjects
elif l.startswith("RX "):
references = [c.strip() for c in l[4:-1].strip().split(';')]
relations = []
if len(references):
for r in references:
dd = r.split("=")
if len(dd)==2 and dd[0].lower().strip() in ['pubmed', 'doi']:
relations.append({dd[0]:dd[1]})
if len(relations):
p["references"] = relations
elif l.startswith("//"):
yield json.dumps(p)
def extract_metadata(self, output_path="uniprot_metadata"):
if (os.path.exists(output_path) and os.path.isdir(output_path)):
shutil.rmtree(output_path)
os.mkdir(output_path)
part = 0
i = 0
w = gzip.open(f"{output_path}/dump_0{part}.gz", "w")
for item in self.get_metadata():
i +=1
w.write(item.encode())
w.write("\n".encode())
if i % 10000==0:
part +=1
w.flush()
w.close()
print(f"PARSED {i}")
w = gzip.open(f"dump_0{part}.gz", "w")