diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index bfa921d8..3aabc569 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -24,10 +24,7 @@ If there are no entries after the last release, use `**UNRELEASED**` as the vers If, say, your patch fixes issue #999, the entry should look like this: `* Fix big bad boo-boo in the async scheduler (#999 -_; PR by Yourname)` +_; PR by @yourgithubaccount)` If there's no issue linked, just link to your pull request instead by updating the changelog after you've created the PR. - -If possible, use your real name in the changelog entry. If not, use your GitHub -username. diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index dec6197c..c71855a4 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -20,7 +20,7 @@ repos: - id: trailing-whitespace - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.4.4 + rev: v0.4.7 hooks: - id: ruff args: [--fix, --show-fixes] @@ -40,7 +40,7 @@ repos: stages: [manual] - repo: https://github.com/codespell-project/codespell - rev: v2.2.6 + rev: v2.3.0 hooks: - id: codespell diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index b132c418..b041b5fd 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -6,7 +6,11 @@ APScheduler, see the :doc:`migration section `. 
**UNRELEASED** +- **BREAKING** Refactored ``AsyncpgEventBroker`` to directly accept a connection string, + thus eliminating the need for the ``AsyncpgEventBroker.from_dsn()`` class method - Added the ``psycopg`` event broker +- Added useful indexes and removed useless ones in ``SQLAlchemyDataStore`` and + ``MongoDBDataStore`` **4.0.0a5** diff --git a/pyproject.toml b/pyproject.toml index 99c56ef5..9beab956 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,7 +47,7 @@ cbor = ["cbor2 >= 5.0"] mongodb = ["pymongo >= 4"] mqtt = ["paho-mqtt >= 2.0"] redis = ["redis >= 5.0.1"] -sqlalchemy = ["sqlalchemy[asyncio] >= 2.0.19"] +sqlalchemy = ["sqlalchemy[asyncio] >= 2.0.24"] test = [ "APScheduler[cbor,mongodb,mqtt,redis,sqlalchemy]", "asyncpg >= 0.20; python_implementation == 'CPython'", diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py index 94a4e5c8..0f06e6ea 100644 --- a/src/apscheduler/datastores/mongodb.py +++ b/src/apscheduler/datastores/mongodb.py @@ -206,11 +206,12 @@ def _initialize(self) -> None: self._jobs.delete_many({}, session=session) self._jobs_results.delete_many({}, session=session) + self._schedules.create_index("task_id", session=session) self._schedules.create_index("next_fire_time", session=session) + self._schedules.create_index("acquired_by", session=session) self._jobs.create_index("task_id", session=session) self._jobs.create_index("schedule_id", session=session) - self._jobs.create_index("created_at", session=session) - self._jobs_results.create_index("finished_at", session=session) + self._jobs.create_index("acquired_by", session=session) self._jobs_results.create_index("expires_at", session=session) async def start( diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py index 87cd8bf0..79863cf4 100644 --- a/src/apscheduler/datastores/sqlalchemy.py +++ b/src/apscheduler/datastores/sqlalchemy.py @@ -301,7 +301,7 @@ def get_table_definitions(self) -> MetaData: 
Column("max_jitter", interval_type), *next_fire_time_tzoffset_columns, Column("last_fire_time", timestamp_type), - Column("acquired_by", Unicode(500)), + Column("acquired_by", Unicode(500), index=True), Column("acquired_until", timestamp_type), ) Table( @@ -317,7 +317,7 @@ def get_table_definitions(self) -> MetaData: Column("start_deadline", timestamp_type), Column("result_expiration_time", interval_type), Column("created_at", timestamp_type, nullable=False), - Column("acquired_by", Unicode(500)), + Column("acquired_by", Unicode(500), index=True), Column("acquired_until", timestamp_type), ) Table( @@ -326,7 +326,7 @@ def get_table_definitions(self) -> MetaData: Column("job_id", Uuid, primary_key=True), Column("outcome", Enum(JobOutcome, metadata=metadata), nullable=False), Column("started_at", timestamp_type, index=True), - Column("finished_at", timestamp_type, nullable=False, index=True), + Column("finished_at", timestamp_type, nullable=False), Column("expires_at", timestamp_type, nullable=False, index=True), Column("exception", LargeBinary), Column("return_value", LargeBinary), diff --git a/src/apscheduler/eventbrokers/asyncpg.py b/src/apscheduler/eventbrokers/asyncpg.py index d45aaf04..5687d2ba 100644 --- a/src/apscheduler/eventbrokers/asyncpg.py +++ b/src/apscheduler/eventbrokers/asyncpg.py @@ -1,10 +1,9 @@ from __future__ import annotations -from collections.abc import Awaitable, Mapping -from contextlib import AsyncExitStack -from functools import partial +from collections.abc import AsyncGenerator, Mapping +from contextlib import AsyncExitStack, asynccontextmanager from logging import Logger -from typing import TYPE_CHECKING, Any, Callable, cast +from typing import TYPE_CHECKING, Any, cast import asyncpg import attrs @@ -16,6 +15,7 @@ ) from anyio.streams.memory import MemoryObjectSendStream from asyncpg import Connection, InterfaceError +from attr.validators import instance_of from .._events import Event from .._exceptions import SerializationError @@ 
-33,33 +33,22 @@ class AsyncpgEventBroker(BaseExternalEventBroker): .. _asyncpg: https://pypi.org/project/asyncpg/ - :param connection_factory: a callable that creates an asyncpg connection + :param dsn: a libpq connection string (e.g. + ``postgres://user:pass@host:port/dbname``) + :param options: extra keyword arguments passed to :func:`asyncpg.connect` :param channel: the ``NOTIFY`` channel to use :param max_idle_time: maximum time to let the connection go idle, before sending a ``SELECT 1`` query to prevent a connection timeout """ - connection_factory: Callable[[], Awaitable[Connection]] + dsn: str + options: Mapping[str, Any] = attrs.field( + factory=dict, validator=instance_of(Mapping) + ) channel: str = attrs.field(kw_only=True, default="apscheduler") max_idle_time: float = attrs.field(kw_only=True, default=10) - _send: MemoryObjectSendStream[str] = attrs.field(init=False) - - @classmethod - def from_dsn( - cls, dsn: str, options: Mapping[str, Any] | None = None, **kwargs: Any - ) -> AsyncpgEventBroker: - """ - Create a new asyncpg event broker from an existing asyncpg connection pool. 
- :param dsn: data source name, passed as first positional argument to - :func:`asyncpg.connect` - :param options: keyword arguments passed to :func:`asyncpg.connect` - :param kwargs: keyword arguments to pass to the initializer of this class - :return: the newly created event broker - - """ - factory = partial(asyncpg.connect, dsn, **(options or {})) - return cls(factory, **kwargs) + _send: MemoryObjectSendStream[str] = attrs.field(init=False) @classmethod def from_async_sqla_engine( @@ -76,8 +65,7 @@ def from_async_sqla_engine( :param engine: an asynchronous SQLAlchemy engine using asyncpg as the driver :type engine: ~sqlalchemy.ext.asyncio.AsyncEngine - :param options: extra keyword arguments passed to :func:`asyncpg.connect` (will - override any automatically generated arguments based on the engine) + :param options: extra keyword arguments passed to :func:`asyncpg.connect` :param kwargs: keyword arguments to pass to the initializer of this class :return: the newly created event broker @@ -88,25 +76,24 @@ def from_async_sqla_engine( f"{engine.dialect.driver})" ) - connect_args = dict(engine.url.query) - for optname in ("host", "port", "database", "username", "password"): - value = getattr(engine.url, optname) - if value is not None: - if optname == "username": - optname = "user" - - connect_args[optname] = value - - if options: - connect_args |= options - - factory = partial(asyncpg.connect, **connect_args) - return cls(factory, **kwargs) + dsn = engine.url.render_as_string(hide_password=False).replace("+asyncpg", "") + return cls(dsn, options or {}, **kwargs) @property def _temporary_failure_exceptions(self) -> tuple[type[Exception], ...]: return OSError, InterfaceError + @asynccontextmanager + async def _connect(self) -> AsyncGenerator[asyncpg.Connection, None]: + async for attempt in self._retry(): + with attempt: + conn = await asyncpg.connect(self.dsn, **self.options) + try: + yield conn + finally: + with move_on_after(5, shield=True): + await 
conn.close(timeout=3) + async def start(self, exit_stack: AsyncExitStack, logger: Logger) -> None: await super().start(exit_stack, logger) self._send = cast( @@ -125,11 +112,6 @@ def listen_callback( if event is not None: self._task_group.start_soon(self.publish_local, event) - async def close_connection() -> None: - if not conn.is_closed(): - with move_on_after(3, shield=True): - await conn.close() - async def unsubscribe() -> None: if not conn.is_closed(): with move_on_after(3, shield=True): @@ -139,11 +121,7 @@ async def unsubscribe() -> None: send, receive = create_memory_object_stream[str](100) while True: async with AsyncExitStack() as exit_stack: - async for attempt in self._retry(): - with attempt: - conn = await self.connection_factory() - - exit_stack.push_async_callback(close_connection) + conn = await exit_stack.enter_async_context(self._connect()) self._logger.info("Connection established") try: await conn.add_listener(self.channel, listen_callback) diff --git a/src/apscheduler/eventbrokers/psycopg.py b/src/apscheduler/eventbrokers/psycopg.py index 9cfefc27..f81b7798 100644 --- a/src/apscheduler/eventbrokers/psycopg.py +++ b/src/apscheduler/eventbrokers/psycopg.py @@ -4,7 +4,6 @@ from contextlib import AsyncExitStack, asynccontextmanager from logging import Logger from typing import TYPE_CHECKING, Any, NoReturn -from urllib.parse import urlunparse import attrs from anyio import ( @@ -40,6 +39,8 @@ class PsycopgEventBroker(BaseExternalEventBroker): :param conninfo: a libpq connection string (e.g. 
``postgres://user:pass@host:port/dbname``) + :param options: extra keyword arguments passed to + :meth:`psycopg.AsyncConnection.connect` :param channel: the ``NOTIFY`` channel to use :param max_idle_time: maximum time (in seconds) to let the connection go idle, before sending a ``SELECT 1`` query to prevent a connection timeout @@ -71,10 +72,10 @@ def from_async_sqla_engine( The engine will only be used to create the appropriate options for :meth:`psycopg.AsyncConnection.connect`. - :param engine: an asynchronous SQLAlchemy engine using asyncpg as the driver + :param engine: an asynchronous SQLAlchemy engine using psycopg as the driver :type engine: ~sqlalchemy.ext.asyncio.AsyncEngine - :param options: extra keyword arguments passed to :func:`asyncpg.connect` (will - override any automatically generated arguments based on the engine) + :param options: extra keyword arguments passed to + :meth:`psycopg.AsyncConnection.connect` :param kwargs: keyword arguments to pass to the initializer of this class :return: the newly created event broker @@ -85,17 +86,10 @@ def from_async_sqla_engine( f"{engine.dialect.driver})" ) - conninfo = urlunparse( - [ - "postgres", - engine.url.username, - engine.url.password, - engine.url.host, - engine.url.database, - ] + conninfo = engine.url.render_as_string(hide_password=False).replace( + "+psycopg", "" ) - opts = dict(options, autocommit=True) - return cls(conninfo, opts, **kwargs) + return cls(conninfo, options or {}, **kwargs) @property def _temporary_failure_exceptions(self) -> tuple[type[Exception], ...]: @@ -109,7 +103,8 @@ async def _connect(self) -> AsyncGenerator[AsyncConnection, None]: try: yield conn finally: - await conn.close() + with move_on_after(5, shield=True): + await conn.close() async def start(self, exit_stack: AsyncExitStack, logger: Logger) -> None: await super().start(exit_stack, logger) diff --git a/tests/conftest.py b/tests/conftest.py index 5eb89de8..4745da6d 100644 --- a/tests/conftest.py +++ 
b/tests/conftest.py @@ -80,7 +80,7 @@ async def asyncpg_broker(serializer: Serializer) -> EventBroker: pytest.importorskip("asyncpg", reason="asyncpg is not installed") from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker - broker = AsyncpgEventBroker.from_dsn( + broker = AsyncpgEventBroker( "postgres://postgres:secret@localhost:5432/testdb", serializer=serializer ) return broker diff --git a/tests/test_eventbrokers.py b/tests/test_eventbrokers.py index 88150f4a..8ad31846 100644 --- a/tests/test_eventbrokers.py +++ b/tests/test_eventbrokers.py @@ -119,3 +119,51 @@ async def test_cancel_stop(raw_event_broker: EventBroker, logger: Logger) -> Non async with AsyncExitStack() as exit_stack: await raw_event_broker.start(exit_stack, logger) scope.cancel() + + +def test_asyncpg_broker_from_async_engine() -> None: + pytest.importorskip("asyncpg", reason="asyncpg is not installed") + from sqlalchemy import URL + from sqlalchemy.ext.asyncio import create_async_engine + + from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker + + url = URL( + "postgresql+asyncpg", + "myuser", + "c /%@", + "localhost", + 7654, + "dbname", + {"opt1": "foo", "opt2": "bar"}, + ) + engine = create_async_engine(url) + broker = AsyncpgEventBroker.from_async_sqla_engine(engine) + assert isinstance(broker, AsyncpgEventBroker) + assert broker.dsn == ( + "postgresql://myuser:c %2F%25%40@localhost:7654/dbname?opt1=foo&opt2=bar" + ) + + +def test_psycopg_broker_from_async_engine() -> None: + pytest.importorskip("psycopg", reason="psycopg is not installed") + from sqlalchemy import URL + from sqlalchemy.ext.asyncio import create_async_engine + + from apscheduler.eventbrokers.psycopg import PsycopgEventBroker + + url = URL( + "postgresql+psycopg", + "myuser", + "c /%@", + "localhost", + 7654, + "dbname", + {"opt1": "foo", "opt2": "bar"}, + ) + engine = create_async_engine(url) + broker = PsycopgEventBroker.from_async_sqla_engine(engine) + assert isinstance(broker, PsycopgEventBroker) + 
assert broker.conninfo == ( + "postgresql://myuser:c %2F%25%40@localhost:7654/dbname?opt1=foo&opt2=bar" + )