-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtests.py
240 lines (171 loc) · 5.13 KB
/
tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
import numba
import numpy as np
import tulipy as ta
from numba import njit
from pytest import approx
from suchak.atr import ATR
from suchak.fibo import Fibo
from suchak.jitclass import jitclass
from suchak.sma import SMA
from suchak.supertrend import Supertrend
from suchak.tr import TR
from suchak.vec import FloatVec
SIZE = 64
def test_vec():
    """Push ten values into a ``FloatVec`` and check its reversed indexing.

    The assertions require that the value pushed at step ``k`` is found at
    index ``9 - k``, both through ``vec[...]`` and through the array
    returned by ``vec.array()``.
    """
    count = 10
    vec = FloatVec(5)
    for value in range(count):
        vec.push(value)

    arr = vec.array()
    for pushed in range(count):
        # Mirror index: earlier pushes are expected at higher positions.
        mirrored = count - 1 - pushed
        assert vec[mirrored] == pushed
        assert arr[mirrored] == pushed
def test_jitclass():
    """Check that a ``jitclass``-decorated class can be used from ``njit`` code.

    ``Foo`` declares one field with a class-level default (``x``) and one
    field assigned in ``__init__`` (``y``); both must be readable from a
    compiled function.
    """

    @jitclass
    class Foo:
        x: numba.int32 = 42
        y: numba.double

        def __init__(self):
            self.y = 4.2

    @njit
    def read_fields():
        instance = Foo()
        return instance.x, instance.y

    result = read_fields()
    assert result == (42, 4.2)
def test_sma():
    """Cross-check the streaming ``SMA`` against tulipy's ``ta.sma``.

    The first ``sma.offset`` outputs must be NaN; every following output
    must match the corresponding element of the tulipy result.
    """
    length = 14
    c = np.random.random(SIZE)
    sma = SMA(length)

    # Warm-up phase: no value is available yet, so NaN is expected.
    warmup = sma.offset
    for i in range(warmup):
        assert np.isnan(sma.next(c[i]))

    reference = ta.sma(c, length)
    for i, expected in enumerate(reference):
        actual = sma.next(c[warmup + i])
        assert actual == approx(expected)
def test_tr():
    """Cross-check the streaming ``TR`` against tulipy's ``ta.tr``.

    The first ``tr.offset`` outputs must be NaN; every following output
    must match the corresponding element of the tulipy result.
    """
    h, l, c = np.random.random((3, SIZE))
    tr = TR()

    # Warm-up phase: outputs before the offset are expected to be NaN.
    warmup = tr.offset
    for i in range(warmup):
        assert np.isnan(tr.next(h[i], l[i], c[i]))

    reference = ta.tr(h, l, c)
    for i, expected in enumerate(reference):
        bar = warmup + i
        actual = tr.next(h[bar], l[bar], c[bar])
        assert actual == approx(expected)
def test_atr():
    """Cross-check the streaming ``ATR`` against an SMA of tulipy's true range.

    The reference series is ``ta.sma(ta.tr(h, l, c), period)``. The first
    ``atr.offset`` outputs must be NaN; every following output must match
    the corresponding reference element.
    """
    period = 5
    h, l, c = np.random.random((3, SIZE))
    atr = ATR(period)

    # Warm-up phase: outputs before the offset are expected to be NaN.
    warmup = atr.offset
    for i in range(warmup):
        assert np.isnan(atr.next(h[i], l[i], c[i]))

    reference = ta.sma(ta.tr(h, l, c), period)
    for i, expected in enumerate(reference):
        bar = warmup + i
        actual = atr.next(h[bar], l[bar], c[bar])
        assert actual == approx(expected)
def test_supertrend():
    """Cross-check the streaming ``Supertrend`` against ``ti_supertrend``.

    During the first ``supertrend.offset`` bars the trend value must be NaN.
    Afterwards each ``(value, direction)`` pair must agree with the matching
    row of the reference: direction exactly, value approximately.
    """
    period, factor = 5, 30
    h, l, c = np.random.random((3, SIZE))
    supertrend = Supertrend(period, factor)

    # Warm-up phase: only the trend value is checked, the direction is ignored.
    warmup = supertrend.offset
    for i in range(warmup):
        value, _ = supertrend.next(h[i], l[i], c[i])
        assert np.isnan(value)

    reference = ti_supertrend(h, l, c, period, factor)
    for i, (expected_value, expected_dir) in enumerate(reference):
        bar = warmup + i
        actual_value, actual_dir = supertrend.next(h[bar], l[bar], c[bar])
        assert actual_dir == expected_dir
        assert actual_value == approx(expected_value)
