forked from zzzac/Rule-based-forex-trading-system
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathta.py
166 lines (127 loc) · 5.25 KB
/
ta.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import pandas as pd
import numpy as np
def count_trade_no(signal):
    """Count the value changes seen while scanning ``signal``.

    The scan starts from a sentinel "previous" value of 10, so the first
    element counts as a change unless it equals 10. Once the tracked value
    becomes 0 it is never updated again, so nothing after a 0 is counted.

    Args:
        signal: Iterable of signal values.

    Returns:
        int: Number of counted changes.
    """
    last_seen = 10  # sentinel used before any element has been read
    changes = 0
    for value in signal:
        # Skip repeats, and skip everything once the tracked value is 0.
        if last_seen == 0 or value == last_seen:
            continue
        last_seen = value
        changes += 1
    return changes
def ema(series, n):
    """Return the exponentially weighted mean of ``series`` with span ``n``.

    The first ``n - 1`` results are NaN because ``min_periods`` is set to
    ``n``.

    Args:
        series: pandas Series of values.
        n: Span of the exponential window (also the minimum observations).

    Returns:
        pandas Series of the same length as ``series``.
    """
    weighted = series.ewm(span=n, min_periods=n)
    return weighted.mean()
def ma(close, n):
    """Return the simple rolling mean of ``close`` over ``n`` rows.

    Rows with fewer than ``n`` observations in the window are NaN.

    Args:
        close: pandas Series of values.
        n: Window length.

    Returns:
        pandas Series of rolling means.
    """
    window = close.rolling(n)
    return window.mean()
def DEMA(close, n):
    """Return twice the EMA of ``close`` minus the EMA of that EMA.

    Args:
        close: pandas Series of values.
        n: Span passed to ``ema`` for both smoothing passes.

    Returns:
        pandas Series ``2 * EMA - EMA(EMA)``.
    """
    first_pass = ema(close, n)
    second_pass = ema(first_pass, n)
    return 2 * first_pass - second_pass
def TEMA(close, n):
    """Combine three nested EMA passes as ``3*E1 - 3*E2 + E3``.

    ``E1`` is the EMA of ``close``, ``E2`` the EMA of ``E1`` and ``E3`` the
    EMA of ``E2``, all with span ``n``.

    Args:
        close: pandas Series of values.
        n: Span passed to ``ema`` for every smoothing pass.

    Returns:
        pandas Series with the combined value.
    """
    single = ema(close, n)
    double = ema(single, n)
    triple = ema(double, n)
    return 3 * single - 3 * double + triple
def rsi(close, n=14):
    """Return ``100 * EMA(gains) / (EMA(gains) + EMA(losses))``.

    Gains and losses are the positive and (negated) negative parts of the
    one-step difference of ``close``; both are smoothed with ``ema``.

    Args:
        close: pandas Series of values.
        n: Span passed to ``ema``.

    Returns:
        pandas Series named ``'rsi'``.
    """
    delta = close.diff(1)
    falling = delta < 0

    # Losses: zero everywhere except falling rows, which hold the drop size.
    losses = delta * 0
    losses[falling] = -delta[falling]

    # Gains: the raw difference with falling rows zeroed out.
    gains = delta
    gains[falling] = 0

    avg_gain = ema(gains, n)
    avg_loss = ema(losses, n)
    strength = 100 * avg_gain / (avg_gain + avg_loss)
    return pd.Series(strength, name='rsi')
def stoch(high, low, close, n=14, fillna=False):
    """Return where ``close`` sits inside the rolling low/high range, in %.

    Computes ``100 * (close - min(low)) / (max(high) - min(low))`` over a
    window of ``n`` rows (partial windows are allowed).

    Args:
        high: pandas Series of highs.
        low: pandas Series of lows.
        close: pandas Series of closes.
        n: Rolling window length.
        fillna: If True, replace infinities and NaN with 50, the same fill
            value ``stoch_signal`` uses.

    Returns:
        pandas Series named ``'stoch_k'``.
    """
    lowest_low = low.rolling(n, min_periods=0).min()
    highest_high = high.rolling(n, min_periods=0).max()
    stoch_k = 100 * (close - lowest_low) / (highest_high - lowest_low)
    if fillna:
        # A flat window makes the denominator zero (NaN or inf result).
        stoch_k = stoch_k.replace([np.inf, -np.inf], np.nan).fillna(50)
    return pd.Series(stoch_k, name='stoch_k')
