forked from cedadev/opendap-python-example
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsimple_file_downloader.py
151 lines (111 loc) · 3.7 KB
/
simple_file_downloader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# encoding: utf-8
"""
remote_nc_reader.py
===================
Python script for downloading a file from the CEDA archive.
Pre-requisites:
- Python 2.7 or 3.X
- Python libraries (installed by Pip):
```
ContrailOnlineCAClient
```
Usage:
```
$ python simple_file_downloader.py <url>
```
Example:
```
$ URL=http://dap.ceda.ac.uk/thredds/dodsC/badc/ukcp18/data/marine-sim/skew-trend/rcp85/skewSurgeTrend/latest/skewSurgeTrend_marine-sim_rcp85_trend_2007-2099.nc
$ python simple_file_downloader.py $URL
```
"""
# Import standard libraries
import os
import sys
import datetime
import requests
# Import third-party libraries
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from contrail.security.onlineca.client import OnlineCaClient
CERTS_DIR = os.path.expanduser('~/.certs')
if not os.path.isdir(CERTS_DIR):
os.makedirs(CERTS_DIR)
TRUSTROOTS_DIR = os.path.join(CERTS_DIR, 'ca-trustroots')
CREDENTIALS_FILE_PATH = os.path.join(CERTS_DIR, 'credentials.pem')
TRUSTROOTS_SERVICE = 'https://slcs.ceda.ac.uk/onlineca/trustroots/'
CERT_SERVICE = 'https://slcs.ceda.ac.uk/onlineca/certificate/'
def cert_is_valid(cert_file, min_lifetime=0):
"""
Returns boolean - True if the certificate is in date.
Optional argument min_lifetime is the number of seconds
which must remain.
:param cert_file: certificate file path.
:param min_lifetime: minimum lifetime (seconds)
:return: boolean
"""
try:
with open(cert_file, 'rb') as f:
crt_data = f.read()
except IOError:
return False
try:
cert = x509.load_pem_x509_certificate(crt_data, default_backend())
except ValueError:
return False
now = datetime.datetime.now()
return (cert.not_valid_before <= now
and cert.not_valid_after > now + datetime.timedelta(0, min_lifetime))
def setup_credentials():
"""
Download and create required credentials files.
Return True if credentials were set up.
Return False is credentials were already set up.
:param force: boolean
:return: boolean
"""
# Test for DODS_FILE and only re-get credentials if it doesn't
# exist AND `force` is True AND certificate is in-date.
if cert_is_valid(CREDENTIALS_FILE_PATH):
print('[INFO] Security credentials already set up.')
return False
# Get CEDA username and password from environment variables
username = os.environ['CEDA_USERNAME']
password = os.environ['CEDA_PASSWORD']
onlineca_client = OnlineCaClient()
onlineca_client.ca_cert_dir = TRUSTROOTS_DIR
# Set up trust roots
trustroots = onlineca_client.get_trustroots(
TRUSTROOTS_SERVICE,
bootstrap=True,
write_to_ca_cert_dir=True)
# Write certificate credentials file
key_pair, certs = onlineca_client.get_certificate(
username,
password,
CERT_SERVICE,
pem_out_filepath=CREDENTIALS_FILE_PATH)
print('[INFO] Security credentials set up.')
return True
def main(file_url):
"""
Main controller function.
:param nc_file_url: URL to a NetCDF4 opendap end-point.
:param var_id: Variable ID [String]
:return: None
"""
try:
setup_credentials()
except KeyError:
print("CEDA_USERNAME and CEDA_PASSWORD environment variables required")
return
# Download file to current working directory
response = requests.get(file_url, cert=(CREDENTIALS_FILE_PATH), verify=False)
filename = file_url.rsplit('/', 1)[-1]
with open(filename, 'wb') as file_object:
file_object.write(response.content)
if __name__ == '__main__':
try:
main(sys.argv[1])
except IndexError:
print("Please provide a file URL as input")