Skip to content

Commit

Permalink
fix the futures ws reconnect issue + add test that checks for that
Browse files Browse the repository at this point in the history
  • Loading branch information
btschwertfeger committed Dec 18, 2023
1 parent 2e430bb commit d5f6373
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 25 deletions.
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Copyright (C) 2023 Benjamin Thomas Schwertfeger
# GitHub: https://github.com/btschwertfeger

PYTHON := python
PYTHON := venv/bin/python
PYTEST := $(PYTHON) -m pytest
PYTEST_OPTS := -vv --junit-xml=pytest.xml
PYTEST_COV_OPTS := $(PYTEST_OPTS) --cov --cov-report=xml:coverage.xml --cov-report=term
Expand Down Expand Up @@ -53,10 +53,10 @@ test:
.PHONY: tests
tests: test

## test-wip Run tests marked as 'wip'
## wip Run tests marked as 'wip'
##
.PHONY: test-wip
test-wip:
.PHONY: wip
wip:
@rm *.log || true
$(PYTEST) -m "wip" -vv $(TEST_DIR)

Expand Down
13 changes: 5 additions & 8 deletions examples/futures_trading_bot_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
import os
import sys
import traceback
from typing import Optional, Union
from typing import Optional

import requests
import urllib3

from kraken.exceptions import KrakenException
from kraken.exceptions import KrakenAuthenticationError
from kraken.futures import Funding, KrakenFuturesWSClient, Market, Trade, User

logging.basicConfig(
Expand Down Expand Up @@ -64,7 +64,7 @@ def __init__(self: TradingBot, config: dict) -> None:
self.__market: Market = Market(key=config["key"], secret=config["secret"])
self.__funding: Funding = Funding(key=config["key"], secret=config["secret"])

async def on_message(self: TradingBot, message: Union[list, dict]) -> None:
async def on_message(self: TradingBot, message: list | dict) -> None:
"""Receives all messages that came form the websocket feed(s)"""
logging.info(message)

Expand All @@ -90,10 +90,7 @@ async def on_message(self: TradingBot, message: Union[list, dict]) -> None:

def save_exit(self: TradingBot, reason: Optional[str] = "") -> None:
"""Controlled shutdown of the strategy"""
logging.warning(
"Save exit triggered, reason: {reason}",
extra={"reason": reason},
)
logging.warning("Save exit triggered, reason: %s", reason)
# some ideas:
# * save the bots data
# * maybe close trades
Expand Down Expand Up @@ -188,7 +185,7 @@ def __check_credentials(self: ManagedBot) -> bool:
except requests.exceptions.ConnectionError:
logging.error("ConnectionError, Kraken not available.")
return False
except KrakenException.KrakenAuthenticationError:
except KrakenAuthenticationError:
logging.error("Invalid credentials!")
return False

Expand Down
15 changes: 10 additions & 5 deletions kraken/futures/websocket/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ConnectFuturesWebsocket:
:type callback: function
"""

MAX_RECONNECT_NUM: int = 2
MAX_RECONNECT_NUM: int = 3

def __init__(
self: ConnectFuturesWebsocket,
Expand Down Expand Up @@ -130,14 +130,18 @@ async def __run_forever(self: ConnectFuturesWebsocket) -> None:
finally:
self.__client.exception_occur = True

async def close_connection(self: ConnectFuturesWebsocket) -> None:
"""Closes the connection -/ will force reconnect"""
await self.__socket.close()

async def __reconnect(self: ConnectFuturesWebsocket) -> None:
logging.info("Websocket start connect/reconnect")

self.__reconnect_num += 1
if self.__reconnect_num >= self.MAX_RECONNECT_NUM:
raise MaxReconnectError

reconnect_wait: float = self.__get_reconnect_wait(self.__reconnect_num)
reconnect_wait: float = self.__get_reconnect_wait(attempts=self.__reconnect_num)
logging.debug(
"asyncio sleep reconnect_wait=%f s reconnect_num=%d",
reconnect_wait,
Expand All @@ -163,6 +167,7 @@ async def __reconnect(self: ConnectFuturesWebsocket) -> None:
for task in finished:
if task.exception():
exception_occur = True
self.__challenge_ready = False
traceback.print_stack()
message = f"{task} got an exception {task.exception()}\n {task.get_stack()}"
logging.warning(message)
Expand All @@ -176,14 +181,14 @@ async def __reconnect(self: ConnectFuturesWebsocket) -> None:
await self.__callback({"error": message})
if exception_occur:
break
logging.warning("reconnect over")
logging.warning("Connection closed")

async def __recover_subscription_req_msg(
self: ConnectFuturesWebsocket,
event: asyncio.Event,
) -> None:
logging.info(
"Recover subscriptions %s waiting.",
"Recover subscriptions %s: waiting",
self.__subscriptions,
)
await event.wait()
Expand All @@ -196,7 +201,7 @@ async def __recover_subscription_req_msg(
logging.info("%s: OK", sub)

logging.info(
"Recover subscriptions %s done.",
"Recover subscriptions %s: done",
self.__subscriptions,
)

Expand Down
2 changes: 1 addition & 1 deletion kraken/spot/websocket/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ async def __reconnect(self: ConnectSpotWebsocketBase) -> None:
await self.__callback({"error": message})
if exception_occur:
break
self.LOG.warning("reconnect over")
self.LOG.warning("Connection closed")

def __get_reconnect_wait(
self: ConnectSpotWebsocketBase,
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ dev = [
# testing
"pytest",
"pytest-cov",
"pytest-mock",
# documentation
"sphinx",
"sphinx-rtd-theme",
Expand Down
8 changes: 4 additions & 4 deletions tests/futures/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from asyncio import sleep
from pathlib import Path
from time import time
from typing import Any, Union
from typing import Any

from kraken.futures import KrakenFuturesWSClient

Expand Down Expand Up @@ -47,16 +47,16 @@ class FuturesWebsocketClientTestWrapper(KrakenFuturesWSClient):
LOG: logging.Logger = logging.getLogger(__name__)

def __init__(
self: "FuturesWebsocketClientTestWrapper",
self: FuturesWebsocketClientTestWrapper,
key: str = "",
secret: str = "",
) -> None:
super().__init__(key=key, secret=secret, callback=self.on_message)
self.LOG.setLevel(logging.INFO)

async def on_message(
self: "FuturesWebsocketClientTestWrapper",
message: Union[list, dict],
self: FuturesWebsocketClientTestWrapper,
message: list | dict,
) -> None:
"""
This is the callback function that must be implemented
Expand Down
61 changes: 60 additions & 1 deletion tests/futures/test_futures_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from __future__ import annotations

import asyncio
import logging
from typing import Any

import pytest
Expand Down Expand Up @@ -169,7 +170,10 @@ async def submit_subscription() -> None:
await client.subscribe(feed="fills", products=["PI_XBTUSD"])

await client.subscribe(feed="open_orders")
await async_wait(2)
await async_wait(seconds=2)

assert len(client.get_active_subscriptions()) == 1
await async_wait(seconds=1)

asyncio.run(submit_subscription())

Expand Down Expand Up @@ -281,3 +285,58 @@ async def check_subscriptions() -> None:
"{'event': 'unsubscribed', 'feed': 'ticker', 'product_ids': ['PI_XBTUSD']}",
):
assert expected in caplog.text