def stoch_signal(high, low, close, n=14, d_n=3, fillna=False):
    """Return the rolling mean of ``stoch`` over ``d_n`` rows.

    Args:
        high: pandas Series of highs.
        low: pandas Series of lows.
        close: pandas Series of closes.
        n: Window length forwarded to ``stoch``.
        d_n: Window length of the rolling mean (partial windows allowed).
        fillna: Forwarded to ``stoch``; if True, infinities and NaN in the
            result are also replaced with 50.

    Returns:
        pandas Series named ``'stoch_d'``.
    """
    percent_k = stoch(high, low, close, n, fillna=fillna)
    percent_d = percent_k.rolling(d_n, min_periods=0).mean()
    if not fillna:
        return pd.Series(percent_d, name='stoch_d')
    cleaned = percent_d.replace([np.inf, -np.inf], np.nan).fillna(50)
    return pd.Series(cleaned, name='stoch_d')
def average_true_range(high, low, close, n=14, fillna=False):
    """Return a recursively smoothed true range.

    The true range of a row is ``max(high, prev_close) - min(low,
    prev_close)``. The first output is seeded with the mean of every true
    range after the first row; each later output is
    ``(previous * (n - 1) + true_range) / n``.

    Args:
        high: pandas Series of highs.
        low: pandas Series of lows.
        close: pandas Series of closes.
        n: Smoothing length.
        fillna: If True, replace infinities and NaN in the result with 0.

    Returns:
        pandas Series named ``'atr'``; empty when ``close`` is empty.
    """
    if len(close) == 0:
        # Nothing to seed the recursion with; avoid indexing an empty array.
        return pd.Series(np.zeros(0), index=close.index, name='atr')

    prev_close = close.shift(1)
    # Builtin max/min keep the first argument when the second is NaN, so the
    # first row (no previous close) falls back to high - low.
    tr = high.combine(prev_close, max) - low.combine(prev_close, min)

    atr = np.zeros(len(tr))
    atr[0] = tr.iloc[1:].mean()
    for i in range(1, len(atr)):
        atr[i] = (atr[i - 1] * (n - 1) + tr.iloc[i]) / float(n)

    atr = pd.Series(data=atr, index=tr.index)
    if fillna:
        atr = atr.replace([np.inf, -np.inf], np.nan).fillna(0)
    return pd.Series(atr, name='atr')
def vortex_indicator_pos(high, low, close, n=14, fillna=False):
    """Return rolling ``|high - previous low|`` over rolling true range.

    The numerator is summed over ``n`` rows with partial windows allowed;
    the true-range sum in the denominator needs a full window, so early
    rows are NaN.

    Args:
        high: pandas Series of highs.
        low: pandas Series of lows.
        close: pandas Series of closes.
        n: Rolling window length.
        fillna: If True, replace infinities and NaN with 1.

    Returns:
        pandas Series named ``'vip'``.
    """
    prev_close = close.shift(1)
    true_range = high.combine(prev_close, max) - low.combine(prev_close, min)
    tr_sum = true_range.rolling(n).sum()

    upward = np.abs(high - low.shift(1))
    vip = upward.rolling(n, min_periods=0).sum() / tr_sum
    if fillna:
        vip = vip.replace([np.inf, -np.inf], np.nan).fillna(1)
    return pd.Series(vip, name='vip')
def vortex_indicator_neg(high, low, close, n=14, fillna=False):
    """Return rolling ``|low - previous high|`` over rolling true range.

    Both rolling sums need a full window of ``n`` valid rows, so early
    rows are NaN.

    Args:
        high: pandas Series of highs.
        low: pandas Series of lows.
        close: pandas Series of closes.
        n: Rolling window length.
        fillna: If True, replace infinities and NaN with 1.

    Returns:
        pandas Series named ``'vin'``.
    """
    prev_close = close.shift(1)
    true_range = high.combine(prev_close, max) - low.combine(prev_close, min)
    tr_sum = true_range.rolling(n).sum()

    downward = np.abs(low - high.shift(1))
    vin = downward.rolling(n).sum() / tr_sum
    if fillna:
        vin = vin.replace([np.inf, -np.inf], np.nan).fillna(1)
    return pd.Series(vin, name='vin')
