-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathtest_wraith.py
243 lines (192 loc) · 7.44 KB
/
test_wraith.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
"""Use wraith to compare current version against published docs.
"""
import unittest
import os
import copy
import re
import yaml
import subprocess
import contextlib
from distutils.version import LooseVersion
import http.server
import socketserver
import threading
# Published documentation that the freshly built pages are compared against.
REFERENCE_URL = "https://www.cgat.org/downloads/public/CGATReport/documentation"

# Directory (resolved against the current working directory at import time)
# in which wraith configuration files are written and wraith is executed.
WRAITH_WORKDIR = os.path.abspath("wraith")

# Address under which the locally built documentation is expected to be served.
TEST_PORT = 9100
TEST_HOST = "localhost"
# Template for the wraith configuration used by ``wraith spider``.
#
# Placeholders filled in through ``str.format``:
#   {test_host}, {test_port}  - address of the locally served documentation
#   {wraith_data_config}      - name of the file referenced by ``imports``
#
# The ``test`` entry has to be nested below ``domains``; without the
# indentation YAML parses ``domains`` as null and ``test`` as a separate
# top-level key.
spider_config_template = """
browser: "phantomjs"
domains:
  test: http://{test_host}:{test_port}
spider_skips:
- !ruby/regexp /static$/
- !ruby/regexp /%23/
- !ruby/regexp /.eps$/
- !ruby/regexp /.svg$/
- !ruby/regexp /.xlsx$/
- !ruby/regexp /notebook/
- !ruby/regexp /code/
directory: 'shots'
imports: "{wraith_data_config}"
phantomjs_options: '--ignore-ssl-errors=true --ssl-protocol=tlsv1'
"""
# Template for the wraith configuration used by ``wraith capture``.
#
# Placeholders filled in through ``str.format``:
#   {test_host}, {test_port}  - address of the locally served documentation
#   {reference_url}           - base URL of the published documentation
#   {wraith_data_config}      - name of the file referenced by ``imports``
#
# ``test``/``current`` have to be nested below ``domains`` and the thumbnail
# sizes below ``gallery``; without the indentation YAML parses the parent
# keys as null and turns the children into unrelated top-level keys.
capture_config_template = """
browser: "phantomjs"
domains:
  test: http://{test_host}:{test_port}
  current: {reference_url}
spider_skips:
- !ruby/regexp /static$/
- !ruby/regexp /%23/
imports: "{wraith_data_config}"
screen_widths:
- 1280
directory: 'shots'
fuzz: '20%'
threshold: 5
gallery:
  thumb_width: 200
  thumb_height: 200
mode: diffs_only
phantomjs_options: '--ignore-ssl-errors=true --ssl-protocol=tlsv1'
"""
@contextlib.contextmanager
def changedir(path):
    """Context manager that runs its body with *path* as working directory.

    The working directory that was active on entry is restored on exit,
    also when the body raises.
    """
    previous = os.path.abspath(os.getcwd())
    os.chdir(path)
    try:
        yield
    finally:
        # always return to where we came from
        os.chdir(previous)
def run_server():
    """Serve the current working directory on TEST_PORT via ``http.server``.

    The call blocks until the server process exits. Standard output and
    standard error of the server are written to ``server.log``.
    """
    # ``run`` executes through /bin/sh, where the bash/csh shorthand ``>&``
    # is not a portable way to redirect both streams; spell it out instead.
    run("python -m http.server {} > server.log 2>&1".format(TEST_PORT))
@contextlib.contextmanager
def start_server(workdir):
    """Context manager that runs its body with *workdir* as working directory.

    Nothing is bound by the ``with`` statement (the manager yields None).

    NOTE(review): despite its name this function does not launch a web
    server. Starting ``run_server`` in a background thread was disabled at
    some point, so presumably a server for *workdir* has to be listening on
    TEST_HOST:TEST_PORT already - confirm before relying on it.
    """
    with changedir(workdir):
        print("yielding")
        yield
        print("back from yield")
def run(statement,
        return_stdout=False,
        return_popen=False,
        **kwargs):
    '''execute a command line statement through the shell.

    Tabs and newlines in *statement* are collapsed to single spaces and
    the resulting command is printed before it is executed.

    By default this method returns the exit code of the executed
    command. If *return_stdout* is True, the contents of stdout are
    returned as a string (decoded as UTF-8). If *return_popen* is True,
    the Popen object is returned without waiting for the process.
    *return_stdout* takes precedence over *return_popen*.

    ``kwargs`` are passed on to subprocess.call,
    subprocess.check_output or subprocess.Popen.

    Raises
    ------
    ValueError
        If the statement uses process substitution, ``<(``, and the
        shell named in $SHELL is not bash.
    subprocess.CalledProcessError
        If *return_stdout* is True and the command exits with a
        non-zero status.
    OSError
        If the process was terminated by a signal (default mode only).
    '''
    # Imported here because the module header does not provide it.
    import shlex

    # remove new lines
    statement = " ".join(re.sub("\t+", " ", statement).split("\n")).strip()
    print(statement)

    if "<(" in statement:
        # process substitution is not understood by /bin/sh, so wrap the
        # whole statement in an explicit bash invocation.
        shell = os.environ.get('SHELL', "/bin/bash")
        if "bash" not in shell:
            raise ValueError(
                "require bash for advanced shell syntax: <()")
        statement = "%s -c %s" % (shell, shlex.quote(statement))

    if return_stdout:
        return subprocess.check_output(
            statement, shell=True, **kwargs).decode("utf-8")
    elif return_popen:
        return subprocess.Popen(statement, shell=True, **kwargs)
    else:
        retcode = subprocess.call(statement, shell=True, **kwargs)
        # a negative return code means the child was killed by a signal
        if retcode < 0:
            raise OSError("process was terminated by signal %i" % -retcode)
        return retcode
