-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathfaersDownloader_backup.py
143 lines (123 loc) · 4.42 KB
/
faersDownloader_backup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# coding: utf-8
# author: Jing Li
# date: 2019/04/01
import os
import re
import lxml
import time
import shutil
import warnings
import requests
from tqdm import tqdm
from io import BytesIO
from zipfile import ZipFile
from datetime import datetime
from bs4 import BeautifulSoup
from urllib.request import urlopen
# NBER mirror of the FDA Adverse Event Reporting System (FAERS) data;
# getFilesUrl() scans every page in target_page for ASCII zip links.
host_url = "http://www.nber.org"
target_page = ["http://www.nber.org/data/fda-adverse-event-reporting-system-faers-data.html"]
# local directories: raw extracted archives land in source_dir,
# cleaned .txt data files are moved to data_dir.
source_dir = "FAERSsrc"
data_dir = "FAERSdata"
# silence library warnings (presumably from requests/bs4 — keeps the tqdm progress output readable)
warnings.filterwarnings('ignore')
def downloadFiles(faers_files, source_dir, data_dir):
    """
    Download FAERS zip archives, extract them, and sort the contents.

    :param faers_files: dict mapping archive name -> download url
    :param source_dir: directory receiving the raw extracted files (FAERSsrc)
    :param data_dir: directory receiving the cleaned .txt files (FAERSdata)
    :return: none
    """
    for file_name in tqdm(faers_files):
        try:
            print("Download " + file_name + "\t" + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
            r = requests.get(faers_files[file_name], timeout=200)
            try:
                # Fail fast on HTTP errors (404/500): without this, an HTML
                # error page would be handed to ZipFile and fail obscurely.
                r.raise_for_status()
                ZipFile(BytesIO(r.content)).extractall(source_dir)
            finally:
                # close the response even if extraction raises
                r.close()
            # prune unwanted files, then move the .txt data files to FAERSdata
            deleteUnwantedFiles(source_dir)
            copyFiles(source_dir, data_dir)
            print("Download " + file_name + " success!\t" + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        except Exception as e:
            # best-effort loop: one failed archive must not abort the rest
            print("Download " + file_name + " failed! Error: " + str(e))
        # be polite to the server between archives
        print("Sleep 30 seconds before starting download a new file.\n")
        time.sleep(30)
def deleteUnwantedFiles(path):
    """
    Recursively remove files that are not wanted for analysis.

    Deletes:
      * files from quarters before 2012Q4 (FAERS replaced the legacy AERS
        system in 2012Q4, so earlier quarters are kept out),
      * .pdf / .doc documentation files,
      * ancillary tables (RPSR, INDI, THER, SIZE, STAT, OUTC).

    :param path: root directory to scan (FAERSsrc)
    :return: none
    """
    print("Delete unwanted files.\t" + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    for parent, dirnames, filenames in os.walk(path):
        for fn in filenames:
            # FAERS data file names encode the quarter as e.g. "12Q4" in
            # chars 4-7 (e.g. DEMO12Q4.txt); lexicographic compare works
            # for two-digit years within the same century.
            if fn[4:8] < "12Q4":
                # bug fix: pre-2012Q4 files were previously only logged,
                # never actually removed
                print("Delete " + fn)
                os.remove(os.path.join(parent, fn))
            elif fn.lower().endswith(('.pdf', '.doc')):
                print("Delete " + fn)
                os.remove(os.path.join(parent, fn))
            elif fn.upper().startswith(("RPSR", "INDI", "THER", "SIZE", "STAT", "OUTC")):
                print("Delete " + fn)
                os.remove(os.path.join(parent, fn))
    print("Delete unwanted files done!\t" + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
def copyFiles(source_dir, data_dir):
    """
    Move every .txt file from source_dir into data_dir (both relative to cwd).

    :param source_dir: directory holding extracted archive files (FAERSsrc)
    :param data_dir: directory collecting the data files (FAERSdata)
    :return: none
    """
    print("Copy files from " + source_dir + " to " + data_dir + ". ", end="")
    # os.path.join is portable; the original hard-coded '/' breaks on Windows
    root_dir = os.path.join(os.getcwd(), source_dir)
    target_folder = os.path.join(os.getcwd(), data_dir)
    for root, dirs, files in os.walk(os.path.normpath(root_dir), topdown=False):
        for name in files:
            if name.lower().endswith('.txt'):
                src = os.path.join(root, name)
                dst = os.path.join(target_folder, name)
                # overwrite an existing copy so repeated runs don't crash
                # (shutil.move raises when the destination already exists)
                if os.path.exists(dst):
                    os.remove(dst)
                shutil.move(src, dst)
    print("Done! ")
def getFilesUrl():
    """
    Scrape the target page(s) for links to FAERS ASCII zip archives.

    Also writes the discovered urls to ./FaersFilesWebUrls.txt for reference.

    :return: dict files = {"name": "url"}, where name is a 12-char slug cut
             from the link target (the file name without its ".zip" suffix)
    """
    print("Get web urls.\t")
    files = {}
    for page_url in target_page:
        try:
            request = urlopen(page_url)
            page_bs = BeautifulSoup(request, "lxml")
            request.close()
        except Exception:
            # lxml may be unavailable; retry with bs4's default parser.
            # (bug fix: bare `except:` narrowed, and the retry handle is
            # now closed instead of leaked)
            request = urlopen(page_url)
            page_bs = BeautifulSoup(request)
            request.close()
        for url in page_bs.find_all("a"):
            # only anchors advertising the ASCII-format archives
            if "ASCII" in str(url).upper():
                t_url = url.get('href')
                # key: last 12 chars of the file name, ".zip" stripped
                files[str(t_url)[-16:-4]] = host_url + t_url
    # save urls to FaersFilesWebUrls.txt; mode 'w' truncates, replacing the
    # original remove-then-append dance
    save_path = os.path.join(os.getcwd(), "FaersFilesWebUrls.txt")
    with open(save_path, 'w') as f:
        for name, link in files.items():
            f.write(name + ":" + link + "\n")
    print("Done!")
    return files
def main():
    """Entry point: prepare local directories, then fetch and download FAERS data."""
    # create the working directories if they do not exist yet
    for directory in (source_dir, data_dir):
        if not os.path.isdir(directory):
            os.makedirs(directory)
    # discover archive urls, then download and unpack each one
    faers_files = getFilesUrl()
    downloadFiles(faers_files, source_dir, data_dir)
if __name__ == '__main__':
    main()