def cci(high, low, close, n=20, c=0.015):
    """Return the scaled deviation of the mean price from its rolling mean.

    The mean price is ``(high + low + close) / 3``; the result is
    ``(price - rolling_mean) / (c * rolling_std)`` over ``n`` rows with
    partial windows allowed.

    Args:
        high: pandas Series of highs.
        low: pandas Series of lows.
        close: pandas Series of closes.
        n: Rolling window length.
        c: Scaling constant applied to the rolling standard deviation.

    Returns:
        pandas Series named ``'cci'``.
    """
    typical = (high + low + close) / 3.0
    window = typical.rolling(n, min_periods=0)
    deviation = typical - window.mean()
    scaled_std = c * window.std()
    return pd.Series(deviation / scaled_std, name='cci')
def bollinger_mavg(close, n=20, fillna=False):
    """Return the rolling mean of ``close`` over ``n`` rows.

    Partial windows are allowed, so NaN only appears where a window holds
    no valid value.

    Args:
        close: pandas Series of closes.
        n: Rolling window length.
        fillna: If True, turn infinities into NaN and back-fill gaps.

    Returns:
        pandas Series named ``'mavg'``.
    """
    mavg = close.rolling(n, min_periods=0).mean()
    if fillna:
        # .bfill() replaces fillna(method='backfill'), which newer pandas
        # deprecates and then rejects.
        mavg = mavg.replace([np.inf, -np.inf], np.nan).bfill()
    return pd.Series(mavg, name='mavg')
def bollinger_hband(close, n=20, ndev=2, fillna=False):
    """Return rolling mean plus ``ndev`` rolling standard deviations.

    Partial windows are allowed; the first row is NaN because the standard
    deviation of a single observation is undefined.

    Args:
        close: pandas Series of closes.
        n: Rolling window length.
        ndev: Number of standard deviations added to the mean.
        fillna: If True, turn infinities into NaN and back-fill gaps, the
            same way ``bollinger_mavg`` does.

    Returns:
        pandas Series named ``'hband'``.
    """
    window = close.rolling(n, min_periods=0)
    hband = window.mean() + ndev * window.std()
    if fillna:
        hband = hband.replace([np.inf, -np.inf], np.nan).bfill()
    return pd.Series(hband, name='hband')
def bollinger_lband(close, n=20, ndev=2, fillna=False):
    """Return rolling mean minus ``ndev`` rolling standard deviations.

    Partial windows are allowed; the first row is NaN because the standard
    deviation of a single observation is undefined.

    Args:
        close: pandas Series of closes.
        n: Rolling window length.
        ndev: Number of standard deviations subtracted from the mean.
        fillna: If True, turn infinities into NaN and back-fill gaps, the
            same way ``bollinger_mavg`` does.

    Returns:
        pandas Series named ``'lband'``.
    """
    window = close.rolling(n, min_periods=0)
    lband = window.mean() - ndev * window.std()
    if fillna:
        lband = lband.replace([np.inf, -np.inf], np.nan).bfill()
    return pd.Series(lband, name='lband')
def keltner_channel_hband(high, low, close, n=10, fillna=False):
    """Return the rolling mean of ``(4*high - 2*low + close) / 3``.

    Partial windows are allowed.

    Args:
        high: pandas Series of highs.
        low: pandas Series of lows.
        close: pandas Series of closes.
        n: Rolling window length.
        fillna: If True, turn infinities into NaN and back-fill gaps.

    Returns:
        pandas Series named ``'kc_hband'``.
    """
    weighted_price = ((4 * high) - (2 * low) + close) / 3.0
    band = weighted_price.rolling(n, min_periods=0).mean()
    if fillna:
        # .bfill() replaces the deprecated fillna(method='backfill').
        band = band.replace([np.inf, -np.inf], np.nan).bfill()
    return pd.Series(band, name='kc_hband')
