-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy patharc-sync-scans.py
106 lines (87 loc) · 3.57 KB
/
arc-sync-scans.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#!/usr/bin/python
# arc-sync-scans.py
# Copyright (c) 2010,2011 Washington University
# Author: Kevin A. Archie <[email protected]>
#
# Usage:
# arc-sync-scans \
# -h ${XNAT_BASE_URL} \
# --proj ${XNAT_PROJECT} \ (NOTE preceding double dash)
# -u ${XNAT_USER} \
# -p ${XNAT_PASSWORD} \
# (-u and -p are optional; user will be prompted if these are omitted)
# -m ${REQUESTED_MODALITIES} \
# (-m is optional; multiple values separated by commas)
# -l ${LOCAL_CACHE_DIR} \
# (-l is optional; defaults to ${PWD}/${XNAT_PROJECT})
# -s ${REQUESTED_SCAN_TYPES} \
# (-s is optional; multiple values separated by commas; defaults to ALL)
# -v (optional, displays information about scans being retrieved)
import base64, getopt, getpass, json, os, re, shutil, string, sys, urllib2 as u, zipfile as z
# Read and parse command-line arguments
optsl, args = getopt.getopt(sys.argv[1:], 'h:u:p:m:l:s:v', ['proj='])
opts = dict(optsl)

# The project name is used in every REST query below and as the default
# cache directory, so fail early with a clear message instead of a KeyError
# later on.
if '--proj' not in opts:
    sys.exit('arc-sync-scans: --proj ${XNAT_PROJECT} is required')

# XNAT base URL can be either -h or the first non-option argument
if '-h' in opts:
    base = opts['-h']
elif args:
    base = args[0]
else:
    sys.exit('arc-sync-scans: XNAT base URL is required (-h ${XNAT_BASE_URL})')

# XNAT user/pass can be -u/-p or prompted
user = opts['-u'] if '-u' in opts else raw_input('username: ')
passwd = opts['-p'] if '-p' in opts else getpass.getpass('password: ')

verbose = '-v' in opts

# Build HTTP header for Basic Authorization.
# b64encode is used rather than encodestring because encodestring breaks its
# output into lines, which would put a newline inside the header value
# whenever user:password is long.
auth = {'Authorization': 'Basic ' + base64.b64encode(user + ':' + passwd)}

# Requested modalities: one query-string prefix per modality, or a single
# empty prefix when no modality filter was requested
if '-m' in opts:
    xsis = ['xsiType=xnat:' + m.lower() + 'SessionData&'
            for m in opts['-m'].split(',')]
else:
    xsis = ['']

# Local data cache; create if it doesn't already exist
cache = opts['-l'] if '-l' in opts else opts['--proj']
try:
    os.makedirs(cache)
except OSError:
    # An already-existing directory is fine; anything else (permissions,
    # a regular file in the way, ...) is a real error.
    if not os.path.isdir(cache):
        raise

# Requested scan types
types = opts['-s'] if '-s' in opts else 'ALL'

# What sessions are already (at least partially) present in data cache?
existing = os.listdir(cache)

# Copy requested types into a list so we can look for matches
typelist = types.split(',')
def get(uri):
    """Open the resource at the given URI (relative to the XNAT base URL).

    The request carries the Basic Authorization header built at startup.
    Returns the open response handle from urlopen; the caller is
    responsible for reading and closing it.
    """
    request = u.Request(url=base + uri, headers=auth)
    return u.urlopen(request)
def get_to_fh(uri, fh):
    """Copy the resource body from the given URI into the file handle.

    uri -- path relative to the XNAT base URL
    fh  -- writable file-like object; it is left open and positioned after
           the copied data
    """
    uh = get(uri)
    try:
        shutil.copyfileobj(uh, fh)
    finally:
        # Release the HTTP connection even if the copy fails part-way
        uh.close()
def getJSONResults(uri):
    """Return the Result array from an XNAT JSON search result.

    The response body is parsed as JSON and the value found under
    ['ResultSet']['Result'] is returned.
    """
    response = get(uri)
    try:
        return json.load(response)['ResultSet']['Result']
    finally:
        # Close the HTTP response whether or not parsing succeeded
        response.close()
def getScanFiles(expt, scans):
    """Download and unpack the data files for the given scans.

    expt  -- dict describing the experiment; must provide the 'project',
             'subject_label' and 'label' keys used to build the REST URI
    scans -- scan selector placed in the URL as-is: a scan ID, a scan type,
             a comma-separated list, or 'ALL'

    The scans are requested as a single zip archive, which is spooled to an
    anonymous temporary file and extracted into the local cache directory.
    """
    # Function-scope import: tempfile.TemporaryFile is the portable
    # replacement for os.tmpfile().
    import tempfile

    args = {'URI': '/REST/projects/%(project)s/subjects/%(subject_label)s/experiments/%(label)s' % expt,
            'scans': scans}
    if verbose:
        print('Getting %(URI)s: %(scans)s' % args)

    # NOTE(review): scans is not URL-quoted here; selectors containing
    # spaces or other reserved characters presumably need to be
    # pre-encoded by the caller -- confirm.
    zfh = tempfile.TemporaryFile()
    try:
        get_to_fh('%(URI)s/scans/%(scans)s/files?format=zip' % args, zfh)
        # Rewind so ZipFile reads the archive from the beginning
        zfh.seek(0, os.SEEK_SET)
        archive = z.ZipFile(zfh, 'r')
        try:
            archive.extractall(cache)
        finally:
            archive.close()
    finally:
        zfh.close()
# main
# For each requested modality, list the project's experiments.  Sessions not
# yet in the cache are fetched in one request; sessions already present are
# checked scan by scan and only the missing scans are fetched.
for xsi in xsis:
    opts['xsi'] = xsi
    for expt in getJSONResults('/REST/projects/%(--proj)s/experiments?%(xsi)sformat=json&columns=subject_label,label' % opts):
        expt['project'] = opts['--proj']
        label = expt['label']

        if label not in existing:
            # Nothing cached for this session: get all requested scans at once
            getScanFiles(expt, types)
            continue

        # List the cached scan directories once per session.  A session
        # directory without a scans/ subdirectory is treated as having no
        # scans downloaded yet.
        try:
            present = set(os.listdir(os.path.join(cache, label, 'scans')))
        except OSError:
            present = set()

        # NOTE(review): relies on the experiment listing including a 'URI'
        # field for each result -- confirm against the server's response.
        for scan in getJSONResults('%(URI)s/scans' % expt):
            if types == 'ALL' or scan['type'] in typelist:
                # Cached scan directory name: "<ID>-<type>" with every
                # character other than word characters and '-' replaced by '_'
                name = re.sub(r'[^\w\-]', '_', "%(ID)s-%(type)s" % scan)
                if name not in present:
                    getScanFiles(expt, scan['ID'])
                    # Remember it so a later scan mapping to the same
                    # directory name is not fetched again
                    present.add(name)