-
Notifications
You must be signed in to change notification settings - Fork 721
/
Copy pathtest_renegotiate.py
377 lines (301 loc) · 16 KB
/
test_renegotiate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import copy
import pytest
import random
from configuration import available_ports, ALL_TEST_CIPHERS, ALL_TEST_CURVES, MINIMAL_TEST_CERTS, PROTOCOLS
from common import ProviderOptions, Protocols
from fixtures import managed_process # lgtm [py/unused-import]
from providers import Provider, S2N, OpenSSL
from utils import invalid_test_parameters, get_parameter_name
# TLS1.3 does not support renegotiation, so only protocol versions
# strictly below TLS1.3 are used to parametrize the tests in this file.
TEST_PROTOCOLS = [x for x in PROTOCOLS if x.value < Protocols.TLS13.value]

# Command line options to enable renegotiation.
# S2N_RENEG_OPTION is the flag name; it is passed to the s2n client through
# extra_flags, followed by one of the S2N_RENEG_* values below.
S2N_RENEG_OPTION = "--renegotiation"
S2N_RENEG_ACCEPT = "accept"
S2N_RENEG_REJECT = "reject"
S2N_RENEG_WAIT = "wait"
# Control string written to the server's stdin (as a ctrl message)
# to make it request renegotiation.
OPENSSL_RENEG_CTRL_CMD = 'r\n'

# Output indicating renegotiation state.
# The S2N_* markers are searched for in the s2n results' stdout,
# the OPENSSL_* markers in the OpenSSL results' stderr.
S2N_RENEG_START_MARKER = "RENEGOTIATE"
S2N_RENEG_SUCCESS_MARKER = "s2n is ready, again"
OPENSSL_RENEG_REQ_MARKER = "SSL_accept:SSLv3/TLS write hello request"
OPENSSL_RENEG_WARN_MARKER = "SSL3 alert read:warning:no renegotiation"
# Methods to check renegotiation state
def renegotiate_was_requested(openssl_results):
    """Tell whether the OpenSSL output shows that a hello request was written.

    Looks for OPENSSL_RENEG_REQ_MARKER (utf-8 encoded) in
    `openssl_results.stderr` and returns the result of that membership test.
    """
    request_marker = to_bytes(OPENSSL_RENEG_REQ_MARKER)
    return request_marker in openssl_results.stderr
def renegotiate_was_rejected(openssl_results):
    """Tell whether the OpenSSL output shows a "no renegotiation" warning alert.

    Looks for OPENSSL_RENEG_WARN_MARKER (utf-8 encoded) in
    `openssl_results.stderr` and returns the result of that membership test.
    """
    warning_marker = to_bytes(OPENSSL_RENEG_WARN_MARKER)
    return warning_marker in openssl_results.stderr
def renegotiate_was_started(s2n_results):
    """Tell whether the s2n output contains the renegotiation start marker.

    Looks for S2N_RENEG_START_MARKER (utf-8 encoded) in `s2n_results.stdout`
    and returns the result of that membership test.
    """
    start_marker = to_bytes(S2N_RENEG_START_MARKER)
    return start_marker in s2n_results.stdout
def renegotiate_was_successful(s2n_results):
    """Tell whether s2n both started and completed a renegotiation.

    True only when the start marker is present (see renegotiate_was_started)
    and S2N_RENEG_SUCCESS_MARKER also appears in `s2n_results.stdout`.
    """
    # Without a start marker there is nothing to complete.
    if not renegotiate_was_started(s2n_results):
        return False
    success_marker = to_bytes(S2N_RENEG_SUCCESS_MARKER)
    return success_marker in s2n_results.stdout
# Basic conversion methods
def to_bytes(val):
    """Return the utf-8 encoding of `str(val)`."""
    text = str(val)
    return bytes(text, encoding='utf-8')
def to_marker(val):
    """Return `bytes(val)` decoded as utf-8 text."""
    raw = bytes(val)
    return str(raw, encoding='utf-8')
