Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve "Websocket error propagation lacks usability" #363

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions src/kraken/futures/websocket/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from collections.abc import Callable

from kraken.futures import FuturesWSClient
from kraken.utils.utils import WSState

LOG: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -53,6 +54,7 @@
endpoint: str,
callback: Callable,
) -> None:
self.state = WSState.INIT
self.__client: FuturesWSClient = client
self.__ws_endpoint: str = endpoint
self.__callback: Any = callback
Expand Down Expand Up @@ -86,14 +88,17 @@

async def stop(self: ConnectFuturesWebsocket) -> None:
"""Stops the websocket connection"""
self.state = WSState.CANCELLING
self.keep_alive = False
if hasattr(self, "task") and not self.task.done():
await self.task
self.state = WSState.CLOSED

async def __run( # noqa: C901
self: ConnectFuturesWebsocket,
event: asyncio.Event,
) -> None:
self.state = WSState.CONNECTING
self.__new_challenge = None
self.__last_challenge = None

Expand All @@ -103,7 +108,8 @@
ping_interval=30,
max_queue=None, # FIXME: This is not recommended by the docs https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html#module-websockets.asyncio.client
) as socket:
LOG.info("Websocket connected!")
self.state = WSState.CONNECTED
LOG.info("Websocket connection established!")
self.socket = socket

if not event.is_set():
Expand All @@ -121,7 +127,6 @@
except asyncio.CancelledError:
LOG.exception("asyncio.CancelledError")
self.keep_alive = False
await self.__callback({"error": "asyncio.CancelledError"})
else:
try:
message: dict = json.loads(_message)
Expand All @@ -148,19 +153,23 @@
while self.keep_alive:
await self.__reconnect()
except MaxReconnectError:
self.state = WSState.ERROR

Check warning on line 156 in src/kraken/futures/websocket/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/kraken/futures/websocket/__init__.py#L156

Added line #L156 was not covered by tests
await self.__callback(
{"error": "kraken.exceptions.MaxReconnectError"},
{"python-kraken-sdk": {"error": "kraken.exceptions.MaxReconnectError"}},
)
self.exception_occur = True
except Exception:
except Exception: # pylint: disable=broad-except
self.state = WSState.ERROR

Check warning on line 162 in src/kraken/futures/websocket/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/kraken/futures/websocket/__init__.py#L161-L162

Added lines #L161 - L162 were not covered by tests
LOG.exception(traceback.format_exc())
self.exception_occur = True

async def close_connection(self: ConnectFuturesWebsocket) -> None:
"""Closes the connection -/ will force reconnect"""
self.state = WSState.CANCELLING
await self.socket.close()

async def __reconnect(self: ConnectFuturesWebsocket) -> None:
self.state = WSState.RECONNECTING
LOG.info("Websocket start connect/reconnect")

self.__reconnect_num += 1
Expand All @@ -174,9 +183,8 @@
self.__reconnect_num,
)
await asyncio.sleep(reconnect_wait)
LOG.debug("asyncio sleep done")
event: asyncio.Event = asyncio.Event()

