-
Notifications
You must be signed in to change notification settings - Fork 0
/
openfda_import.py
152 lines (148 loc) · 7.14 KB
/
openfda_import.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import requests
import time
# openFDA API key
openfda_key = ""

# Seconds to wait for the wiki to answer a single request before giving up;
# without a timeout a stalled connection would hang the script forever.
WIKI_TIMEOUT = 30

# Start wiki session. A single Session is used for every wiki request so the
# cookies set by the login call are sent along with the later token and edit
# requests.
print("Start wiki session")
S = requests.Session()
URL = "https://bciwiki.org/api.php"

# GET Request to fetch login token
PARAMS_0 = {
    "action": "query",
    "meta": "tokens",
    "type": "login",
    "format": "json"
}
R = S.get(url=URL, params=PARAMS_0, timeout=WIKI_TIMEOUT)
R.raise_for_status()
DATA = R.json()
LOGIN_TOKEN = DATA['query']['tokens']['logintoken']

# POST Request to log in. "lgname"/"lgpassword" must be filled in with the
# account credentials before the script is run.
PARAMS_1 = {
    "action": "login",
    "lgname": "",
    "lgpassword": "",
    "lgtoken": LOGIN_TOKEN,
    "format": "json"
}
R = S.post(URL, data=PARAMS_1, timeout=WIKI_TIMEOUT)
R.raise_for_status()

# The login response was previously ignored, so a rejected login went
# unnoticed and the script carried on with an anonymous session. Stop here
# instead, before any page edit is attempted.
LOGIN_INFO = R.json().get("login", {})
if LOGIN_INFO.get("result") != "Success":
    raise SystemExit(
        "Wiki login failed (result: "+str(LOGIN_INFO.get("result"))
        + ", reason: "+str(LOGIN_INFO.get("reason"))+")"
    )

# GET request to fetch CSRF token (required by every edit request)
PARAMS_2 = {
    "action": "query",
    "meta": "tokens",
    "format": "json"
}
R = S.get(url=URL, params=PARAMS_2, timeout=WIKI_TIMEOUT)
R.raise_for_status()
DATA = R.json()
CSRF_TOKEN = DATA['query']['tokens']['csrftoken']
# Seconds to wait for a single HTTP response before giving up.
REQUEST_TIMEOUT = 30
# Page size used when paging through openFDA results.
OPENFDA_PAGE_SIZE = 1000
# How many PMA rows and how many PMN rows the company-page summary table shows.
EXAMPLES_PER_ENDPOINT = 5
WIKI_INDEX_URL = "https://bciwiki.org/index.php"
DEVICES_AT_FDA_URL = "https://www.accessdata.fda.gov/scripts/cdrh/devicesatfda/index.cfm"
TABLE_HEADER = '|-\n! Device ID !! Device Name !! Class !! Category !! PMA !! PMN\n'


def _fetch_fda_devices(endpoint, id_field, company):
    """Return every device the openFDA `endpoint` lists for `company`.

    endpoint -- openFDA device endpoint name ("pma" or "510k").
    id_field -- key of the record that holds the submission number
                ("pma_number" or "k_number").
    company  -- applicant name to search for; only records whose applicant
                contains this exact string are kept.

    Each returned item is a list [id, device name, device class, category].
    Records missing any of those fields are skipped.
    """
    devices = []
    skip = 0
    while True:
        # Let requests encode the query so names containing "&", "#" or "+"
        # cannot corrupt the URL.
        params = {
            "search": "applicant:"+company,
            "limit": OPENFDA_PAGE_SIZE,
            "skip": skip,
        }
        if openfda_key:
            params["api_key"] = openfda_key
        response = requests.get(
            "https://api.fda.gov/device/"+endpoint+".json",
            params=params,
            timeout=REQUEST_TIMEOUT,
        )
        # wait 1 second
        # NOTE(review): the pause is applied after every openFDA request;
        # confirm this matches the intended rate limiting.
        time.sleep(1)
        try:
            resp = response.json()
        except ValueError:
            print("openFDA returned a non-JSON response for "+company
                  + " (HTTP "+str(response.status_code)+")")
            break

        if "error" in resp:
            # An error object marks the end of the results (no match, or the
            # skip offset ran past the last record). Anything other than a
            # plain "not found" is reported so it is not mistaken for an
            # empty result.
            error = resp["error"]
            code = error.get("code") if isinstance(error, dict) else None
            if code != "NOT_FOUND":
                print("openFDA error for "+company+": "+str(error))
            break

        results = resp.get("results")
        if results is None:
            # Neither an error nor results: show the payload and give up on
            # this endpoint rather than looping forever.
            print(resp)
            break

        for result in results:
            try:
                if company not in (result.get("applicant") or ""):
                    continue
                devices.append([
                    result[id_field],
                    result["openfda"]["device_name"],
                    result["openfda"]["device_class"],
                    result["advisory_committee_description"],
                ])
            except (KeyError, TypeError):
                # Record lacks one of the displayed fields; leave it out.
                continue

        if len(results) < OPENFDA_PAGE_SIZE:
            # A short page is the last page; no need for another request.
            break
        skip += OPENFDA_PAGE_SIZE
    return devices


