-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrsi_calculation_analyze.py
125 lines (101 loc) · 4.44 KB
/
rsi_calculation_analyze.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import logging
from dataclasses import dataclass
from datetime import datetime
import pandas as pd
from tinkoff.invest.utils import quotation_to_decimal
from analyze.base_analyze_provider import IAnalyzeProvider
from data_provider.base_data_provider import IDataProvider
__all__ = ("RsiCalculation")
logger = logging.getLogger(__name__)
@dataclass(eq=False, repr=True)
class FloatCandle:
"""Data class for Pandas calculations.
float type is a mistake and shame. Using it just for example.
Do not use it in production code.
"""
open: float
high: float
low: float
close: float
volume: int
time: datetime
class RsiCalculation(IAnalyzeProvider):
"""An EXAMPLE of self-made indicator: RSI indicator as one of most famous indicator worldwide"""
def __init__(
self,
data_provider: IDataProvider,
# all variables below in passed as *args (str)
length: str,
source: str,
output_file: str
) -> None:
self.__data_provider = data_provider
self.__length = int(length)
self.__source = source
self.__output_file = output_file
def start(self, figi: str, from_days: int) -> None:
logger.info(f"RsiCalculation start; figi:{figi}; from_days:{from_days}; length:{self.__length}")
logger.info(f"Fill candles")
candles: list[FloatCandle] = []
for candle in self.__data_provider.provide_candles(figi, from_days):
candles.append(
FloatCandle(
open=float(quotation_to_decimal(candle.open)),
high=float(quotation_to_decimal(candle.high)),
low=float(quotation_to_decimal(candle.low)),
close=float(quotation_to_decimal(candle.close)),
volume=candle.volume,
time=candle.time
)
)
if not candles:
logger.info(f"Stop RSI calculation. Candles weren't found.")
return
logger.info(f"Start RSI calculation")
candles_rsi = self.__calculate_rsi(pd.DataFrame(candles))
# Save full result to file and log
try:
if self.__output_file:
logger.info(f"Save candles with rsi to file {self.__output_file}")
candles_rsi.to_csv(self.__output_file)
except Exception as ex:
logger.error(f"Error while save candles with rsi {repr(ex)}")
logger.info(f"Result as Data Frame:")
logger.info(candles_rsi)
logger.info(f"RsiCalculation ended")
def __calculate_rsi(
self,
candles: pd.DataFrame
) -> pd.DataFrame:
"""
Wells Wilder's RSI calculation developed via the Pandas library.
Based on: https://github.com/alphazwest
"""
# Calculate Price Differences using the column specified as price (self.__source).
candles['diff'] = candles[self.__source].diff(1)
# Calculate Avg. Gains/Losses
candles['gain'] = candles['diff'].clip(lower=0)
candles['loss'] = candles['diff'].clip(upper=0).abs()
# Get initial Averages
candles['avg_gain'] = candles['gain'].rolling(window=self.__length, min_periods=self.__length).mean()[:self.__length + 1]
candles['avg_loss'] = candles['loss'].rolling(window=self.__length, min_periods=self.__length).mean()[:self.__length + 1]
candles['avg_loss_no_slice'] = candles['loss'].rolling(window=self.__length, min_periods=self.__length).mean()
# Calculate Average Gains
for i, row in enumerate(candles['avg_gain'].iloc[self.__length + 1:]):
candles['avg_gain'].iloc[i + self.__length + 1] = \
(candles['avg_gain'].iloc[i + self.__length] *
(self.__length - 1) +
candles['gain'].iloc[i + self.__length + 1]) \
/ self.__length
# Calculate Average Losses
for i, row in enumerate(candles['avg_loss'].iloc[self.__length + 1:]):
candles['avg_loss'].iloc[i + self.__length + 1] = \
(candles['avg_loss'].iloc[i + self.__length] *
(self.__length - 1) +
candles['loss'].iloc[i + self.__length + 1]) \
/ self.__length
# Calculate RS Values
candles['rs'] = candles['avg_gain'] / candles['avg_loss']
# Calculate RSI
candles['rsi'] = 100 - (100 / (1.0 + candles['rs']))
return candles