@pytest.mark.wip()
@pytest.mark.futures()
@pytest.mark.futures_auth()
@pytest.mark.futures_websocket()
def test_resubscribe(
futures_api_key: str,
futures_secret_key: str,
caplog: Any,
mocker: Any,
) -> None:
"""
Test that forces a reconnect by closing the connection to check if the
authenticated feeds will be resubscribed correctly.
"""
caplog.set_level(logging.INFO)

async def check_resubscribe() -> None:
client: FuturesWebsocketClientTestWrapper = FuturesWebsocketClientTestWrapper(
key=futures_api_key,
secret=futures_secret_key,
)

assert client.get_active_subscriptions() == []
await async_wait(seconds=1)

await client.subscribe(feed="open_orders")
await async_wait(seconds=2)
assert len(client.get_active_subscriptions()) == 1

mocker.patch.object(
client._conn,
"_ConnectFuturesWebsocket__get_reconnect_wait",
return_value=2,
)

await client._conn.close_connection()
await async_wait(seconds=5)
assert len(client.get_active_subscriptions()) == 1

asyncio.run(check_resubscribe())
for phrase in (
"Websocket connected!",
"exception=ConnectionClosedOK(Close(code=1000, reason=''), Close(code=1000, reason=''), False)> got an exception sent 1000 (OK); then received 1000 (OK)",
"Connection closed",
"Recover subscriptions [{'event': 'subscribe', 'feed': 'open_orders'}]: waiting",
"Recover subscriptions [{'event': 'subscribe', 'feed': 'open_orders'}]: done",
):
assert phrase in caplog.text

assert (
"{'event': 'alert', 'message': 'Failed to subscribe to authenticated feed'}"
not in caplog.text
)
2 changes: 0 additions & 2 deletions tests/spot/test_spot_funding.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ def test_wallet_transfer(spot_auth_funding: Funding) -> None:


@pytest.mark.spot()
@pytest.mark.wip()
@pytest.mark.spot_auth()
@pytest.mark.spot_funding()
@pytest.mark.skip(reason="CI does not have withdraw permission")
Expand All @@ -187,7 +186,6 @@ def test_withdraw_methods(spot_auth_funding: Funding) -> None:


@pytest.mark.spot()
@pytest.mark.wip()
@pytest.mark.spot_auth()
@pytest.mark.spot_funding()
@pytest.mark.skip(reason="CI does not have withdraw permission")
Expand Down

0 comments on commit d5f6373

Please sign in to comment.