-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathclarity_ids.py
More file actions
executable file
·162 lines (125 loc) · 6.04 KB
/
clarity_ids.py
File metadata and controls
executable file
·162 lines (125 loc) · 6.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#!/usr/bin/env python3
"""
Author: John Arnn
Release Date: 10/12/2023
Usage:
This script is used for the collection of Unique IDs found in Clarity for all samples UPHL NGS processes.
It uses the Clarity API to retrieve data, so you must have an active user account and password.
You also need to provide one of three flags:
--all will download every sample's unique IDs. Tens of thousands of API calls, so it takes at least 45 min to run.
--list 1234,1234,1234 Provide the UPHL accessions separated by commas with no spaces to get results for specific samples
--file provide a path to a txt file that has each sample separated by a new line
The output of this script is a csv file named clarity_ids_output.csv, written into the directory the script was run in.
OR use flag '--output' to provide the full path and name of the file
Users must have an account in Clarity that has the System Administrator Role.
"""
import argparse
import re
import sys
import xml.etree.ElementTree as ET

import pandas as pd
import requests
from requests.auth import HTTPBasicAuth
# Command-line interface: credentials, an optional output path, and exactly
# one source of samples (--list, --file or --all).
parser = argparse.ArgumentParser(description="A script that accepts user, password, and data source options.")

# Required arguments
parser.add_argument("--user", required=True, help="User name")
parser.add_argument("--password", required=True, help="User's password")

# Optional destination of the csv; the script falls back to
# 'clarity_ids_output.csv' in the working directory when it is omitted.
parser.add_argument("--output", help="Full path and name of the output csv (default: clarity_ids_output.csv)")

# Exactly one of --list, --file or --all must be provided
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--list", help="Specify a list of samples you need ids for. 123,124,125")
group.add_argument("--file", help="Specify a file that has a sample name for each line of file")
group.add_argument("--all", action="store_true", help="Creates csv of all samples")

# Parsed at import time: every later section of the script reads ``args``.
args = parser.parse_args()
def flatten(l):
    """Return one flat list holding the items of every iterable in *l*, in order.

    Only a single level of nesting is removed.
    """
    flat = []
    for sublist in l:
        flat.extend(sublist)
    return flat
# Names of the Clarity user-defined fields (UDFs) looked up for every sample.
# They are also used, in this order, as the csv column headers that follow
# 'Sample_ID'.
udfs = [
    'ARLN ID',
    'CDC ID',
    'GCWGS ID',
    'GISP ID',
    'PNUSA Number',
]
# Check user name and password before doing any real work: request a single
# sample and look for an "Unauthorized" message in the response.
response = requests.get("https://uphl-ngs.claritylims.com/api/v2/samples/%s" % 'WAG357A292',
                        auth=HTTPBasicAuth(args.user, args.password))
xml = response.content.decode("utf-8")
check = re.findall(r'<message>(.*?)</message>', xml)
# The HTTP status is checked as well so that a 401 whose body is worded
# differently is still treated as a failed login.
if check == ['Unauthorized'] or response.status_code == 401:
    # Exits with a non-zero status and prints the message on stderr
    # (requires ``import sys`` at the top of the file).
    sys.exit('User name or password is unauthorized. Please check username and password!')
# --all: page through the complete sample listing, 500 entries at a time,
# collecting the LIMS id of every sample.
if args.all:
    sample_lims_ids = []
    auth = HTTPBasicAuth(args.user, args.password)
    index = 0
    while True:
        # The first page is requested without a start index, as before.
        page_params = {"start-index": index} if index else None
        xml = requests.get("https://uphl-ngs.claritylims.com/api/v2/samples",
                           params=page_params, auth=auth).content.decode("utf-8")
        page_ids = re.findall(r'limsid="([^"]+)"', xml)
        # Stop on the first page that lists no samples.  Testing the ids
        # themselves (instead of the length of the response) also ends the
        # loop when the server answers with a long error page, which would
        # otherwise be requested forever.
        if not page_ids:
            break
        sample_lims_ids.extend(page_ids)
        index += 500
