-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathkrakenstore.py
159 lines (130 loc) · 5.54 KB
/
krakenstore.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
from datetime import datetime
import time as _time
import collections
import threading
import pandas as pd
from backtrader.metabase import MetaParams
from backtrader.utils.py3 import queue, with_metaclass
import backtrader as bt
import krakenex
class MetaSingleton(MetaParams):
'''Metaclass to make a metaclassed class a singleton'''
def __init__(cls, name, bases, dct):
super(MetaSingleton, cls).__init__(name, bases, dct)
cls._singleton = None
def __call__(cls, *args, **kwargs):
if cls._singleton is None:
cls._singleton = (
super(MetaSingleton, cls).__call__(*args, **kwargs))
return cls._singleton
class KrakenStore(with_metaclass(MetaSingleton, object)):
'''Singleton class wrapping to control the connections to Kraken.
Params:
- ``token`` (default:``None``): API access token
- ``account`` (default: ``None``): account id
- ``practice`` (default: ``False``): use the test environment
- ``account_tmout`` (default: ``10.0``): refresh period for account
value/cash refresh
'''
BrokerCls = None # broker class will autoregister
DataCls = None # data class will auto register
params = (
('token', ''),
('account', ''),
('practice', False),
('account_tmout', 10.0), # account balance refresh timeout
)
_DTEPOCH = datetime(1970, 1, 1)
_ENVPRACTICE = 'practice'
_ENVLIVE = 'live'
@classmethod
def getdata(cls, *args, **kwargs):
'''Returns ``DataCls`` with args, kwargs'''
return cls.DataCls(*args, **kwargs)
@classmethod
def getbroker(cls, *args, **kwargs):
'''Returns broker with *args, **kwargs from registered ``BrokerCls``'''
return cls.BrokerCls(*args, **kwargs)
def __init__(self):
super(KrakenStore, self).__init__()
# self.notifs = collections.deque() # store notifications for cerebro
# self._env = None # reference to cerebro for general notifications
# self.broker = None # broker instance
self.datas = list() # datas that have registered over start
# self._orders = collections.OrderedDict() # map order.ref to oid
# self._ordersrev = collections.OrderedDict() # map oid to order.ref
# self._transpend = collections.defaultdict(collections.deque)
# self._oenv = self._ENVPRACTICE if self.p.practice else self._ENVLIVE
self.kex = krakenex.API()
# self._cash = 0.0
# self._value = 0.0
# self._evt_acct = threading.Event()
# def start(self, data=None, broker=None):
# # Datas require some processing to kickstart data reception
# if data is None and broker is None:
# self.cash = None
# return
#
# if data is not None:
# self._env = data._env
# # For datas simulate a queue with None to kickstart co
# self.datas.append(data)
#
# if self.broker is not None:
# self.broker.data_started(data)
#
# elif broker is not None:
# self.broker = broker
# self.streaming_events( )
# self.broker_threads( )
# def stop(self):
# # signal end of thread
# if self.broker is not None:
# self.q_ordercreate.put(None)
# self.q_orderclose.put(None)
# self.q_account.put(None)
# def put_notification(self, msg, *args, **kwargs):
# self.notifs.append((msg, args, kwargs))
#
# def get_notifications(self):
# '''Return the pending "store" notifications'''
# self.notifs.append(None) # put a mark / threads could still append
# return [x for x in iter(self.notifs.popleft, None)]
# Kraken supported granularities
_GRANULARITIES = {
(bt.TimeFrame.Minutes, 1): 1,
(bt.TimeFrame.Minutes, 5): 5,
(bt.TimeFrame.Minutes, 15): 15,
(bt.TimeFrame.Minutes, 30): 30,
(bt.TimeFrame.Minutes, 60): 60,
(bt.TimeFrame.Minutes, 240): 240,
(bt.TimeFrame.Days, 1): 1440,
(bt.TimeFrame.Weeks, 1): 10080,
(bt.TimeFrame.Days, 15): 21600, # weird one - 15 day candle
}
# def get_positions(self):
# # TODO: Query the current positions from Kraken private API
# poslist = None
# return poslist
def get_granularity(self, timeframe, compression):
return self._GRANULARITIES.get((timeframe, compression), None)
def get_instrument(self, dataname):
# TODO: Query dataname as an instrument (ticker) and... return info for it I guess? unclear
return None
def get_source_time(self):
ret = self.kex.query_public('Time')
return datetime.fromtimestamp(ret['result']['unixtime'])
def get_instrument(self, dataname):
ret = self.kex.query_public('AssetPairs', req={'pair': dataname})
pair_ret = ret['result']
return pair_ret.get(dataname, None)
def get_ohlc(self, dataname, since, granularity):
ret = self.kex.query_public('OHLC', req={
'pair': dataname, 'since': since, 'interval': granularity})
self.since = datetime.now()
ohlc_columns = ['datetime', 'open', 'high', 'low', 'close', 'vwap', 'volume', 'count']
ohlc_table = pd.DataFrame(data=ret['result'][dataname], columns=ohlc_columns)
ohlc_table = ohlc_table.apply(lambda ax: pd.to_numeric(ax, errors='ignore'))
ohlc_table['datetime'] = ohlc_table['datetime'].apply(datetime.fromtimestamp)
ohlc_table = ohlc_table.set_index('datetime')
return ohlc_table