"""
Msg handles translating a series of human-readable messages into the "markers" required
by the integrationv2 framework.
The integrationv2 framework controls process IO with various types of "markers".
Actions like writing data to a process's stdin or shutting down a process's stdin
can only occur if specific "markers" are read from the process's stdio. This makes
a simple scenario like a client and server taking turns sending application data require
careful construction of the data_to_send, send_marker, and close_marker options.
For simplicity, "Msg" currently assumes:
- s2n sends first. This lets us assume the first send_marker is S2N.get_send_marker().
- The server sends last. This lets us assume that only the client needs a close_marker.
"""
class Msg:
    """One scripted message in an ordered client/server exchange.

    A list of Msg objects describes who writes what, and in which order.
    The static methods translate such a list into the data_to_send,
    send_marker, close_marker and expected-output values for one side
    (`mode`) of the connection.

    Attributes:
        mode: the side that writes this message; compared by equality
            against the `mode` argument of the static methods.
        send_marker: stdio text that must be seen before this message is
            written, or None to fall back to the previous message's data
            (see Msg.send_markers).
        ctrl: True when the message was built from a `ctrl_str`.
        data_str: the text to write; either the given `ctrl_str` or
            "<MODE>:<random integer>".
    """

    def __init__(self, mode, send_marker=None, ctrl_str=None):
        self.mode = mode
        # Indicates what stdio string should trigger this message.
        # Will default to the previous message (see Msg.send_markers).
        self.send_marker = send_marker
        # Indicates that the message will be consumed by the process itself
        # rather than sent to the peer. This means it will never appear in
        # the peer's stdio and cannot be used as a send_marker.
        self.ctrl = ctrl_str is not None
        # Branch on the same condition that defines `ctrl`, so that an empty
        # ctrl_str cannot produce a control message carrying random data.
        if self.ctrl:
            self.data_str = ctrl_str
        else:
            self.data_str = mode.upper() + ":" + str(random.getrandbits(8 * 10))

    @staticmethod
    def data_to_send(messages, mode):
        """Return the list of byte strings that `mode` must write, in order."""
        data_bytes = []
        for message in messages:
            # Modes are compared by value: equal mode strings are the same
            # mode even when they are distinct objects.
            if message.mode != mode:
                continue
            if message.ctrl:
                data_bytes.append(to_bytes(message.data_str))
            else:
                # Openssl processes initial ASCII characters strangely,
                # but our framework is not good at handling non-ASCII characters due
                # to inconsistent use of bytes vs decode and str vs encode.
                # As a workaround, just prepend a throwaway non-ASCII utf-8 character.
                data_bytes.append(bytes([0xc2, 0xbb]) + to_bytes(message.data_str))
        # We assume that the client will close the connection.
        # Give the server one last message to send without a corresponding send_marker.
        # The message will never be sent, but waiting to send it will prevent the server
        # from closing the connection before the client can.
        if mode == Provider.ServerMode:
            data_bytes.append(to_bytes("Placeholder to prevent server socket close"))
        return data_bytes

    @staticmethod
    def expected_output(messages, mode):
        """Return the bytes `mode` should receive: every non-ctrl message written by the other side."""
        return [to_bytes(message.data_str) for message in messages if not message.ctrl and message.mode != mode]

    @staticmethod
    def send_markers(messages, mode):
        """Return one send marker per message written by `mode`, in order.

        An explicit `send_marker` wins. Otherwise the very first message
        uses S2N.get_send_marker(), and any later message waits for the
        data of the message immediately before it, which must have been
        written by the other side.
        """
        send_markers = []
        for i, message in enumerate(messages):
            if message.mode != mode:
                continue
            if message.send_marker:
                send_markers.append(message.send_marker)
            elif i == 0:
                # Assume that the first sender is s2n
                send_markers.append(S2N.get_send_marker())
            else:
                previous = messages[i - 1]
                assert previous.mode != mode
                send_markers.append(previous.data_str)
        return send_markers

    @staticmethod
    def close_marker(messages):
        """Return the client's close marker: the last output it expects, as text."""
        # Assume that the last sender is the server
        assert messages[-1].mode == Provider.ServerMode
        output = Msg.expected_output(messages, Provider.ClientMode)
        return to_marker(output[-1])

    @staticmethod
    def debug(messages):
        """Print every value derived from `messages` for both sides."""
        print(f'client data to send: {Msg.data_to_send(messages, Provider.ClientMode)}')
        print(f'server data to send: {Msg.data_to_send(messages, Provider.ServerMode)}')
        print(f'client send markers: {Msg.send_markers(messages, Provider.ClientMode)}')
        print(f'server send markers: {Msg.send_markers(messages, Provider.ServerMode)}')
        print(f'client close_marker: {Msg.close_marker(messages)}')
        print(f'client expected output: {Msg.expected_output(messages, Provider.ClientMode)}')
        print(f'server expected output: {Msg.expected_output(messages, Provider.ServerMode)}')
