Skip to content

Commit

Permalink
Resolve "Long lasting REST sessions get terminated" (#334)
Browse files Browse the repository at this point in the history
  • Loading branch information
btschwertfeger authored Jan 5, 2025
1 parent 2ef96f8 commit 2b97c6b
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 61 deletions.
4 changes: 0 additions & 4 deletions .github/dependabot.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@ updates:
directory: "/"
schedule:
interval: "weekly"
reviewers:
- "btschwertfeger"
- package-ecosystem: "pip"
directory: "/"
schedule:
interval: "weekly"
reviewers:
- "btschwertfeger"
ignore:
- dependency-name: "ruff"
8 changes: 5 additions & 3 deletions .github/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ changelog:
- title: Other Changes
labels:
- "*"
# exclude:
# labels:
# - dependencies
exclude:
labels:
- dependencies
- github_actions
- title: 👒 Dependencies
labels:
- dependencies
- github_actions
144 changes: 107 additions & 37 deletions kraken/base_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import hmac
import json
import time
from copy import deepcopy
from functools import wraps
from typing import TYPE_CHECKING, Any, TypeVar
from urllib.parse import urlencode, urljoin
Expand Down Expand Up @@ -175,12 +174,16 @@ def check_batch_status(self: ErrorHandler, data: dict) -> dict:

class SpotClient:
"""
This class is the base for all Spot clients, handles un-/signed
requests and returns exception handled results.
This class is the base for all Spot clients, handles un-/signed requests and
returns exception handled results.
If you are facing timeout errors on derived clients, you can make use of the
``TIMEOUT`` attribute to deviate from the default ``10`` seconds.
Kraken sometimes rejects requests that are older than a certain time without
further information. To avoid this, the session manager creates a new
session every 5 minutes.
:param key: Spot API public key (default: ``""``)
:type key: str, optional
:param secret: Spot API secret key (default: ``""``)
Expand All @@ -193,6 +196,7 @@ class SpotClient:

URL: str = "https://api.kraken.com"
TIMEOUT: int = 10
MAX_SESSION_AGE: int = 300 # seconds
HEADERS: Final[dict] = {"User-Agent": "btschwertfeger/python-kraken-sdk"}

def __init__( # nosec: B107
Expand All @@ -211,15 +215,29 @@ def __init__( # nosec: B107
self._secret: str = secret
self._use_custom_exceptions: bool = use_custom_exceptions
self._err_handler: ErrorHandler = ErrorHandler()
self.__session: requests.Session = requests.Session()
if proxy is not None:
self.__proxy: str | None = proxy
self.__session_start_time: float
self.__session: requests.Session
self.__create_new_session()

def __create_new_session(self: SpotClient) -> None:
"""Create a new session."""
self.__session = requests.Session()
self.__session.headers.update(self.HEADERS)
if self.__proxy is not None:
self.__session.proxies.update(
{
"http": proxy,
"https": proxy,
"http": self.__proxy,
"https": self.__proxy,
},
)
self.__session.headers.update(self.HEADERS)
self.__session_start_time = time.time()

def __check_renew_session(self: SpotClient) -> None:
"""Check if the session is too old and renew if necessary."""
if time.time() - self.__session_start_time > self.MAX_SESSION_AGE:
self.__session.close() # Close the old session
self.__create_new_session()

def _prepare_request(
self: SpotClient,
Expand Down Expand Up @@ -254,7 +272,7 @@ def _prepare_request(
elif query_str:
query_params = query_str

headers: dict = deepcopy(self.HEADERS)
headers: dict = {}

if auth:
if not self._key or not self._secret:
Expand Down Expand Up @@ -340,7 +358,9 @@ def request( # noqa: PLR0913 # pylint: disable=too-many-arguments
query_str=query_str,
extra_params=extra_params,
)

timeout: int = self.TIMEOUT if timeout != 10 else timeout # type: ignore[no-redef]
self.__check_renew_session()

if method in {"GET", "DELETE"}:
return self.__check_response_data(
Expand Down Expand Up @@ -470,6 +490,10 @@ class SpotAsyncClient(SpotClient):
If you are facing timeout errors on derived clients, you can make use of the
``TIMEOUT`` attribute to deviate from the default ``10`` seconds.
Kraken sometimes rejects requests that are older than a certain time without
further information. To avoid this, the session manager creates a new
session every 5 minutes.
:param key: Spot API public key (default: ``""``)
:type key: str, optional
:param secret: Spot API secret key (default: ``""``)
Expand All @@ -495,8 +519,21 @@ def __init__( # nosec: B107
url=url,
use_custom_exceptions=use_custom_exceptions,
)
self.__session = aiohttp.ClientSession(headers=self.HEADERS)
self.proxy = proxy
self.__proxy: str | None = proxy
self.__session_start_time: float
self.__session: aiohttp.ClientSession
self.__create_new_session()

def __create_new_session(self: SpotAsyncClient) -> None:
"""Create a new session."""
self.__session = aiohttp.ClientSession(headers=self.HEADERS, proxy=self.__proxy)
self.__session_start_time = time.time()

async def __check_renew_session(self: SpotAsyncClient) -> None:
"""Check if the session is too old and renew if necessary."""
if time.time() - self.__session_start_time > self.MAX_SESSION_AGE:
await self.__session.close() # Close the old session
self.__create_new_session()

async def request( # type: ignore[override] # pylint: disable=invalid-overridden-method,too-many-arguments # noqa: PLR0913
self: SpotAsyncClient,
Expand Down Expand Up @@ -552,40 +589,38 @@ async def request( # type: ignore[override] # pylint: disable=invalid-overridde
extra_params=extra_params,
)
timeout: int = self.TIMEOUT if timeout != 10 else timeout # type: ignore[no-redef]
await self.__check_renew_session()

if method in {"GET", "DELETE"}:
return await self.__check_response_data( # type: ignore[return-value]
response=await self.__session.request( # type: ignore[misc,call-arg]
response=await self.__session.request(
method=method,
url=f"{url}?{query_params}" if query_params else url,
headers=headers,
timeout=timeout,
proxy=self.proxy,
),
return_raw=return_raw,
)

if do_json:
return await self.__check_response_data( # type: ignore[return-value]
response=await self.__session.request( # type: ignore[misc,call-arg]
response=await self.__session.request(
method=method,
url=url,
headers=headers,
json=params,
timeout=timeout,
proxy=self.proxy,
),
return_raw=return_raw,
)

return await self.__check_response_data( # type: ignore[return-value]
response=await self.__session.request( # type: ignore[misc,call-arg]
response=await self.__session.request(
method=method,
url=url,
headers=headers,
data=params,
timeout=timeout,
proxy=self.proxy,
),
return_raw=return_raw,
)
Expand Down Expand Up @@ -628,7 +663,7 @@ async def __check_response_data( # pylint: disable=invalid-overridden-method

async def async_close(self: SpotAsyncClient) -> None:
"""Closes the aiohttp session"""
await self.__session.close() # type: ignore[func-returns-value]
await self.__session.close()

async def __aenter__(self: Self) -> Self:
return self
Expand All @@ -643,22 +678,28 @@ class NFTClient(SpotClient):

class FuturesClient:
"""
The base class for all Futures clients handles un-/signed requests
and returns exception handled results.
The base class for all Futures clients handles un-/signed requests and
returns exception handled results.
If you are facing timeout errors on derived clients, you can make use of the
``TIMEOUT`` attribute to deviate from the default ``10`` seconds.
If the sandbox environment is chosen, the keys must be generated from here:
https://demo-futures.kraken.com/settings/api
Kraken sometimes rejects requests that are older than a certain time without
further information. To avoid this, the session manager creates a new
session every 5 minutes.
:param key: Futures API public key (default: ``""``)
:type key: str, optional
:param secret: Futures API secret key (default: ``""``)
:type secret: str, optional
:param url: The URL to access the Futures Kraken API (default: https://futures.kraken.com)
:param url: The URL to access the Futures Kraken API (default:
https://futures.kraken.com)
:type url: str, optional
:param sandbox: If set to ``True`` the URL will be https://demo-futures.kraken.com (default: ``False``)
:param sandbox: If set to ``True`` the URL will be
https://demo-futures.kraken.com (default: ``False``)
:type sandbox: bool, optional
:param proxy: proxy URL, may contain authentication information
:type proxy: str, optional
Expand All @@ -668,6 +709,7 @@ class FuturesClient:
SANDBOX_URL: str = "https://demo-futures.kraken.com"
TIMEOUT: int = 10
HEADERS: Final[dict] = {"User-Agent": "btschwertfeger/python-kraken-sdk"}
MAX_SESSION_AGE: int = 300 # seconds

def __init__( # nosec: B107
self: FuturesClient,
Expand All @@ -693,15 +735,30 @@ def __init__( # nosec: B107
self._use_custom_exceptions: bool = use_custom_exceptions

self._err_handler: ErrorHandler = ErrorHandler()
self.__session: requests.Session = requests.Session()

self.__proxy: str | None = proxy
self.__session_start_time: float
self.__session: requests.Session
self.__create_new_session()

def __create_new_session(self: FuturesClient) -> None:
"""Create a new session."""
self.__session = requests.Session()
self.__session.headers.update(self.HEADERS)
if proxy is not None:
if self.__proxy is not None:
self.__session.proxies.update(
{
"http": proxy,
"https": proxy,
"http": self.__proxy,
"https": self.__proxy,
},
)
self.__session_start_time = time.time()

def __check_renew_session(self: FuturesClient) -> None:
"""Check if the session is too old and renew if necessary."""
if time.time() - self.__session_start_time > self.MAX_SESSION_AGE:
self.__session.close() # Close the old session
self.__create_new_session()

def _prepare_request(
self: FuturesClient,
Expand Down Expand Up @@ -734,7 +791,7 @@ def _prepare_request(
"" if query_params is None else urlencode(query_params, doseq=True) # type: ignore[arg-type]
)

headers: dict = deepcopy(self.HEADERS)
headers: dict = {}

if auth:
if not self._key or not self._secret:
Expand Down Expand Up @@ -807,6 +864,7 @@ def request( # pylint: disable=too-many-arguments
extra_params=extra_params,
)
timeout: int = self.TIMEOUT if timeout == 10 else timeout # type: ignore[no-redef]
self.__check_renew_session()

if method in {"GET", "DELETE"}:
return self.__check_response_data(
Expand Down Expand Up @@ -969,8 +1027,21 @@ def __init__( # nosec: B107
sandbox=sandbox,
use_custom_exceptions=use_custom_exceptions,
)
self.__session = aiohttp.ClientSession(headers=self.HEADERS)
self.proxy = proxy
self.__proxy: str | None = proxy
self.__session_start_time: float
self.__session: aiohttp.ClientSession
self.__create_new_session()

def __create_new_session(self: FuturesAsyncClient) -> None:
"""Create a new session."""
self.__session = aiohttp.ClientSession(headers=self.HEADERS, proxy=self.__proxy)
self.__session_start_time = time.time()

async def __check_renew_session(self: FuturesAsyncClient) -> None:
"""Check if the session is too old and renew if necessary."""
if time.time() - self.__session_start_time > self.MAX_SESSION_AGE:
await self.__session.close() # Close the old session
self.__create_new_session()

async def request( # type: ignore[override] # pylint: disable=arguments-differ,invalid-overridden-method
self: FuturesAsyncClient,
Expand All @@ -990,42 +1061,41 @@ async def request( # type: ignore[override] # pylint: disable=arguments-differ,
query_params=query_params,
auth=auth,
)
timeout: int = self.TIMEOUT if timeout != 10 else timeout # type: ignore[no-redef]

timeout = self.TIMEOUT if timeout != 10 else timeout
await self.__check_renew_session()

if method in {"GET", "DELETE"}:
return await self.__check_response_data(
response=await self.__session.request( # type: ignore[misc,call-arg]
response=await self.__session.request(
method=method,
url=url,
params=query_string,
headers=headers,
timeout=timeout,
proxy=self.proxy,
),
return_raw=return_raw,
)

if method == "PUT":
return await self.__check_response_data(
response=await self.__session.request( # type: ignore[misc,call-arg]
response=await self.__session.request(
method=method,
url=url,
params=encoded_payload,
headers=headers,
timeout=timeout,
proxy=self.proxy,
),
return_raw=return_raw,
)

return await self.__check_response_data(
response=await self.__session.request( # type: ignore[misc,call-arg]
response=await self.__session.request(
method=method,
url=url,
data=encoded_payload,
headers=headers,
timeout=timeout,
proxy=self.proxy,
),
return_raw=return_raw,
)
Expand Down Expand Up @@ -1074,7 +1144,7 @@ async def __check_response_data( # pylint: disable=invalid-overridden-method

async def async_close(self: FuturesAsyncClient) -> None:
"""Closes the aiohttp session"""
await self.__session.close() # type: ignore[func-returns-value]
await self.__session.close()

async def __aenter__(self: Self) -> Self:
return self
Expand Down
Loading

0 comments on commit 2b97c6b

Please sign in to comment.