def _device_row(db, device):
    """Format one wikitable row for `device`; `db` is "pma" or "pmn"."""
    device_id, name, device_class, category = (str(field) for field in device)
    is_pma = db == "pma"
    return (
        '|-\n| ['+DEVICES_AT_FDA_URL+'?db='+db+'&id='+device_id+' '+device_id+'] || '
        + name+' || '+device_class+' || '+category
        + ' || '+str(is_pma)+' || '+str(not is_pma)+'\n'
    )


def _device_rows(pma_devices, pmn_devices, limit=None):
    """Return table rows for up to `limit` PMA and `limit` PMN devices (all if None)."""
    rows = [_device_row("pma", device) for device in pma_devices[:limit]]
    rows += [_device_row("pmn", device) for device in pmn_devices[:limit]]
    return "".join(rows)


def _fetch_wikitext(title):
    """Return the raw wikitext of page `title`, or None if the wiki answers 404.

    The text is requested with action=raw through the logged-in session.
    Scraping the HTML edit form instead would yield HTML-escaped text
    ("&lt;", "&amp;", ...) which corrupts the page when posted back.
    """
    response = S.get(
        WIKI_INDEX_URL,
        params={"title": title, "action": "raw"},
        timeout=REQUEST_TIMEOUT,
    )
    if response.status_code == 404:
        return None
    response.raise_for_status()
    return response.text


def _save_page(title, text):
    """Replace the content of wiki page `title` with `text` and print the API reply."""
    response = S.post(
        URL,
        data={
            "action": "edit",
            "title": title,
            "token": CSRF_TOKEN,
            "format": "json",
            "text": text,
        },
        timeout=REQUEST_TIMEOUT,
    )
    print(response.json())


# Loop through list of companies (one company name per line)
with open("companies.txt", "r") as companies_file:
    companies = companies_file.read().splitlines()

for company in companies:
    if not company.strip():
        # Blank line in companies.txt; nothing to look up.
        continue
    print(company)

    # Get device listings from PMA endpoint (id, name, class, category)
    print("Get device listings from PMA endpoint (id, name, class, category)")
    pma_results = _fetch_fda_devices("pma", "pma_number", company)
    pma_result_count = len(pma_results)
    print(pma_result_count)

    # Get device listings from PMN endpoints (id, name, class, category)
    print("Get device listings from PMN endpoint (id, name, class, category)")
    pmn_results = _fetch_fda_devices("510k", "k_number", company)
    pmn_result_count = len(pmn_results)
    print(pmn_result_count)

    # Only touch the wiki if any results are returned
    if pma_result_count == 0 and pmn_result_count == 0:
        continue

    # Get current page contents from wiki
    company_page_content = _fetch_wikitext(company)
    if company_page_content is None:
        print("Wiki page for "+company+" not found; skipping")
        continue

    # The "==FDA==" heading marks a page that already received the section,
    # so it is not inserted a second time.
    if "==FDA==" not in company_page_content:
        # Create formatted table and description text
        total_result_count = str(pma_result_count+pmn_result_count)
        new_content = "==FDA==\n"+company+" has "+total_result_count+" medical devices registered with the FDA. Here are some of them:\n"
        new_content += '{| class="wikitable" style="margin:auto"\n|+ Examples of FDA Approved Devices ([['+company.replace(" ", "_")+'_FDA_Devices | View List]])\n'
        new_content += TABLE_HEADER
        new_content += _device_rows(pma_results, pmn_results, EXAMPLES_PER_ENDPOINT)
        new_content += '|}'

        # Add table to company description, directly before the Links section.
        # partition() keeps everything after the first heading intact and lets
        # a page without a Links section be handled instead of crashing.
        before_links, links_heading, after_links = company_page_content.partition("==Links==")
        if links_heading:
            company_page_content = before_links+new_content+"\n==Links==\n"+after_links
        else:
            company_page_content = company_page_content.rstrip()+"\n"+new_content

        # Update page
        print("Update page /index.php/"+company.replace(" ", "_"))
        _save_page(company, company_page_content)

    # Create Company_Name_FDA_Devices page with full list of devices
    # NOTE(review): the devices page is (re)written whenever results exist,
    # even if the company page already had its FDA section; confirm that is
    # the intended behaviour on re-runs.
    print("Creating /index.php/"+company.replace(" ", "_")+"_FDA_Devices")
    devices_page_content = "This is an expanded list of medical devices registered with the FDA by [["+company+"]].\n"
    devices_page_content += "Find out more [[OpenFDA_Integration | here]].\n"
    devices_page_content += '{| class="wikitable" style="margin:auto"\n|+ FDA Approved Devices\n'
    devices_page_content += TABLE_HEADER
    devices_page_content += _device_rows(pma_results, pmn_results)
    devices_page_content += '|}'
    _save_page(company+" FDA Devices", devices_page_content)