def check_version(cmd, regex, min_version):
    """Run *cmd* and make sure the version it reports is recent enough.

    Parameters
    ----------
    cmd : str
        Shell command whose standard output contains a version string.
    regex : str
        Regular expression applied to the output of *cmd*; its first
        group has to capture the version.
    min_version : str
        Lowest acceptable version, compared with LooseVersion.

    Returns
    -------
    str
        The version string captured by *regex*.

    Raises
    ------
    ValueError
        If *regex* does not match the output of *cmd* or if the version
        found is lower than *min_version*.
    subprocess.CalledProcessError
        If *cmd* exits with a non-zero status.
    """
    version_txt = run(cmd, return_stdout=True)
    match = re.search(regex, version_txt)
    if match is None:
        # report unparsable output explicitly instead of failing with an
        # AttributeError on None
        raise ValueError(
            "version check failed: could not find version in output "
            "of '{}'".format(cmd))
    version = match.group(1)
    if LooseVersion(version) < LooseVersion(min_version):
        raise ValueError("version check failed: {} < {}, '{}'".format(
            version, min_version, cmd))
    return version
class TestWraith(unittest.TestCase):
    """Compare screenshots of the locally built docs with the published ones.

    The comparison itself is delegated to the external ``wraith`` tool,
    which is driven through configuration files written below
    WRAITH_WORKDIR.
    """

    def setUp(self):
        """Check the tool chain and prepare the wraith configuration.

        The built documentation is expected in ``doc/_build/html`` of the
        directory one level above the directory containing this file.
        Configuration files that already exist are kept as they are.

        Raises
        ------
        ValueError
            If a required tool is too old, if the gem paths cannot be
            determined or if wraith's ``spider.rb`` cannot be found.
        """
        source_dir = os.path.join(
            os.path.dirname(os.path.dirname(
                os.path.abspath(__file__))),
            "doc", "_build", "html")

        # check if npm is installed
        check_version("npm --version", r"(\S+)", "3.10")

        # check if phantomjs is installed
        check_version("npm list -g | grep phantom",
                      r"phantomjs@(\S+)",
                      "2.1")

        check_version("ruby --version",
                      r"ruby (\S+)",
                      "2.1")

        wraith_version = check_version(
            "gem list | grep wraith",
            r"wraith \((\S+)\)",
            "4.0.1")

        # get gem info. The output of ``gem environment`` is plain YAML,
        # so the safe loader is sufficient; ``yaml.load`` without an
        # explicit Loader is rejected by current PyYAML releases.
        gem_data = yaml.safe_load(run("gem environment", return_stdout=True))

        gem_paths = []
        for record in gem_data["RubyGems Environment"]:
            gem_paths.extend(record.get("GEM PATHS", []))

        if not gem_paths:
            raise ValueError("could not find GEM PATHS in gem environment")

        filenames = [
            os.path.join(
                path,
                "gems/wraith-{}/lib/wraith/spider.rb".format(wraith_version))
            for path in gem_paths]
        installed = [fn for fn in filenames if os.path.exists(fn)]
        if not installed:
            raise ValueError(
                "could not find file spider.rb to patch in {}".format(
                    filenames))

        # Patch the installed spider in place so that it no longer
        # lower-cases paths.
        # NOTE(review): presumably needed because the documentation URLs
        # are case sensitive - confirm.
        for fn in installed:
            with open(fn) as inf:
                data = inf.read()
            if "path.downcase" in data:
                with open(fn, "w") as outf:
                    outf.write(data.replace("path.downcase", "path"))

        # crawl new docs to collect documents to test
        config_dir = os.path.abspath(os.path.join(WRAITH_WORKDIR, "config"))
        wraith_spider_config = os.path.join(config_dir, "wraith_spider.yml")
        wraith_capture_config = os.path.join(config_dir, "wraith_capture.yml")
        wraith_data_config = os.path.join(config_dir, "wraith_data.yml")

        os.makedirs(config_dir, exist_ok=True)

        if not os.path.exists(wraith_spider_config):
            # do not crawl with reference, as crawler follows external links
            spider_config = spider_config_template.format(
                wraith_data_config=os.path.basename(wraith_data_config),
                test_host=TEST_HOST,
                test_port=TEST_PORT)
            with open(wraith_spider_config, "w") as outf:
                outf.write(spider_config)

        if not os.path.exists(wraith_data_config):
            with start_server(source_dir):
                run("cd {} && wraith spider {}".format(
                    WRAITH_WORKDIR, wraith_spider_config))

        if not os.path.exists(wraith_capture_config):
            # the capture configuration additionally names the reference
            # site to compare against
            capture_config = capture_config_template.format(
                wraith_data_config=os.path.basename(wraith_data_config),
                reference_url=REFERENCE_URL,
                test_host=TEST_HOST,
                test_port=TEST_PORT)
            with open(wraith_capture_config, "w") as outf:
                outf.write(capture_config)

        self.wraith_capture_config = wraith_capture_config
        self.source_dir = source_dir

    def test_against_reference(self):
        """Run ``wraith capture`` and require it to exit successfully."""
        with start_server(self.source_dir):
            retcode = run("cd {} && wraith capture {}".format(
                WRAITH_WORKDIR, self.wraith_capture_config))
        # Without this check the test could never fail.
        self.assertEqual(
            retcode, 0,
            "wraith capture exited with status {}".format(retcode))
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()