From cfbc5e9ad7fafbef9d777c9ab2e6a1a98dbe4091 Mon Sep 17 00:00:00 2001 From: Benjamin Thomas Schwertfeger Date: Sun, 16 Feb 2025 13:03:30 +0100 Subject: [PATCH 1/2] Resolve "Websocket error propagation lacks usability" --- src/kraken/futures/websocket/__init__.py | 9 ++++----- src/kraken/spot/funding.py | 2 +- src/kraken/spot/websocket/connectors.py | 12 +++++------- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/src/kraken/futures/websocket/__init__.py b/src/kraken/futures/websocket/__init__.py index bd2feec..02cc435 100644 --- a/src/kraken/futures/websocket/__init__.py +++ b/src/kraken/futures/websocket/__init__.py @@ -194,20 +194,19 @@ async def __reconnect(self: ConnectFuturesWebsocket) -> None: if task.exception(): exception_occur = True self.__challenge_ready = False - traceback.print_stack() message = f"{task} got an exception {task.exception()}\n {task.get_stack()}" LOG.warning(message) for process in pending: - LOG.warning("pending %s", process) + LOG.warning("Pending %s", process) try: process.cancel() + LOG.warning("Cancelled %s", process) except asyncio.CancelledError: - LOG.exception("CancelledError") - LOG.warning("cancel ok") + LOG.error("Failed to cancel %s", process) await self.__callback({"error": message}) if exception_occur: break - LOG.warning("Connection closed") + LOG.info("Connection closed!") async def __recover_subscription_req_msg( self: ConnectFuturesWebsocket, diff --git a/src/kraken/spot/funding.py b/src/kraken/spot/funding.py index d3620bd..11d41bd 100644 --- a/src/kraken/spot/funding.py +++ b/src/kraken/spot/funding.py @@ -341,7 +341,7 @@ def withdraw_funds( ... key="MyPolkadotWallet" ... amount=4 ... ) - { 'refid': 'I7KGS6-UFMTTQ-AGBSO6T'} + {'refid': 'I7KGS6-UFMTTQ-AGBSO6T'} """ params: dict = {"asset": asset, "key": str(key), "amount": str(amount)} if defined(max_fee): diff --git a/src/kraken/spot/websocket/connectors.py b/src/kraken/spot/websocket/connectors.py index c23534a..76ce72f 100644 --- a/src/kraken/spot/websocket/connectors.py +++ b/src/kraken/spot/websocket/connectors.py @@ -225,25 +225,23 @@ async def __reconnect(self: ConnectSpotWebsocketBase) -> None: tasks, return_when=asyncio.FIRST_EXCEPTION, ) - exception_occur: bool = False + exception_occur = False for task in finished: if task.exception(): exception_occur = True - traceback.print_stack() - message: str = ( - f"{task} got an exception {task.exception()}\n {task.get_stack()}" - ) + message = f"{task} got an exception {task.exception()}\n {task.get_stack()}" LOG.warning(message) for process in pending: LOG.warning("pending %s", process) try: process.cancel() + LOG.warning("Cancelled %s", process) except asyncio.CancelledError: - LOG.exception("asyncio.CancelledError") + LOG.error("Failed to cancel %s", process) await self.__callback({"error": message}) if exception_occur: break - LOG.warning("Connection closed") + LOG.info("Connection closed!") def __get_reconnect_wait( self: ConnectSpotWebsocketBase, From 7536ef734cbf014dae1838817242b5070b034d16 Mon Sep 17 00:00:00 2001 From: Benjamin Thomas Schwertfeger Date: Sun, 16 Feb 2025 13:57:30 +0100 Subject: [PATCH 2/2] Add states to websocket clients --- src/kraken/futures/websocket/__init__.py | 24 ++++++++++----- src/kraken/spot/websocket/__init__.py | 31 +++++++++++-------- src/kraken/spot/websocket/connectors.py | 38 ++++++++++++++---------- src/kraken/utils/utils.py | 14 +++++++++ tests/futures/test_futures_websocket.py | 2 +- tests/spot/test_spot_websocket.py | 2 +- 6 files changed, 73 insertions(+), 38 deletions(-) diff --git a/src/kraken/futures/websocket/__init__.py b/src/kraken/futures/websocket/__init__.py index 02cc435..ea35dd5 100644 --- a/src/kraken/futures/websocket/__init__.py +++ b/src/kraken/futures/websocket/__init__.py @@ -26,6 +26,7 @@ from collections.abc import Callable from kraken.futures import FuturesWSClient +from kraken.utils.utils import WSState LOG: logging.Logger = logging.getLogger(__name__) @@ -53,6 +54,7 @@ def __init__( endpoint: str, callback: Callable, ) -> None: + self.state = WSState.INIT self.__client: FuturesWSClient = client self.__ws_endpoint: str = endpoint self.__callback: Any = callback @@ -86,14 +88,17 @@ async def start(self: ConnectFuturesWebsocket) -> None: async def stop(self: ConnectFuturesWebsocket) -> None: """Stops the websocket connection""" + self.state = WSState.CANCELLING self.keep_alive = False if hasattr(self, "task") and not self.task.done(): await self.task + self.state = WSState.CLOSED async def __run( # noqa: C901 self: ConnectFuturesWebsocket, event: asyncio.Event, ) -> None: + self.state = WSState.CONNECTING self.__new_challenge = None self.__last_challenge = None @@ -103,7 +108,8 @@ async def __run( # noqa: C901 ping_interval=30, max_queue=None, # FIXME: This is not recommended by the docs https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html#module-websockets.asyncio.client ) as socket: - LOG.info("Websocket connected!") + self.state = WSState.CONNECTED + LOG.info("Websocket connection established!") self.socket = socket if not event.is_set(): @@ -121,7 +127,6 @@ async def __run( # noqa: C901 except asyncio.CancelledError: LOG.exception("asyncio.CancelledError") self.keep_alive = False - await self.__callback({"error": "asyncio.CancelledError"}) else: try: message: dict = json.loads(_message) @@ -148,19 +153,23 @@ async def __run_forever(self: ConnectFuturesWebsocket) -> None: while self.keep_alive: await self.__reconnect() except MaxReconnectError: + self.state = WSState.ERROR await self.__callback( - {"error": "kraken.exceptions.MaxReconnectError"}, + {"python-kraken-sdk": {"error": "kraken.exceptions.MaxReconnectError"}}, ) self.exception_occur = True - except Exception: + except Exception: # pylint: disable=broad-except + self.state = WSState.ERROR LOG.exception(traceback.format_exc()) self.exception_occur = True async def close_connection(self: ConnectFuturesWebsocket) -> None: """Closes the connection -/ will force reconnect""" + self.state = WSState.CANCELLING await self.socket.close() async def __reconnect(self: ConnectFuturesWebsocket) -> None: + self.state = WSState.RECONNECTING LOG.info("Websocket start connect/reconnect") self.__reconnect_num += 1 @@ -174,9 +183,8 @@ async def __reconnect(self: ConnectFuturesWebsocket) -> None: self.__reconnect_num, ) await asyncio.sleep(reconnect_wait) - LOG.debug("asyncio sleep done") - event: asyncio.Event = asyncio.Event() + event: asyncio.Event = asyncio.Event() tasks: dict = { asyncio.ensure_future( self.__recover_subscription_req_msg(event), @@ -192,6 +200,7 @@ async def __reconnect(self: ConnectFuturesWebsocket) -> None: exception_occur = False for task in finished: if task.exception(): + self.state = WSState.ERRORHANDLING exception_occur = True self.__challenge_ready = False message = f"{task} got an exception {task.exception()}\n {task.get_stack()}" @@ -203,9 +212,10 @@ async def __reconnect(self: ConnectFuturesWebsocket) -> None: LOG.warning("Cancelled %s", process) except asyncio.CancelledError: LOG.error("Failed to cancel %s", process) - await self.__callback({"error": message}) + await self.__callback({"python-kraken-sdk": {"error": message}}) if exception_occur: break + self.state = WSState.CLOSED LOG.info("Connection closed!") async def __recover_subscription_req_msg( diff --git a/src/kraken/spot/websocket/__init__.py b/src/kraken/spot/websocket/__init__.py index b126bc1..8ee1b39 100644 --- a/src/kraken/spot/websocket/__init__.py +++ b/src/kraken/spot/websocket/__init__.py @@ -17,7 +17,7 @@ from kraken.spot import SpotAsyncClient from kraken.spot.websocket.connectors import ConnectSpotWebsocket -from kraken.utils.utils import deprecated +from kraken.utils.utils import WSState, deprecated if TYPE_CHECKING: from collections.abc import Callable @@ -51,6 +51,8 @@ class SpotWSClientBase(SpotAsyncClient): LOG: logging.Logger = logging.getLogger(__name__) PROD_ENV_URL: str = "ws.kraken.com" AUTH_PROD_ENV_URL: str = "ws-auth.kraken.com" + # Changing this can cause errors, as this class is designed for v2. + API_V: str = "/v2" def __init__( # nosec: B107 self: SpotWSClientBase, @@ -61,7 +63,7 @@ def __init__( # nosec: B107 no_public: bool = False, ) -> None: super().__init__(key=key, secret=secret) - + self.state: WSState = WSState.INIT self._is_auth: bool = bool(key and secret) self.__callback: Callable | None = callback self._pub_conn: ConnectSpotWebsocket | None = None @@ -77,16 +79,12 @@ def exception_occur(self: SpotWSClientBase) -> bool: # -------------------------------------------------------------------------- # Internals - def __prepare_connect( - self: SpotWSClientBase, - *, - no_public: bool, - ) -> None: + def __prepare_connect(self: SpotWSClientBase, *, no_public: bool) -> None: """Set up functions and attributes based on the API version.""" # pylint: disable=invalid-name - self.PROD_ENV_URL += "/v2" - self.AUTH_PROD_ENV_URL += "/v2" + self.PROD_ENV_URL += self.API_V + self.AUTH_PROD_ENV_URL += self.API_V self._pub_conn = ( ConnectSpotWebsocket( @@ -112,6 +110,7 @@ def __prepare_connect( async def start(self: SpotWSClientBase) -> None: """Method to start the websocket connection.""" + self.state = WSState.CONNECTING if self._pub_conn: await self._pub_conn.start() if self._priv_conn: @@ -126,27 +125,31 @@ async def start(self: SpotWSClientBase) -> None: else: public_conntection_waiting = False - private_conection_waiting = True + private_connection_waiting = True if self._priv_conn: if self._priv_conn.socket is not None: - private_conection_waiting = False + private_connection_waiting = False else: - private_conection_waiting = False + private_connection_waiting = False - if not public_conntection_waiting and not private_conection_waiting: + if not public_conntection_waiting and not private_connection_waiting: break await async_sleep(0.2) timeout += 0.2 else: + self.state = WSState.ERROR raise TimeoutError("Could not connect to the Kraken API!") + self.state = WSState.CONNECTED async def close(self: SpotWSClientBase) -> None: """Method to close the websocket connection.""" + self.state = WSState.CANCELLING if self._pub_conn: await self._pub_conn.stop() if self._priv_conn: await self._priv_conn.stop() await super().close() + self.state = WSState.CLOSED @deprecated( "The 'stop' function is deprecated and will be replaced by" @@ -154,11 +157,13 @@ async def close(self: SpotWSClientBase) -> None: ) async def stop(self: SpotWSClientBase) -> None: """Method to stop the websocket connection.""" + self.state = WSState.CANCELLING if self._pub_conn: await self._pub_conn.stop() if self._priv_conn: await self._priv_conn.stop() await super().close() + self.state = WSState.CLOSED async def on_message( self: SpotWSClientBase, diff --git a/src/kraken/spot/websocket/connectors.py b/src/kraken/spot/websocket/connectors.py index 76ce72f..6b0ea15 100644 --- a/src/kraken/spot/websocket/connectors.py +++ b/src/kraken/spot/websocket/connectors.py @@ -27,6 +27,7 @@ from websockets.asyncio.client import connect from kraken.exceptions import MaxReconnectError +from kraken.utils.utils import WSState if TYPE_CHECKING: from collections.abc import Callable @@ -68,6 +69,7 @@ def __init__( *, is_auth: bool = False, ) -> None: + self.state: WSState = WSState.INIT self.__client: SpotWSClientBase = client self.__ws_endpoint: str = endpoint self.__callback: Callable = callback @@ -104,6 +106,7 @@ async def start(self: ConnectSpotWebsocketBase) -> None: hasattr(self, "task") and not self.task.done() # pylint: disable=access-member-before-definition ): + LOG.warning("Websocket connection already running!") return self.task: asyncio.Task = asyncio.create_task( self.__run_forever(), @@ -111,9 +114,11 @@ async def start(self: ConnectSpotWebsocketBase) -> None: async def stop(self: ConnectSpotWebsocketBase) -> None: """Stops the websocket connection""" + self.state = WSState.CANCELLING self.keep_alive = False if hasattr(self, "task") and not self.task.done(): await self.task + self.state = WSState.CLOSED async def __run(self: ConnectSpotWebsocketBase, event: asyncio.Event) -> None: """ @@ -123,6 +128,7 @@ async def __run(self: ConnectSpotWebsocketBase, event: asyncio.Event) -> None: :param event: Event used to control the information flow :type event: asyncio.Event """ + self.state = WSState.CONNECTING self._last_ping = time() self.ws_conn_details = ( None if not self.__is_auth else await self.__client.get_ws_token() @@ -135,9 +141,10 @@ async def __run(self: ConnectSpotWebsocketBase, event: asyncio.Event) -> None: ping_interval=30, max_queue=None, # FIXME: This is not recommended by the docs https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html#module-websockets.asyncio.client ) as socket: - LOG.info("Websocket connected!") - self.socket = socket + self.state = WSState.CONNECTED + LOG.info("Websocket connection established!") + self.socket = socket if not event.is_set(): await self.send_ping() event.set() @@ -153,7 +160,6 @@ async def __run(self: ConnectSpotWebsocketBase, event: asyncio.Event) -> None: except asyncio.CancelledError: LOG.exception("asyncio.CancelledError") self.keep_alive = False - await self.__callback({"error": "asyncio.CancelledError"}) else: try: message: dict = json.loads(_message) @@ -172,22 +178,19 @@ async def __run_forever(self: ConnectSpotWebsocketBase) -> None: while self.keep_alive: await self.__reconnect() except MaxReconnectError: + self.state = WSState.ERROR await self.__callback( - {"error": "kraken.exceptions.MaxReconnectError"}, + {"python-kraken-sdk": {"error": "kraken.exceptions.MaxReconnectError"}}, ) self.exception_occur = True - except Exception as exc: - traceback_: str = traceback.format_exc() - LOG.exception( - "%s: %s", - exc, - traceback_, - ) - await self.__callback({"error": traceback_}) + except Exception: # pylint: disable=broad-except + self.state = WSState.ERROR + LOG.exception(traceback.format_exc()) self.exception_occur = True async def close_connection(self: ConnectSpotWebsocketBase) -> None: """Closes the websocket connection and thus forces a reconnect""" + self.state = WSState.CANCELLING await self.socket.close() async def __reconnect(self: ConnectSpotWebsocketBase) -> None: @@ -198,6 +201,7 @@ async def __reconnect(self: ConnectSpotWebsocketBase) -> None: :raises KrakenException.MaxReconnectError: If there are to many reconnect retries """ + self.state = WSState.RECONNECTING LOG.info("Websocket start connect/reconnect") self.__reconnect_num += 1 @@ -228,6 +232,7 @@ async def __reconnect(self: ConnectSpotWebsocketBase) -> None: exception_occur = False for task in finished: if task.exception(): + self.state = WSState.ERRORHANDLING exception_occur = True message = f"{task} got an exception {task.exception()}\n {task.get_stack()}" LOG.warning(message) @@ -238,9 +243,10 @@ async def __reconnect(self: ConnectSpotWebsocketBase) -> None: LOG.warning("Cancelled %s", process) except asyncio.CancelledError: LOG.error("Failed to cancel %s", process) - await self.__callback({"error": message}) + await self.__callback({"python-kraken-sdk": {"error": message}}) if exception_occur: break + self.state = WSState.CLOSED LOG.info("Connection closed!") def __get_reconnect_wait( @@ -351,10 +357,10 @@ async def _recover_subscriptions( it is set to ``True`` - which is when the connection is ready) :type event: asyncio.Event """ - log_msg: str = ( - f'Recover {"authenticated" if self.is_auth else "public"} subscriptions {self._subscriptions}' + LOG.info( + "%s: waiting", + log_msg := f'Recover {"authenticated" if self.is_auth else "public"} subscriptions {self._subscriptions}', ) - LOG.info("%s: waiting", log_msg) await event.wait() for subscription in self._subscriptions: diff --git a/src/kraken/utils/utils.py b/src/kraken/utils/utils.py index 9cc18b2..07008e1 100644 --- a/src/kraken/utils/utils.py +++ b/src/kraken/utils/utils.py @@ -10,6 +10,7 @@ from __future__ import annotations import warnings +from enum import Enum, auto from functools import wraps from typing import TYPE_CHECKING, Any @@ -17,6 +18,19 @@ from collections.abc import Callable +class WSState(Enum): + """Enum class representing the state of the WebSocket connection""" + + INIT = auto() # Initial state + CONNECTING = auto() # Connection is being established + CONNECTED = auto() # Connection is established + RECONNECTING = auto() # Connection is being re-established + CANCELLING = auto() # Connection is being cancelled + ERRORHANDLING = auto() # Error is being handled + ERROR = auto() # Error occurred + CLOSED = auto() # Connection is closed + + def deprecated(message: str) -> Callable: """ Function used as decorator to mark decorated functions as deprecated with a diff --git a/tests/futures/test_futures_websocket.py b/tests/futures/test_futures_websocket.py index 0e74c55..a1f99b6 100644 --- a/tests/futures/test_futures_websocket.py +++ b/tests/futures/test_futures_websocket.py @@ -336,7 +336,7 @@ async def check_resubscribe() -> None: asyncio.run(check_resubscribe()) for phrase in ( - "Websocket connected!", + "Websocket connection established!", "got an exception sent 1000 (OK); then received 1000 (OK)", "Connection closed", "Recover subscriptions [{'event': 'subscribe', 'feed': 'open_orders'}]: waiting", diff --git a/tests/spot/test_spot_websocket.py b/tests/spot/test_spot_websocket.py index 0bb3f49..ea89cd2 100644 --- a/tests/spot/test_spot_websocket.py +++ b/tests/spot/test_spot_websocket.py @@ -497,7 +497,7 @@ async def check_reconnect() -> None: "Recover authenticated subscriptions []: waiting", "Recover public subscriptions []: done", "Recover authenticated subscriptions []: done", - "Websocket connected!", + "Websocket connection established!", '{"channel": "status",', '"data": [{', '"api_version": "v2",',