-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsubscriber.py
111 lines (84 loc) · 3.15 KB
/
subscriber.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
from ticton import TicTonAsyncClient
from ticton.callbacks import (
OnTickSuccessParams,
OnRingSuccessParams,
OnWindSuccessParams,
)
import asyncio
import os
from dotenv import load_dotenv
import logging
import redis
from mariadb_connector import Alarm, update_alarm_to_db
from tonsdk.utils import Address
from pytoncenter.address import Address as PyAddress
load_dotenv()
# set up logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_DB = int(os.getenv("REDIS_DB", 0))
THRESHOLD_PRICE = os.getenv("THRESHOLD_PRICE", 0.7)
redis_client = redis.StrictRedis(
host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, decode_responses=True
)
MY_ADDRESS = PyAddress(os.getenv("MY_ADDRESS", "")).to_string(False)
async def on_tick_success(on_tick_success_params: OnTickSuccessParams):
logger.info(f"Tick received: {on_tick_success_params}")
price = round(float(on_tick_success_params.base_asset_price), 9)
watchmaker = PyAddress(on_tick_success_params.watchmaker).to_string(False)
is_mine = watchmaker == MY_ADDRESS
lt = on_tick_success_params.tx.lt
redis_client.set("last_lt", lt)
alarm = Alarm(
id=on_tick_success_params.new_alarm_id,
price=price,
is_mine=is_mine,
created_at=on_tick_success_params.created_at,
)
await update_alarm_to_db([alarm])
async def on_ring_success(on_ring_success_params: OnRingSuccessParams):
logger.info(f"Ring received: {on_ring_success_params}")
lt = on_ring_success_params.tx.lt
redis_client.set("last_lt", lt)
alarm = Alarm(id=on_ring_success_params.alarm_id, state="uninitialized")
await update_alarm_to_db([alarm])
async def on_wind_success(on_wind_success_params: OnWindSuccessParams):
logger.info(f"Wind received: {on_wind_success_params}")
price = round(float(on_wind_success_params.new_base_asset_price), 9)
timekeeper = PyAddress(on_wind_success_params.timekeeper).to_string(False)
is_mine = timekeeper == MY_ADDRESS
lt = on_wind_success_params.tx.lt
redis_client.set("last_lt", lt)
alarm = Alarm(
id=on_wind_success_params.alarm_id,
remain_scale=on_wind_success_params.remain_scale,
)
new_alarm = Alarm(
id=on_wind_success_params.new_alarm_id,
price=price,
is_mine=is_mine,
created_at=on_wind_success_params.created_at,
)
await update_alarm_to_db([alarm, new_alarm])
async def subscribe():
client = await TicTonAsyncClient.init(testnet=True)
lt = redis_client.get("last_lt")
if isinstance(lt, str):
lt = int(lt)
else:
lt = "latest" # or oldest
await client.subscribe(
on_tick_success=on_tick_success,
on_wind_success=on_wind_success,
on_ring_success=on_ring_success,
start_lt=lt,
)
if __name__ == "__main__":
asyncio.run(subscribe())