Skip to content

Commit

Permalink
Add SessionManager for renewing client sessions
Browse files Browse the repository at this point in the history
  • Loading branch information
btschwertfeger committed Dec 19, 2024
1 parent 9fd6c59 commit 6995248
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 46 deletions.
4 changes: 0 additions & 4 deletions .github/dependabot.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@ updates:
directory: "/"
schedule:
interval: "weekly"
reviewers:
- "btschwertfeger"
- package-ecosystem: "pip"
directory: "/"
schedule:
interval: "weekly"
reviewers:
- "btschwertfeger"
ignore:
- dependency-name: "ruff"
168 changes: 126 additions & 42 deletions kraken/base_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import hmac
import json
import time
from copy import deepcopy
from functools import wraps
from typing import TYPE_CHECKING, Any, TypeVar
from urllib.parse import urlencode, urljoin
Expand Down Expand Up @@ -173,6 +172,103 @@ def check_batch_status(self: ErrorHandler, data: dict) -> dict:
return data


class SessionManager:
"""
Manages the requests-based session for the sync Spot and Futures clients.
Kraken rejects requests that are older than 60 minutes without further
information. To avoid this, the session manager creates a new session
every 30 minutes.
"""

HEADERS: Final[dict] = {"User-Agent": "btschwertfeger/python-kraken-sdk"}

def __init__(
self: SessionManager,
*,
proxy: str | None = None,
max_session_age: int = 1800,
) -> None:
"""
Initialize the session manager.
:param max_session_age: Maximum session age in seconds (default: 1800s
or 30 minutes)
:type max_session_age: int
:param proxy: Proxy URL
:type proxy: str, optional
"""
self.__proxy: str = proxy
self.__max_session_age: int = max_session_age
self.__session: requests.Session = self.create_new_session()
self.__session_start_time: float = time.time()

def create_new_session(self: SessionManager) -> requests.Session:
"""Create a new session."""
session = requests.Session()
session.headers.update(self.HEADERS)
if self.__proxy is not None:
session.proxies.update(
{
"http": self.__proxy,
"https": self.__proxy,
},
)
return session

def get_session(self: SessionManager) -> requests.Session:
"""Get a valid session, recreating if the current one is too old."""
if time.time() - self.__session_start_time > self.__max_session_age:
self.__session.close() # Close the old session
self.__session = self.create_new_session()
self.__session_start_time = time.time()
return self.__session


class AsyncSessionManager:
"""
Manages aiohttp-based sessions for the async Spot and Futures clients.
Kraken rejects requests that are older than 60 minutes without further
information. To avoid this, the session manager creates a new session
every 30 minutes.
"""

HEADERS: Final[dict] = {"User-Agent": "btschwertfeger/python-kraken-sdk"}

def __init__(
self: AsyncSessionManager,
*,
proxy: str | None = None,
max_session_age: int = 1800,
) -> None:
"""
Initialize the session manager.
:param max_session_age: Maximum session age in seconds (default: 1800s
or 30 minutes)
:type max_session_age: int
:param proxy: Proxy URL
:type proxy: str, optional
"""
self.__proxy: str = proxy
self.__max_session_age: int = max_session_age
self.__session: aiohttp.ClientSession = self.create_new_session()
self.__session_start_time: float = time.time()

def create_new_session(self: AsyncSessionManager) -> aiohttp.ClientSession:
"""Create a new session."""
return aiohttp.ClientSession(headers=self.HEADERS, proxy=self.__proxy)

async def get_session(self: AsyncSessionManager) -> aiohttp.ClientSession:
"""Get a valid session, recreating if the current one is too old."""
if time.time() - self.__session_start_time > self.__max_session_age:
await self.__session.close()
self.__session = self.create_new_session()
self.__session_start_time = time.time()
return self.__session