# The order of messages that will trigger renegotiation
# and verify sending and receiving continues to work afterwards.
# Order matters: unless a message names its own send_marker, it is
# written only after the data of the message directly before it is seen.
RENEG_MESSAGES = [
    # Client sends first message
    Msg(Provider.ClientMode),
    # Server initiates renegotiation.
    # This is a ctrl message: it is consumed by the server process itself
    # and is not part of the client's expected output.
    Msg(Provider.ServerMode, ctrl_str=OPENSSL_RENEG_CTRL_CMD),
    # Server sends first message.
    # The previous message is a ctrl message and cannot act as a marker,
    # so wait for the renegotiation request marker instead.
    Msg(Provider.ServerMode, send_marker=OPENSSL_RENEG_REQ_MARKER),
    # Client and Server exchange several more messages
    Msg(Provider.ClientMode),
    Msg(Provider.ServerMode),
    Msg(Provider.ClientMode),
    Msg(Provider.ServerMode),
]
def basic_reneg_test(managed_process, cipher, curve, certificate, protocol, provider, messages=RENEG_MESSAGES, reneg_option=None):
    """Start `provider` as the server and s2n as the client, scripted by `messages`.

    Both sides share one ProviderOptions built from the test parameters.
    The client always sets use_client_auth, and when `reneg_option` is
    truthy it also gets the renegotiation flag with that value.

    Returns:
        The tuple (s2n_client, server) of managed processes.
    """
    shared_options = ProviderOptions(
        port=next(available_ports),
        cipher=cipher,
        curve=curve,
        key=certificate.key,
        cert=certificate.cert,
        protocol=protocol,
        insecure=True,
    )

    # Each side works on its own shallow copy of the shared options.
    server_options = copy.copy(shared_options)
    server_options.mode = Provider.ServerMode
    server_options.data_to_send = Msg.data_to_send(messages, Provider.ServerMode)

    client_options = copy.copy(shared_options)
    client_options.mode = Provider.ClientMode
    client_options.data_to_send = Msg.data_to_send(messages, Provider.ClientMode)
    client_options.use_client_auth = True
    if reneg_option:
        client_options.extra_flags = [S2N_RENEG_OPTION, reneg_option]

    # The server is launched first, then the client that connects to it.
    server_markers = Msg.send_markers(messages, Provider.ServerMode)
    server = managed_process(
        provider,
        server_options,
        send_marker=server_markers,
        timeout=8,
    )

    client_markers = Msg.send_markers(messages, Provider.ClientMode)
    client_close_marker = Msg.close_marker(messages)
    s2n_client = managed_process(
        S2N,
        client_options,
        send_marker=client_markers,
        close_marker=client_close_marker,
        timeout=8,
    )

    return (s2n_client, server)
"""
Renegotiation request ignored by s2n-tls client.
This tests the default behavior for customers who do not enable renegotiation.
"""
@pytest.mark.uncollect_if(func=invalid_test_parameters)
@pytest.mark.parametrize("cipher", ALL_TEST_CIPHERS, ids=get_parameter_name)
@pytest.mark.parametrize("curve", ALL_TEST_CURVES, ids=get_parameter_name)
@pytest.mark.parametrize("certificate", MINIMAL_TEST_CERTS, ids=get_parameter_name)
@pytest.mark.parametrize("protocol", TEST_PROTOCOLS, ids=get_parameter_name)
@pytest.mark.parametrize("provider", [OpenSSL], ids=get_parameter_name)
def test_s2n_client_ignores_openssl_hello_request(managed_process, cipher, curve, certificate, protocol, provider):
    """With no renegotiation option, the request is sent but s2n never starts renegotiating."""
    client_proc, server_proc = basic_reneg_test(
        managed_process, cipher, curve, certificate, protocol, provider)

    # Server: all client data arrived, the request went out, no rejection came back.
    server_expected = Msg.expected_output(RENEG_MESSAGES, Provider.ServerMode)
    for server_result in server_proc.get_results():
        server_result.assert_success()
        for chunk in server_expected:
            assert chunk in server_result.stdout
        assert renegotiate_was_requested(server_result)
        assert not renegotiate_was_rejected(server_result)

    # Client: all server data arrived and renegotiation was never started.
    client_expected = Msg.expected_output(RENEG_MESSAGES, Provider.ClientMode)
    for client_result in client_proc.get_results():
        client_result.assert_success()
        for chunk in client_expected:
            assert chunk in client_result.stdout
        assert not renegotiate_was_started(client_result)