def keltner_channel_lband(high, low, close, n=10, fillna=False):
    """Return the rolling mean of ``(-2*high + 4*low + close) / 3``.

    Partial windows are allowed.

    Args:
        high: pandas Series of highs.
        low: pandas Series of lows.
        close: pandas Series of closes.
        n: Rolling window length.
        fillna: If True, turn infinities into NaN and back-fill gaps.
            Defaults to False, which leaves the result untouched.

    Returns:
        pandas Series named ``'kc_lband'``.
    """
    weighted_price = ((-2 * high) + (4 * low) + close) / 3.0
    band = weighted_price.rolling(n, min_periods=0).mean()
    if fillna:
        band = band.replace([np.inf, -np.inf], np.nan).bfill()
    return pd.Series(band, name='kc_lband')
def donchian_channel_hband(close, n=20, fillna=False):
    """Return the rolling maximum of ``close`` over ``n`` rows.

    Partial windows are allowed.

    Args:
        close: pandas Series of closes.
        n: Rolling window length.
        fillna: If True, turn infinities into NaN and back-fill gaps.

    Returns:
        pandas Series named ``'dchband'``.
    """
    hband = close.rolling(n, min_periods=0).max()
    if fillna:
        # .bfill() replaces the deprecated fillna(method='backfill').
        hband = hband.replace([np.inf, -np.inf], np.nan).bfill()
    return pd.Series(hband, name='dchband')
def donchian_channel_lband(close, n=20, fillna=False):
    """Return the rolling minimum of ``close`` over ``n`` rows.

    Partial windows are allowed.

    Args:
        close: pandas Series of closes.
        n: Rolling window length.
        fillna: If True, turn infinities into NaN and back-fill gaps.

    Returns:
        pandas Series named ``'dclband'``.
    """
    lband = close.rolling(n, min_periods=0).min()
    if fillna:
        # .bfill() replaces the deprecated fillna(method='backfill').
        lband = lband.replace([np.inf, -np.inf], np.nan).bfill()
    return pd.Series(lband, name='dclband')
def ichimoku_a(high, low, n1=9, n2=26, visual=False, fillna=False):
    """Return the average of two rolling high/low midpoints.

    Each midpoint is ``0.5 * (rolling max of high + rolling min of low)``,
    one over ``n1`` rows and one over ``n2`` rows (partial windows allowed).

    Args:
        high: pandas Series of highs.
        low: pandas Series of lows.
        n1: Window length of the first midpoint.
        n2: Window length of the second midpoint; also the shift distance
            when ``visual`` is True and the suffix of the result name.
        visual: If True, shift the result forward by ``n2`` rows.
        fillna: If True, turn infinities into NaN and back-fill gaps.

    Returns:
        pandas Series named ``'ichimoku_a_<n2>'``.
    """
    conv = 0.5 * (high.rolling(n1, min_periods=0).max()
                  + low.rolling(n1, min_periods=0).min())
    base = 0.5 * (high.rolling(n2, min_periods=0).max()
                  + low.rolling(n2, min_periods=0).min())
    spana = 0.5 * (conv + base)
    if visual:
        spana = spana.shift(n2)
    if fillna:
        # .bfill() replaces the deprecated fillna(method='backfill').
        spana = spana.replace([np.inf, -np.inf], np.nan).bfill()
    return pd.Series(spana, name='ichimoku_a_' + str(n2))
def ichimoku_b(high, low, n2=26, n3=52, visual=False, fillna=False):
    """Return the rolling high/low midpoint over ``n3`` rows.

    The midpoint is ``0.5 * (rolling max of high + rolling min of low)``
    with partial windows allowed.

    Args:
        high: pandas Series of highs.
        low: pandas Series of lows.
        n2: Shift distance when ``visual`` is True; also the suffix of the
            result name.
        n3: Rolling window length.
        visual: If True, shift the result forward by ``n2`` rows.
        fillna: If True, turn infinities into NaN and back-fill gaps.

    Returns:
        pandas Series named ``'ichimoku_b_<n2>'``.
    """
    spanb = 0.5 * (high.rolling(n3, min_periods=0).max()
                   + low.rolling(n3, min_periods=0).min())
    if visual:
        spanb = spanb.shift(n2)
    if fillna:
        # .bfill() replaces the deprecated fillna(method='backfill').
        spanb = spanb.replace([np.inf, -np.inf], np.nan).bfill()
    return pd.Series(spanb, name='ichimoku_b_' + str(n2))