class SpotClient:
"""
This class is the base for all Spot clients, handles un-/signed
Expand All @@ -193,7 +289,6 @@ class SpotClient:

URL: str = "https://api.kraken.com"
TIMEOUT: int = 10
HEADERS: Final[dict] = {"User-Agent": "btschwertfeger/python-kraken-sdk"}

def __init__( # nosec: B107
self: SpotClient,
Expand All @@ -211,15 +306,8 @@ def __init__( # nosec: B107
self._secret: str = secret
self._use_custom_exceptions: bool = use_custom_exceptions
self._err_handler: ErrorHandler = ErrorHandler()
self.__session: requests.Session = requests.Session()
if proxy is not None:
self.__session.proxies.update(
{
"http": proxy,
"https": proxy,
},
)
self.__session.headers.update(self.HEADERS)
self.session_manager: SessionManager = SessionManager(proxy=proxy)
self.__session: requests.Session = self.session_manager.create_new_session()

def _prepare_request(
self: SpotClient,
Expand Down Expand Up @@ -254,7 +342,7 @@ def _prepare_request(
elif query_str:
query_params = query_str

headers: dict = deepcopy(self.HEADERS)
headers: dict = {}

if auth:
if not self._key or not self._secret:
Expand Down Expand Up @@ -340,7 +428,9 @@ def request( # noqa: PLR0913 # pylint: disable=too-many-arguments
query_str=query_str,
extra_params=extra_params,
)

timeout: int = self.TIMEOUT if timeout != 10 else timeout # type: ignore[no-redef]
self.__session = self.session_manager.get_session()

if method in {"GET", "DELETE"}:
return self.__check_response_data(
Expand Down Expand Up @@ -495,8 +585,10 @@ def __init__( # nosec: B107
url=url,
use_custom_exceptions=use_custom_exceptions,
)
self.__session = aiohttp.ClientSession(headers=self.HEADERS)
self.proxy = proxy
self.session_manager: AsyncSessionManager = AsyncSessionManager(proxy=proxy) # type: ignore[assignment]
self.__session: aiohttp.ClientSession = (
self.session_manager.create_new_session()
)

async def request( # type: ignore[override] # pylint: disable=invalid-overridden-method,too-many-arguments # noqa: PLR0913
self: SpotAsyncClient,
Expand Down Expand Up @@ -552,40 +644,38 @@ async def request( # type: ignore[override] # pylint: disable=invalid-overridde
extra_params=extra_params,
)
timeout: int = self.TIMEOUT if timeout != 10 else timeout # type: ignore[no-redef]
self.__session = await self.session_manager.get_session()

if method in {"GET", "DELETE"}:
return await self.__check_response_data( # type: ignore[return-value]
response=await self.__session.request( # type: ignore[misc,call-arg]
response=await self.__session.request(
method=method,
url=f"{url}?{query_params}" if query_params else url,
headers=headers,
timeout=timeout,
proxy=self.proxy,
),
return_raw=return_raw,
)

if do_json:
return await self.__check_response_data( # type: ignore[return-value]
response=await self.__session.request( # type: ignore[misc,call-arg]
response=await self.__session.request(
method=method,
url=url,
headers=headers,
json=params,
timeout=timeout,
proxy=self.proxy,
),
return_raw=return_raw,
)

return await self.__check_response_data( # type: ignore[return-value]
response=await self.__session.request( # type: ignore[misc,call-arg]
response=await self.__session.request(
method=method,
url=url,
headers=headers,
data=params,
timeout=timeout,
proxy=self.proxy,
),
return_raw=return_raw,
)
Expand Down Expand Up @@ -628,7 +718,7 @@ async def __check_response_data( # pylint: disable=invalid-overridden-method

async def async_close(self: SpotAsyncClient) -> None:
"""Closes the aiohttp session"""
await self.__session.close() # type: ignore[func-returns-value]
await self.__session.close()

async def __aenter__(self: Self) -> Self:
return self
Expand Down Expand Up @@ -667,7 +757,6 @@ class FuturesClient:
URL: str = "https://futures.kraken.com"
SANDBOX_URL: str = "https://demo-futures.kraken.com"
TIMEOUT: int = 10
HEADERS: Final[dict] = {"User-Agent": "btschwertfeger/python-kraken-sdk"}

def __init__( # nosec: B107
self: FuturesClient,
Expand All @@ -693,15 +782,8 @@ def __init__( # nosec: B107
self._use_custom_exceptions: bool = use_custom_exceptions

self._err_handler: ErrorHandler = ErrorHandler()
self.__session: requests.Session = requests.Session()
self.__session.headers.update(self.HEADERS)
if proxy is not None:
self.__session.proxies.update(
{
"http": proxy,
"https": proxy,
},
)
self.session_manager: SessionManager = SessionManager(proxy=proxy)
self.__session: requests.Session = self.session_manager.create_new_session()

def _prepare_request(
self: FuturesClient,
Expand Down Expand Up @@ -734,7 +816,7 @@ def _prepare_request(
"" if query_params is None else urlencode(query_params, doseq=True) # type: ignore[arg-type]
)

headers: dict = deepcopy(self.HEADERS)
headers: dict = {}

if auth:
if not self._key or not self._secret:
Expand Down Expand Up @@ -807,6 +889,7 @@ def request( # pylint: disable=too-many-arguments
extra_params=extra_params,
)
timeout: int = self.TIMEOUT if timeout == 10 else timeout # type: ignore[no-redef]
self.__session = self.session_manager.get_session()

if method in {"GET", "DELETE"}:
return self.__check_response_data(
Expand Down Expand Up @@ -969,8 +1052,10 @@ def __init__( # nosec: B107
sandbox=sandbox,
use_custom_exceptions=use_custom_exceptions,
)
self.__session = aiohttp.ClientSession(headers=self.HEADERS)
self.proxy = proxy
self.session_manager: AsyncSessionManager = AsyncSessionManager(proxy=proxy) # type: ignore[assignment]
self.__session: aiohttp.ClientSession = (
self.session_manager.create_new_session()
)

async def request( # type: ignore[override] # pylint: disable=arguments-differ,invalid-overridden-method
self: FuturesAsyncClient,
Expand All @@ -990,42 +1075,41 @@ async def request( # type: ignore[override] # pylint: disable=arguments-differ,
query_params=query_params,
auth=auth,
)
timeout: int = self.TIMEOUT if timeout != 10 else timeout # type: ignore[no-redef]

timeout = self.TIMEOUT if timeout != 10 else timeout
self.__session = await self.session_manager.get_session()

if method in {"GET", "DELETE"}:
return await self.__check_response_data(
response=await self.__session.request( # type: ignore[misc,call-arg]
response=await self.__session.request(
method=method,
url=url,
params=query_string,
headers=headers,
timeout=timeout,
proxy=self.proxy,
),
return_raw=return_raw,
)

if method == "PUT":
return await self.__check_response_data(
response=await self.__session.request( # type: ignore[misc,call-arg]
response=await self.__session.request(
method=method,
url=url,
params=encoded_payload,
headers=headers,
timeout=timeout,
proxy=self.proxy,
),
return_raw=return_raw,
)

return await self.__check_response_data(
response=await self.__session.request( # type: ignore[misc,call-arg]
response=await self.__session.request(
method=method,
url=url,
data=encoded_payload,
headers=headers,
timeout=timeout,
proxy=self.proxy,
),
return_raw=return_raw,
)
Expand Down Expand Up @@ -1074,7 +1158,7 @@ async def __check_response_data( # pylint: disable=invalid-overridden-method

async def async_close(self: FuturesAsyncClient) -> None:
"""Closes the aiohttp session"""
await self.__session.close() # type: ignore[func-returns-value]
await self.__session.close()

async def __aenter__(self: Self) -> Self:
return self
Expand Down

0 comments on commit 6995248

Please sign in to comment.