-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathfinance.py
123 lines (108 loc) · 4.08 KB
/
finance.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#
# Finance Environment
#
# (c) Dr. Yves J. Hilpisch
# Artificial Intelligence in Finance
#
import math
import random
import numpy as np
import pandas as pd
class observation_space:
def __init__(self, n):
self.shape = (n,)
class action_space:
def __init__(self, n):
self.n = n
def sample(self):
return random.randint(0, self.n - 1)
class Finance:
intraday = False
if intraday:
url = 'http://hilpisch.com/aiif_eikon_id_eur_usd.csv'
else:
url = 'http://hilpisch.com/aiif_eikon_eod_data.csv'
def __init__(self, symbol, features, window, lags,
leverage=1, min_performance=0.85, min_accuracy=0.5,
start=0, end=None, mu=None, std=None):
self.symbol = symbol
self.features = features
self.n_features = len(features)
self.window = window
self.lags = lags
self.leverage = leverage
self.min_performance = min_performance
self.min_accuracy = min_accuracy
self.start = start
self.end = end
self.mu = mu
self.std = std
self.observation_space = observation_space(self.lags)
self.action_space = action_space(2)
self._get_data()
self._prepare_data()
def _get_data(self):
self.raw = pd.read_csv(self.url, index_col=0,
parse_dates=True).dropna()
if self.intraday:
self.raw = self.raw.resample('30min', label='right').last()
self.raw = pd.DataFrame(self.raw['CLOSE'])
self.raw.columns = [self.symbol]
def _prepare_data(self):
self.data = pd.DataFrame(self.raw[self.symbol])
self.data = self.data.iloc[self.start:]
self.data['r'] = np.log(self.data / self.data.shift(1))
self.data.dropna(inplace=True)
self.data['s'] = self.data[self.symbol].rolling(self.window).mean()
self.data['m'] = self.data['r'].rolling(self.window).mean()
self.data['v'] = self.data['r'].rolling(self.window).std()
self.data.dropna(inplace=True)
if self.mu is None:
self.mu = self.data.mean()
self.std = self.data.std()
self.data_ = (self.data - self.mu) / self.std
self.data['d'] = np.where(self.data['r'] > 0, 1, 0)
self.data['d'] = self.data['d'].astype(int)
if self.end is not None:
self.data = self.data.iloc[:self.end - self.start]
self.data_ = self.data_.iloc[:self.end - self.start]
def _get_state(self):
return self.data_[self.features].iloc[self.bar -
self.lags:self.bar]
def get_state(self, bar):
return self.data_[self.features].iloc[bar - self.lags:bar]
def seed(self, seed):
random.seed(seed)
np.random.seed(seed)
def reset(self):
self.treward = 0
self.accuracy = 0
self.performance = 1
self.bar = self.lags
state = self.data_[self.features].iloc[self.bar -
self.lags:self.bar]
return state.values
def step(self, action):
correct = action == self.data['d'].iloc[self.bar]
ret = self.data['r'].iloc[self.bar] * self.leverage
reward_1 = 1 if correct else 0
reward_2 = abs(ret) if correct else -abs(ret)
self.treward += reward_1
self.bar += 1
self.accuracy = self.treward / (self.bar - self.lags)
self.performance *= math.exp(reward_2)
if self.bar >= len(self.data):
done = True
elif reward_1 == 1:
done = False
elif (self.performance < self.min_performance and
self.bar > self.lags + 15):
done = True
elif (self.accuracy < self.min_accuracy and
self.bar > self.lags + 15):
done = True
else:
done = False
state = self._get_state()
info = {}
return state.values, reward_1 + reward_2 * 5, done, info