"""
Renegotiation request rejected by s2n-tls client.
"""
@pytest.mark.uncollect_if(func=invalid_test_parameters)
@pytest.mark.parametrize("cipher", ALL_TEST_CIPHERS, ids=get_parameter_name)
@pytest.mark.parametrize("curve", ALL_TEST_CURVES, ids=get_parameter_name)
@pytest.mark.parametrize("certificate", MINIMAL_TEST_CERTS, ids=get_parameter_name)
@pytest.mark.parametrize("protocol", TEST_PROTOCOLS, ids=get_parameter_name)
@pytest.mark.parametrize("provider", [OpenSSL], ids=get_parameter_name)
def test_s2n_client_rejects_openssl_hello_request(managed_process, cipher, curve, certificate, protocol, provider):
    """With the "reject" option, the server sees the rejection and the s2n client exits non-zero."""
    client_proc, server_proc = basic_reneg_test(
        managed_process, cipher, curve, certificate, protocol, provider,
        reneg_option=S2N_RENEG_REJECT)

    # Server: the request was written and a "no renegotiation" warning was read.
    for server_result in server_proc.get_results():
        assert renegotiate_was_requested(server_result)
        assert renegotiate_was_rejected(server_result)

    # Client: fails without starting renegotiation and reports the received alert.
    received_alert = to_bytes("Received alert: 40")
    for client_result in client_proc.get_results():
        assert client_result.exit_code != 0
        assert not renegotiate_was_started(client_result)
        assert received_alert in client_result.stderr
"""
Renegotiation request accepted by s2n-tls client.
"""
@pytest.mark.uncollect_if(func=invalid_test_parameters)
@pytest.mark.parametrize("cipher", ALL_TEST_CIPHERS, ids=get_parameter_name)
@pytest.mark.parametrize("curve", ALL_TEST_CURVES, ids=get_parameter_name)
@pytest.mark.parametrize("certificate", MINIMAL_TEST_CERTS, ids=get_parameter_name)
@pytest.mark.parametrize("protocol", TEST_PROTOCOLS, ids=get_parameter_name)
@pytest.mark.parametrize("provider", [OpenSSL], ids=get_parameter_name)
def test_s2n_client_renegotiate_with_openssl(managed_process, cipher, curve, certificate, protocol, provider):
    """With the "accept" option, s2n completes the renegotiation and all data still flows."""
    client_proc, server_proc = basic_reneg_test(
        managed_process, cipher, curve, certificate, protocol, provider,
        reneg_option=S2N_RENEG_ACCEPT)

    # Server: all client data arrived, the request went out, no rejection came back.
    server_expected = Msg.expected_output(RENEG_MESSAGES, Provider.ServerMode)
    for server_result in server_proc.get_results():
        server_result.assert_success()
        for chunk in server_expected:
            assert chunk in server_result.stdout
        assert renegotiate_was_requested(server_result)
        assert not renegotiate_was_rejected(server_result)

    # Client: all server data arrived and renegotiation started and finished.
    client_expected = Msg.expected_output(RENEG_MESSAGES, Provider.ClientMode)
    for client_result in client_proc.get_results():
        client_result.assert_success()
        for chunk in client_expected:
            assert chunk in client_result.stdout
        assert renegotiate_was_successful(client_result)