# --list / --file: in these modes sample_lims_ids holds sample *names*; they
# are resolved to LIMS ids further down.  Surrounding whitespace is removed
# and empty entries (blank lines, stray commas) are dropped so that no
# empty name is ever sent to the API.
if args.list:
    sample_lims_ids = [entry.strip() for entry in args.list.split(',') if entry.strip()]

if args.file:
    # One sample name per line.
    with open(args.file, 'r') as handle:
        sample_lims_ids = [line.strip() for line in handle if line.strip()]
# Build one csv row per sample -- [sample name, one value per entry of udfs]
# -- and write the table out.  With --list/--file the entries of
# sample_lims_ids are sample names that are first resolved to LIMS ids; with
# --all they already are LIMS ids.
_SAMPLES_URL = "https://uphl-ngs.claritylims.com/api/v2/samples"
_NAMESPACE_MAP = {
    'udf': 'http://genologics.com/ri/userdefined',
    'ri': 'http://genologics.com/ri',
    'file': 'http://genologics.com/ri/file',
    'smp': 'http://genologics.com/ri/sample'}


def _get_xml(url, params=None):
    """Return the UTF-8 decoded body of an authenticated GET request to *url*.

    *params* is handed to ``requests.get`` so query values are URL-encoded.
    """
    response = requests.get(url, params=params,
                            auth=HTTPBasicAuth(args.user, args.password))
    return response.content.decode("utf-8")


def _udf_values(sample_xml):
    """Return the text of every field named in ``udfs``; "" where it is absent.

    Raises ``xml.etree.ElementTree.ParseError`` when *sample_xml* is not
    well-formed XML.
    """
    root = ET.fromstring(sample_xml)
    values = []
    for udf_name in udfs:
        field = root.find('.//udf:field[@name="%s"]' % udf_name, namespaces=_NAMESPACE_MAP)
        values.append(field.text if field is not None else "")
    return values


def _sample_row(limsid, sample_id=None):
    """Fetch the sample with LIMS id *limsid* and return its csv row.

    When *sample_id* is None the first ``<name>`` element of the response is
    used as the Sample_ID (falling back to *limsid* if there is none).
    """
    sample_xml = _get_xml("%s/%s" % (_SAMPLES_URL, limsid))
    if sample_id is None:
        names = re.findall(r'<name>(.*?)</name>', sample_xml)
        # Use the name string itself; the whole findall list would end up in
        # the csv as "['name']".
        sample_id = names[0] if names else limsid
    return [sample_id] + _udf_values(sample_xml)


if not args.all:
    # Resolve every requested name first.  The dict keeps the input order
    # and collapses names that were given more than once.
    limsids = {}
    for sample_name in sample_lims_ids:
        limsids[sample_name] = re.findall(
            r'limsid="([^"]+)"', _get_xml(_SAMPLES_URL, params={"name": sample_name}))
    # Only the first match of each name is used.
    lookups = [(sample_name, found[0] if found else None)
               for sample_name, found in limsids.items()]
else:
    lookups = [(None, limsid) for limsid in sample_lims_ids]

ids_df = []
for sample_name, limsid in lookups:
    if limsid is None:
        print("No sample named '%s' found; skipping." % sample_name, file=sys.stderr)
        continue
    try:
        ids_df.append(_sample_row(limsid, sample_name))
    except Exception as err:
        # Best effort: one bad sample must not abort a long run, but report
        # it instead of dropping it silently.  Unlike a bare ``except`` this
        # still lets Ctrl-C (KeyboardInterrupt) stop the script.
        print("Skipping %s: %s" % (sample_name if sample_name is not None else limsid, err),
              file=sys.stderr)
        continue

output_path = args.output if args.output else 'clarity_ids_output.csv'
pd.DataFrame(ids_df, columns=['Sample_ID'] + udfs).to_csv(output_path, index=False)