From 5730f92c7af9f0e0b967aeabfbef6a55b557e1d4 Mon Sep 17 00:00:00 2001 From: MohammadAmin Vahedinia <16556650+mavahedinia@users.noreply.github.com> Date: Mon, 25 Mar 2024 10:30:45 -0400 Subject: [PATCH 1/4] Wakeup worker when there are resources freed up --- src/apscheduler/_schedulers/async_.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/apscheduler/_schedulers/async_.py b/src/apscheduler/_schedulers/async_.py index 21752167..83da8917 100644 --- a/src/apscheduler/_schedulers/async_.py +++ b/src/apscheduler/_schedulers/async_.py @@ -905,7 +905,7 @@ def _get_task_callable(self, task: Task) -> Callable: async def _process_jobs(self, *, task_status: TaskStatus) -> None: wakeup_event = anyio.Event() - async def job_added(event: Event) -> None: + async def check_queue_capacity(event: Event) -> None: if len(self._running_jobs) < self.max_concurrent_jobs: wakeup_event.set() @@ -917,7 +917,7 @@ async def job_added(event: Event) -> None: task_group = await exit_stack.enter_async_context(create_task_group()) # Fetch new jobs every time - exit_stack.enter_context(self.event_broker.subscribe(job_added, {JobAdded})) + exit_stack.enter_context(self.event_broker.subscribe(check_queue_capacity, {JobAdded, JobReleased})) # Signal that we are ready, and wait for the scheduler start event task_status.started() From c50af5ad1d53b540e30043732ea72d2ef7adeb02 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 25 Mar 2024 14:31:55 +0000 Subject: [PATCH 2/4] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/apscheduler/_schedulers/async_.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/apscheduler/_schedulers/async_.py b/src/apscheduler/_schedulers/async_.py index 83da8917..f81b869d 100644 --- a/src/apscheduler/_schedulers/async_.py +++ b/src/apscheduler/_schedulers/async_.py @@ -917,7 +917,11 @@ async def check_queue_capacity(event: Event) -> None: task_group = await exit_stack.enter_async_context(create_task_group()) # Fetch new jobs every time - exit_stack.enter_context(self.event_broker.subscribe(check_queue_capacity, {JobAdded, JobReleased})) + exit_stack.enter_context( + self.event_broker.subscribe( + check_queue_capacity, {JobAdded, JobReleased} + ) + ) # Signal that we are ready, and wait for the scheduler start event task_status.started() From fa152f4eff36071f679b94b125a08ce56ee74426 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Thu, 9 May 2024 18:43:46 +0300 Subject: [PATCH 3/4] Added test --- tests/test_schedulers.py | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index 38c3aaf9..e9c3dfce 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -15,7 +15,13 @@ import anyio import pytest -from anyio import WouldBlock, create_memory_object_stream, fail_after, sleep +from anyio import ( + Lock, + WouldBlock, + create_memory_object_stream, + fail_after, + sleep, +) from pytest import MonkeyPatch from pytest_mock import MockerFixture, MockFixture @@ -819,6 +825,28 @@ async def test_wait_until_stopped(self) -> None: # This should be a no-op await scheduler.wait_until_stopped() + async def test_max_concurrent_jobs(self) -> None: + lock = Lock() + scheduler = AsyncScheduler(max_concurrent_jobs=1) + tasks_done = 0 + + async def acquire_release() -> None: + nonlocal tasks_done + lock.acquire_nowait() + await sleep(0.1) + tasks_done += 1 + if tasks_done == 2: + await scheduler.stop() + + lock.release() + + with fail_after(3): + async with scheduler: + await scheduler.configure_task("dummyjob", func=acquire_release) + await scheduler.add_job("dummyjob") + await scheduler.add_job("dummyjob") + await scheduler.run_until_stopped() + class TestSyncScheduler: def test_configure(self) -> None: From ce74e1e5d06c3ff22fcb0091a2f63df96a9ce4e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Thu, 9 May 2024 18:48:48 +0300 Subject: [PATCH 4/4] Added changelog entry --- docs/versionhistory.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 9b13bb51..3b235aad 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -22,6 +22,9 @@ APScheduler, see the :doc:`migration section `. - Fixed dialect name checks in the SQLAlchemy job store - Fixed JSON and CBOR serializers unable to serialize enums - Fixed infinite loop in CalendarIntervalTrigger with UTC timezone (PR by unights) +- Fixed scheduler not resuming job processing when ``max_concurrent_jobs`` had been + reached and then a job was completed, thus making job processing possible again + (PR by MohammadAmin Vahedinia) **4.0.0a4**