"""
Renegotiation request with client auth accepted by s2n-tls client.
The openssl server does not require client auth during the first handshake,
but does require client auth during the second handshake.
"""
@pytest.mark.flaky(reruns=3, reruns_delay=1)
@pytest.mark.uncollect_if(func=invalid_test_parameters)
@pytest.mark.parametrize("cipher", ALL_TEST_CIPHERS, ids=get_parameter_name)
@pytest.mark.parametrize("curve", ALL_TEST_CURVES, ids=get_parameter_name)
@pytest.mark.parametrize("certificate", MINIMAL_TEST_CERTS, ids=get_parameter_name)
@pytest.mark.parametrize("protocol", TEST_PROTOCOLS, ids=get_parameter_name)
@pytest.mark.parametrize("provider", [OpenSSL], ids=get_parameter_name)
def test_s2n_client_renegotiate_with_client_auth_with_openssl(managed_process, cipher, curve, certificate, protocol, provider):
    """Client auth must be absent from the first handshake and present in the renegotiation.

    The s2n client's stdout is searched as raw bytes, the same way every
    other check in this file searches it. The position of each marker is
    asserted explicitly, so a missing marker fails with a clear message
    instead of silently becoming a -1 slice bound.
    """
    # We want to use the same messages to test renegotiation,
    # but with 'R' instead of 'r' to trigger the Openssl renegotiate request.
    messages = copy.deepcopy(RENEG_MESSAGES)
    for m in messages:
        if m.ctrl:
            m.data_str = 'R\n'

    client_auth_marker = to_bytes("|CLIENT_AUTH")
    no_client_cert_marker = to_bytes("|NO_CLIENT_CERT")

    (s2n_client, server) = basic_reneg_test(managed_process, cipher, curve, certificate, protocol, provider,
                                            messages=messages, reneg_option=S2N_RENEG_WAIT)

    # Expected output is derived from the messages that were actually sent.
    for results in server.get_results():
        results.assert_success()
        for output in Msg.expected_output(messages, Provider.ServerMode):
            assert output in results.stdout
        assert renegotiate_was_requested(results)
        assert not renegotiate_was_rejected(results)

    for results in s2n_client.get_results():
        results.assert_success()
        for output in Msg.expected_output(messages, Provider.ClientMode):
            assert output in results.stdout
        assert renegotiate_was_successful(results)

        stdout = results.stdout

        # The first handshake must not have done client auth
        init_finishes = stdout.find(to_bytes(S2N.get_send_marker()))
        assert init_finishes != -1, "initial handshake marker not found in s2n stdout"
        assert client_auth_marker not in stdout[:init_finishes]

        # The second handshake must have done client auth
        reneg_finishes = stdout.find(to_bytes(S2N_RENEG_SUCCESS_MARKER))
        assert reneg_finishes > init_finishes, "renegotiation did not finish after the initial handshake"
        assert client_auth_marker in stdout[init_finishes:reneg_finishes]
        assert no_client_cert_marker not in stdout[init_finishes:reneg_finishes]
"""
The s2n-tls client successfully reads ApplicationData during the renegotiation handshake.
"""
@pytest.mark.flaky(reruns=3, reruns_delay=1)
@pytest.mark.uncollect_if(func=invalid_test_parameters)
@pytest.mark.parametrize("cipher", ALL_TEST_CIPHERS, ids=get_parameter_name)
@pytest.mark.parametrize("curve", ALL_TEST_CURVES, ids=get_parameter_name)
@pytest.mark.parametrize("certificate", MINIMAL_TEST_CERTS, ids=get_parameter_name)
@pytest.mark.parametrize("protocol", TEST_PROTOCOLS, ids=get_parameter_name)
@pytest.mark.parametrize("provider", [OpenSSL], ids=get_parameter_name)
def test_s2n_client_renegotiate_with_app_data_with_openssl(managed_process, cipher, curve, certificate, protocol, provider):
    """The server's first application data must show up between renegotiation start and finish.

    The s2n client's stdout is searched as raw bytes, the same way every
    other check in this file searches it. The position of each marker is
    asserted explicitly, so a missing marker fails with a clear message
    instead of silently becoming a -1 slice bound.
    """
    # Already bytes: expected_output returns utf-8 encoded message data.
    first_server_app_data = Msg.expected_output(RENEG_MESSAGES, Provider.ClientMode)[0]

    (s2n_client, server) = basic_reneg_test(managed_process, cipher, curve, certificate, protocol, provider,
                                            reneg_option=S2N_RENEG_WAIT)

    for results in server.get_results():
        results.assert_success()
        for output in Msg.expected_output(RENEG_MESSAGES, Provider.ServerMode):
            assert output in results.stdout
        assert renegotiate_was_requested(results)
        assert not renegotiate_was_rejected(results)

    for results in s2n_client.get_results():
        results.assert_success()
        for output in Msg.expected_output(RENEG_MESSAGES, Provider.ClientMode):
            assert output in results.stdout
        assert renegotiate_was_successful(results)

        stdout = results.stdout

        # In order to test the case where application data is received during renegotiation,
        # we must verify that the data was received after renegotiation started but before the new handshake finished.
        reneg_starts = stdout.find(to_bytes(S2N_RENEG_START_MARKER))
        reneg_finishes = stdout.find(to_bytes(S2N_RENEG_SUCCESS_MARKER))
        assert reneg_starts != -1, "renegotiation start marker not found in s2n stdout"
        assert reneg_finishes > reneg_starts, "renegotiation did not finish after it started"
        assert first_server_app_data in stdout[reneg_starts:reneg_finishes]