def test_supertrend_arr():
    """Cross-check ``Supertrend.next_arr`` against ``ti_supertrend``.

    Only the trailing rows of the ``next_arr`` output — as many as the
    reference produces — are compared.
    """
    period, factor = 5, 30
    h, l, c = np.random.random((3, SIZE))
    supertrend = Supertrend(period, factor)

    produced = supertrend.next_arr(h, l, c)
    reference = ti_supertrend(h, l, c, period, factor)

    # Drop the leading rows that have no counterpart in the reference.
    tail = produced[-len(reference):]
    assert np.all(tail == approx(reference))
def test_fibo():
    """Cross-check the streaming ``Fibo`` against ``ti_fibo``.

    The first ``fibo.offset`` bars are fed without checking their output.
    From then on, each streamed result must approximately equal
    ``ti_fibo`` evaluated on all bars seen so far.
    """
    h, l = np.random.random((2, SIZE))
    period = 32
    fibo = Fibo(period)

    # Warm-up phase: feed the bars, results are intentionally not asserted.
    for i in range(fibo.offset):
        fibo.next(h[i], l[i])

    for i in range(fibo.offset, SIZE):
        actual = fibo.next(h[i], l[i])
        # The reference sees the history up to and including bar i.
        expected = ti_fibo(h[: i + 1], l[: i + 1], period)
        assert expected == approx(actual)
# whole-array reference implementation for cross-verification
def ti_supertrend(
    h: np.ndarray, l: np.ndarray, c: np.ndarray, period: int, factor: float,
) -> np.ndarray:
    """Compute a supertrend series over complete high/low/close arrays.

    The band half-width is ``factor`` times the ``period``-bar simple moving
    average of the true range (both computed by tulipy).

    Args:
        h: High prices.
        l: Low prices.
        c: Close prices.
        period: Window length passed to ``ta.sma``.
        factor: Multiplier applied to the averaged true range.

    Returns:
        Array of shape ``(n, 2)`` where ``n`` is the length of the averaged
        true range: column 0 is the trend line value and column 1 the
        direction (``1`` or ``-1``, starting at ``1``).
    """
    f_atr = factor * ta.sma(ta.tr(h, l, c), period)
    n = len(f_atr)

    # IMPORTANT! Keep only the trailing n bars so the price arrays line up
    # index-for-index with the averaged true range.
    h, l, c = h[-n:], l[-n:], c[-n:]

    mid = (h + l) / 2
    lower = mid - f_atr
    upper = mid + f_atr

    trend = np.empty(n)
    direction = np.empty(n)
    trend[0], direction[0] = lower[0], 1

    for i in range(1, n):
        prev = i - 1
        prev_lower, prev_upper, prev_close = lower[prev], upper[prev], c[prev]

        # Carry the previous band forward while the previous close stayed
        # on the inner side of it (lower band never drops, upper never rises).
        if prev_close > prev_lower > lower[i]:
            lower[i] = prev_lower
        if prev_close < prev_upper < upper[i]:
            upper[i] = prev_upper

        # Flip the direction when the current close lies beyond the
        # previous band belonging to the current direction.
        current = direction[prev]
        breaks_upper = current < 0 and prev_upper < c[i]
        breaks_lower = current > 0 and prev_lower > c[i]
        if breaks_upper or breaks_lower:
            current = -current

        direction[i] = current
        trend[i] = lower[i] if current > 0 else upper[i]

    return np.column_stack((trend, direction))
# whole-array reference implementation for cross-verification
def ti_fibo(
    h: np.ndarray,
    l: np.ndarray,
    length: int,
    ratios: tuple = (0.236, 0.382, 0.500, 0.618, 0.786),
) -> tuple:
    """Return the window extremes and retracement levels between them.

    Only the trailing ``length`` elements of ``h`` and ``l`` are considered.

    Args:
        h: High prices.
        l: Low prices.
        length: Number of trailing bars forming the look-back window.
        ratios: Fractions of the high-low span at which levels are placed.
            The default reproduces the five levels this function has
            always returned.

    Returns:
        ``(lowest_low, highest_high, *levels)`` with one level per entry of
        ``ratios``, in the same order. When the highest high occurs at a
        lower index than the lowest low, levels are measured upwards from
        the low (``low + span * ratio``); otherwise they are measured
        downwards from the high (``high - span * ratio``).
    """
    h, l = h[-length:], l[-length:]
    high_idx = np.argmax(h)
    low_idx = np.argmin(l)
    highest = h[high_idx]
    lowest = l[low_idx]
    span = highest - lowest

    # pick the farthest one from last (lower idx) as the starting extreme
    if high_idx < low_idx:
        levels = tuple(lowest + span * ratio for ratio in ratios)
    else:
        levels = tuple(highest - span * ratio for ratio in ratios)

    return (lowest, highest, *levels)