-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy patheax_stream.py
103 lines (83 loc) · 3.23 KB
/
eax_stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
from eax import gf_double, xorstrings
# these classes simulate the way it could be done in C in online mode, byte by byte
class OMAC_stream:
def __init__(self, cfg, key, tweak):
enc = cfg.ECB(key)
L = enc.run(bytes([0] * cfg.BLOCKSIZE))
L_int = int.from_bytes(L, cfg.ENDIAN, signed=False)
L2_int = gf_double(L_int, cfg.BLOCKSIZE)
L4_int = gf_double(L2_int, cfg.BLOCKSIZE)
self.cfg = cfg
self.L2 = L2_int.to_bytes(cfg.BLOCKSIZE, cfg.ENDIAN)
self.L4 = L4_int.to_bytes(cfg.BLOCKSIZE, cfg.ENDIAN)
self.enc = enc
self.readyblock = int.to_bytes(tweak, cfg.BLOCKSIZE, 'big')
self.mac = bytes(cfg.BLOCKSIZE)
self.buf = bytes([])
def process_byte(self, byte):
cfg = self.cfg
self.buf += bytes([byte])
if len(self.buf) == cfg.BLOCKSIZE: # full buf collected, ok. process prev
xorred = xorstrings(self.readyblock, self.mac)
self.mac = self.enc.run(xorred)
self.readyblock = self.buf
self.buf = bytes([])
def digest(self):
readyblock = self.readyblock
buf = self.buf
mac = self.mac
cfg = self.cfg
if not buf: # readyblock is last
readyblock = xorstrings(readyblock, self.L2)
xorred = xorstrings(readyblock, mac)
mac = self.enc.run(xorred)
if buf:
buf += bytes([0x80])
buf = buf.ljust((len(buf) + cfg.BLOCKSIZE - 1) & -cfg.BLOCKSIZE, b'\0')
xorred = xorstrings(xorstrings(buf, self.L4), mac)
mac = self.enc.run(xorred)
return mac
class CTR_stream:
def __init__(self, cfg, key, nonce):
enc = cfg.ECB(key)
nonce_int = int.from_bytes(nonce, cfg.ENDIAN, signed=False)
self.cfg = cfg
self.enc = enc
self.nonce = nonce_int
self.pos = 0
self.xorbuf = None
def process_byte(self, byte):
cfg = self.cfg
if self.pos % cfg.BLOCKSIZE == 0:
counter = (self.nonce + self.pos // cfg.BLOCKSIZE) & cfg.BLOCKSIZE_MASK
counter = counter.to_bytes(cfg.BLOCKSIZE, cfg.ENDIAN)
self.xorbuf = self.enc.run(counter)
pt = self.xorbuf[self.pos % cfg.BLOCKSIZE] ^ byte
self.pos += 1
return bytes([pt])
# wrappers for the online stream api to test it
def omac_stream(cfg, key, data, k):
s = OMAC_stream(cfg, key, k)
for b in data:
s.process_byte(b)
return s.digest()
def ctr_stream(cfg, key, nonce, data):
s = CTR_stream(cfg, key, nonce)
out = b''
for b in data:
out += s.process_byte(b)
return out
def eax_enc(cfg, key, nonce, header, pt):
N = omac_stream(cfg, key, nonce, 0)
H = omac_stream(cfg, key, header, 1)
ct = ctr_stream(cfg, key, N, pt)
C = omac_stream(cfg, key, ct, 2)
tag = xorstrings(xorstrings(N, C), H)
return (ct, tag)
def eax_dec(cfg, key, nonce, header, ct):
N = omac_stream(cfg, key, nonce, 0)
H = omac_stream(cfg, key, header, 1)
C = omac_stream(cfg, key, ct, 2)
tag_local = xorstrings(xorstrings(N, C), H)
pt = ctr_stream(cfg, key, N, ct)
return (pt, tag_local)