-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdownload.py
70 lines (59 loc) · 2.31 KB
/
download.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import os
import json
import argparse
import pprint
import parsl
from parsl import python_app
from parsl.config import Config
from parsl.executors.threads import ThreadPoolExecutor
if __name__ == '__main__':
    # Command-line interface: where the remote file list is read from, where
    # the local copies are stored, and where the list of local paths is saved.
    parser = argparse.ArgumentParser(
        description='Copy the files listed in a dataset JSON to local storage '
                    'with xrdcp and write a JSON listing the local paths')
    parser.add_argument('-i', '--input', default=r'metadata/dataset.json',
                        help='Input JSON mapping each dataset name to a list of remote files')
    parser.add_argument('-d', '--dir', help='Storage directory', required=True)
    parser.add_argument('-l', '--limit', type=int, default=None,
                        help='Only download the first N files of each dataset.')
    parser.add_argument('-o', '--output', default=r'metadata/dataset_local.json',
                        help='Output JSON mapping each dataset name to the list of local paths')
    parser.add_argument('--download', action='store_true',
                        help='Actually copy the files; without it only the output JSON is written')
    args = parser.parse_args()

    # Load the dataset description: a JSON object whose values are lists of
    # remote file names (each list is sliced and iterated below).
    with open(args.input) as f:
        sample_dict = json.load(f)

    # Resolve the storage directory once; every local path is built from it.
    storage_dir = os.path.realpath(args.dir)
    print("Storage dir:")
    print(" ", storage_dir)

    @python_app
    def down_file(fname, out, ith=None):
        """Copy one remote file to ``out`` with ``xrdcp``.

        Args:
            fname: Source file name/URL handed to ``xrdcp``.
            out: Local destination path.
            ith: Optional progress message printed before the copy starts.

        Returns:
            The exit status of ``xrdcp`` (0 on success), or 127 when the
            destination directory could not be created or ``xrdcp`` could
            not be started.
        """
        # Imported inside the app so the function body is self-contained.
        import os
        import subprocess

        if ith is not None:
            print(ith)
        try:
            # The destination mirrors the remote directory layout, so its
            # parent directory may not exist yet.
            os.makedirs(os.path.dirname(out), exist_ok=True)
            # An argument list (no shell) passes file names through verbatim,
            # even if they contain spaces or shell metacharacters.
            return subprocess.run(["xrdcp", "-P", fname, out]).returncode
        except OSError as err:
            # Keep the run best-effort: report the problem and let the caller
            # see a non-zero status instead of aborting every other download.
            print(f"Could not download {fname}: {err}")
            return 127

    # Set up the thread pool that runs the download apps.
    config = Config(executors=[ThreadPoolExecutor(max_threads=8)])
    parsl.load(config)

    out_dict = {}     # dataset name -> list of local file paths
    run_futures = []  # (remote file name, future) for every submitted copy
    for key in sorted(sample_dict):
        n_total = len(sample_dict[key])
        new_list = []
        for i, fname in enumerate(sample_dict[key][:args.limit]):
            # Attach a progress message to every fifth file only.
            ith = f'{key}: {i}/{n_total}' if i % 5 == 0 else None
            # Local path = storage dir + everything after the last "//" of
            # the remote name, with leading slashes removed.
            out = os.path.join(storage_dir, fname.split("//")[-1].lstrip("/"))
            new_list.append(out)
            # Files already present locally are not copied again.
            if args.download and not os.path.isfile(out):
                run_futures.append((fname, down_file(fname, out, ith)))
        out_dict[key] = new_list

    # Block until every submitted copy has finished and collect the failures.
    failed = [fname for fname, future in run_futures if future.result() != 0]
    if failed:
        print("Failed to download {} file(s):".format(len(failed)))
        for fname in failed:
            print(" ", fname)

    # The output lists every target path, whether or not it was downloaded.
    print("Writing files to {}".format(args.output))
    with open(args.output, 'w') as fp:
        json.dump(out_dict, fp, indent=4, sort_keys=True)

    if not args.download:
        print("To actually run include ``--download``.")