event: asyncio.Event = asyncio.Event()
tasks: dict = {
asyncio.ensure_future(
self.__recover_subscription_req_msg(event),
Expand All @@ -192,22 +200,23 @@
exception_occur = False
for task in finished:
if task.exception():
self.state = WSState.ERRORHANDLING
exception_occur = True
self.__challenge_ready = False
traceback.print_stack()
message = f"{task} got an exception {task.exception()}\n {task.get_stack()}"
LOG.warning(message)
for process in pending:
LOG.warning("pending %s", process)
LOG.warning("Pending %s", process)

Check warning on line 209 in src/kraken/futures/websocket/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/kraken/futures/websocket/__init__.py#L209

Added line #L209 was not covered by tests
try:
process.cancel()
LOG.warning("Cancelled %s", process)

Check warning on line 212 in src/kraken/futures/websocket/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/kraken/futures/websocket/__init__.py#L212

Added line #L212 was not covered by tests
except asyncio.CancelledError:
LOG.exception("CancelledError")
LOG.warning("cancel ok")
await self.__callback({"error": message})
LOG.error("Failed to cancel %s", process)

Check warning on line 214 in src/kraken/futures/websocket/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/kraken/futures/websocket/__init__.py#L214

Added line #L214 was not covered by tests
await self.__callback({"python-kraken-sdk": {"error": message}})
if exception_occur:
break
LOG.warning("Connection closed")
self.state = WSState.CLOSED
LOG.info("Connection closed!")

async def __recover_subscription_req_msg(
self: ConnectFuturesWebsocket,
Expand Down
2 changes: 1 addition & 1 deletion src/kraken/spot/funding.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ def withdraw_funds(
... key="MyPolkadotWallet"
... amount=4
... )
{ 'refid': 'I7KGS6-UFMTTQ-AGBSO6T'}
{'refid': 'I7KGS6-UFMTTQ-AGBSO6T'}
"""
params: dict = {"asset": asset, "key": str(key), "amount": str(amount)}
if defined(max_fee):
Expand Down
31 changes: 18 additions & 13 deletions src/kraken/spot/websocket/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from kraken.spot import SpotAsyncClient
from kraken.spot.websocket.connectors import ConnectSpotWebsocket
from kraken.utils.utils import deprecated
from kraken.utils.utils import WSState, deprecated

if TYPE_CHECKING:
from collections.abc import Callable
Expand Down Expand Up @@ -51,6 +51,8 @@
LOG: logging.Logger = logging.getLogger(__name__)
PROD_ENV_URL: str = "ws.kraken.com"
AUTH_PROD_ENV_URL: str = "ws-auth.kraken.com"
# Changing this can cause errors, as this class is designed for v2.
API_V: str = "/v2"

def __init__( # nosec: B107
self: SpotWSClientBase,
Expand All @@ -61,7 +63,7 @@
no_public: bool = False,
) -> None:
super().__init__(key=key, secret=secret)

self.state: WSState = WSState.INIT
self._is_auth: bool = bool(key and secret)
self.__callback: Callable | None = callback
self._pub_conn: ConnectSpotWebsocket | None = None
Expand All @@ -77,16 +79,12 @@

# --------------------------------------------------------------------------
# Internals
def __prepare_connect(
self: SpotWSClientBase,
*,
no_public: bool,
) -> None:
def __prepare_connect(self: SpotWSClientBase, *, no_public: bool) -> None:
"""Set up functions and attributes based on the API version."""

# pylint: disable=invalid-name
self.PROD_ENV_URL += "/v2"
self.AUTH_PROD_ENV_URL += "/v2"
self.PROD_ENV_URL += self.API_V
self.AUTH_PROD_ENV_URL += self.API_V

self._pub_conn = (
ConnectSpotWebsocket(
Expand All @@ -112,6 +110,7 @@

async def start(self: SpotWSClientBase) -> None:
"""Method to start the websocket connection."""
self.state = WSState.CONNECTING
if self._pub_conn:
await self._pub_conn.start()
if self._priv_conn:
Expand All @@ -126,39 +125,45 @@
else:
public_conntection_waiting = False

private_conection_waiting = True
private_connection_waiting = True
if self._priv_conn:
if self._priv_conn.socket is not None:
private_conection_waiting = False
private_connection_waiting = False
else:
private_conection_waiting = False
private_connection_waiting = False

if not public_conntection_waiting and not private_conection_waiting:
if not public_conntection_waiting and not private_connection_waiting:
break
await async_sleep(0.2)
timeout += 0.2
else:
self.state = WSState.ERROR

Check warning on line 140 in src/kraken/spot/websocket/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/kraken/spot/websocket/__init__.py#L140

Added line #L140 was not covered by tests
raise TimeoutError("Could not connect to the Kraken API!")
self.state = WSState.CONNECTED

async def close(self: SpotWSClientBase) -> None:
"""Method to close the websocket connection."""
self.state = WSState.CANCELLING
if self._pub_conn:
await self._pub_conn.stop()
if self._priv_conn:
await self._priv_conn.stop()
await super().close()
self.state = WSState.CLOSED

@deprecated(
"The 'stop' function is deprecated and will be replaced by"
" 'close' in a future release.",
)
async def stop(self: SpotWSClientBase) -> None:
"""Method to stop the websocket connection."""
self.state = WSState.CANCELLING

Check warning on line 160 in src/kraken/spot/websocket/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/kraken/spot/websocket/__init__.py#L160

Added line #L160 was not covered by tests
if self._pub_conn:
await self._pub_conn.stop()
if self._priv_conn:
await self._priv_conn.stop()
await super().close()
self.state = WSState.CLOSED

Check warning on line 166 in src/kraken/spot/websocket/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/kraken/spot/websocket/__init__.py#L166

Added line #L166 was not covered by tests

async def on_message(
self: SpotWSClientBase,
Expand Down
50 changes: 27 additions & 23 deletions src/kraken/spot/websocket/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from websockets.asyncio.client import connect

from kraken.exceptions import MaxReconnectError
from kraken.utils.utils import WSState

if TYPE_CHECKING:
from collections.abc import Callable
Expand Down Expand Up @@ -68,6 +69,7 @@
*,
is_auth: bool = False,
) -> None:
self.state: WSState = WSState.INIT
self.__client: SpotWSClientBase = client
self.__ws_endpoint: str = endpoint
self.__callback: Callable = callback
Expand Down Expand Up @@ -104,16 +106,19 @@
hasattr(self, "task")
and not self.task.done() # pylint: disable=access-member-before-definition
):
LOG.warning("Websocket connection already running!")

Check warning on line 109 in src/kraken/spot/websocket/connectors.py

View check run for this annotation

Codecov / codecov/patch

src/kraken/spot/websocket/connectors.py#L109

Added line #L109 was not covered by tests
return
self.task: asyncio.Task = asyncio.create_task(
self.__run_forever(),
)

async def stop(self: ConnectSpotWebsocketBase) -> None:
"""Stops the websocket connection"""
self.state = WSState.CANCELLING
self.keep_alive = False
if hasattr(self, "task") and not self.task.done():
await self.task
self.state = WSState.CLOSED

async def __run(self: ConnectSpotWebsocketBase, event: asyncio.Event) -> None:
"""
Expand All @@ -123,6 +128,7 @@
:param event: Event used to control the information flow
:type event: asyncio.Event
"""
self.state = WSState.CONNECTING
self._last_ping = time()
self.ws_conn_details = (
None if not self.__is_auth else await self.__client.get_ws_token()
Expand All @@ -135,9 +141,10 @@
ping_interval=30,
max_queue=None, # FIXME: This is not recommended by the docs https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html#module-websockets.asyncio.client
) as socket:
LOG.info("Websocket connected!")
self.socket = socket
self.state = WSState.CONNECTED
LOG.info("Websocket connection established!")

self.socket = socket
if not event.is_set():
await self.send_ping()
event.set()
Expand All @@ -153,7 +160,6 @@
except asyncio.CancelledError:
LOG.exception("asyncio.CancelledError")
self.keep_alive = False
await self.__callback({"error": "asyncio.CancelledError"})
else:
try:
message: dict = json.loads(_message)
Expand All @@ -172,22 +178,19 @@
while self.keep_alive:
await self.__reconnect()
except MaxReconnectError:
self.state = WSState.ERROR

Check warning on line 181 in src/kraken/spot/websocket/connectors.py

View check run for this annotation

Codecov / codecov/patch

src/kraken/spot/websocket/connectors.py#L181

Added line #L181 was not covered by tests
await self.__callback(
{"error": "kraken.exceptions.MaxReconnectError"},
{"python-kraken-sdk": {"error": "kraken.exceptions.MaxReconnectError"}},
)
self.exception_occur = True
except Exception as exc:
traceback_: str = traceback.format_exc()
LOG.exception(
"%s: %s",
exc,
traceback_,
)
await self.__callback({"error": traceback_})
except Exception: # pylint: disable=broad-except
self.state = WSState.ERROR
LOG.exception(traceback.format_exc())

Check warning on line 188 in src/kraken/spot/websocket/connectors.py

View check run for this annotation

Codecov / codecov/patch

src/kraken/spot/websocket/connectors.py#L186-L188

Added lines #L186 - L188 were not covered by tests
self.exception_occur = True

async def close_connection(self: ConnectSpotWebsocketBase) -> None:
"""Closes the websocket connection and thus forces a reconnect"""
self.state = WSState.CANCELLING
await self.socket.close()

async def __reconnect(self: ConnectSpotWebsocketBase) -> None:
Expand All @@ -198,6 +201,7 @@
:raises KrakenException.MaxReconnectError: If there are to many
reconnect retries
"""
self.state = WSState.RECONNECTING
LOG.info("Websocket start connect/reconnect")

self.__reconnect_num += 1
Expand Down Expand Up @@ -225,25 +229,25 @@
tasks,
return_when=asyncio.FIRST_EXCEPTION,
)
exception_occur: bool = False
exception_occur = False
for task in finished:
if task.exception():
self.state = WSState.ERRORHANDLING
exception_occur = True
traceback.print_stack()
message: str = (
f"{task} got an exception {task.exception()}\n {task.get_stack()}"
)
message = f"{task} got an exception {task.exception()}\n {task.get_stack()}"
LOG.warning(message)
for process in pending:
LOG.warning("pending %s", process)
try:
process.cancel()
LOG.warning("Cancelled %s", process)

Check warning on line 243 in src/kraken/spot/websocket/connectors.py

View check run for this annotation

Codecov / codecov/patch

src/kraken/spot/websocket/connectors.py#L243

Added line #L243 was not covered by tests
except asyncio.CancelledError:
LOG.exception("asyncio.CancelledError")
await self.__callback({"error": message})
LOG.error("Failed to cancel %s", process)

Check warning on line 245 in src/kraken/spot/websocket/connectors.py

View check run for this annotation

Codecov / codecov/patch

src/kraken/spot/websocket/connectors.py#L245

Added line #L245 was not covered by tests
await self.__callback({"python-kraken-sdk": {"error": message}})
if exception_occur:
break
LOG.warning("Connection closed")
self.state = WSState.CLOSED
LOG.info("Connection closed!")

def __get_reconnect_wait(
self: ConnectSpotWebsocketBase,
Expand Down Expand Up @@ -353,10 +357,10 @@
it is set to ``True`` - which is when the connection is ready)
:type event: asyncio.Event
"""
log_msg: str = (
f'Recover {"authenticated" if self.is_auth else "public"} subscriptions {self._subscriptions}'
LOG.info(
"%s: waiting",
log_msg := f'Recover {"authenticated" if self.is_auth else "public"} subscriptions {self._subscriptions}',
)
LOG.info("%s: waiting", log_msg)
await event.wait()

for subscription in self._subscriptions:
Expand Down
Loading