Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support pausing and unpausing schedules #902

Merged
merged 14 commits into from
May 9, 2024
Merged
63 changes: 62 additions & 1 deletion src/apscheduler/_schedulers/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from inspect import isbuiltin, isclass, ismethod, ismodule
from logging import Logger, getLogger
from types import TracebackType
from typing import Any, Callable, Iterable, Mapping, cast, overload
from typing import Any, Callable, Iterable, Literal, Mapping, cast, overload
from uuid import UUID, uuid4

import anyio
Expand Down Expand Up @@ -410,6 +410,7 @@ async def add_schedule(
id: str | None = None,
args: Iterable | None = None,
kwargs: Mapping[str, Any] | None = None,
paused: bool = False,
job_executor: str | UnsetValue = unset,
coalesce: CoalescePolicy = CoalescePolicy.latest,
misfire_grace_time: float | timedelta | None | UnsetValue = unset,
Expand All @@ -427,6 +428,7 @@ async def add_schedule(
based ID will be assigned)
:param args: positional arguments to be passed to the task function
:param kwargs: keyword arguments to be passed to the task function
:param paused: whether the schedule is paused
:param job_executor: name of the job executor to run the task with
:param coalesce: determines what to do when processing the schedule if multiple
fire times have become due for this schedule since the last processing
Expand Down Expand Up @@ -478,6 +480,7 @@ async def add_schedule(
trigger=trigger,
args=args,
kwargs=kwargs,
paused=paused,
coalesce=coalesce,
misfire_grace_time=task.misfire_grace_time
if misfire_grace_time is unset
Expand Down Expand Up @@ -529,6 +532,64 @@ async def remove_schedule(self, id: str) -> None:
self._check_initialized()
await self.data_store.remove_schedules({id})

async def pause_schedule(self, schedule_id: str) -> None:
"""Pause the specified schedule."""
self._check_initialized()
await self.data_store.add_schedule(
schedule=attrs.evolve(await self.get_schedule(schedule_id), paused=True),
conflict_policy=ConflictPolicy.replace,
)

def _get_unpaused_next_fire_time(
WillDaSilva marked this conversation as resolved.
Show resolved Hide resolved
self,
schedule: Schedule,
resume_from: datetime | Literal["now"] | None,
) -> datetime | None:
if resume_from is None:
return schedule.next_fire_time
if resume_from == "now":
resume_from = datetime.now(tz=timezone.utc)
if (
schedule.next_fire_time is not None
and schedule.next_fire_time >= resume_from
):
return schedule.next_fire_time
try:
while (next_fire_time := schedule.trigger.next()) < resume_from:
pass # Advance `next_fire_time` until its at or past `resume_from`
except TypeError: # The trigger is exhausted
return None
return next_fire_time
WillDaSilva marked this conversation as resolved.
Show resolved Hide resolved

async def unpause_schedule(
self,
schedule_id: str,
*,
resume_from: datetime | Literal["now"] | None = None,
) -> None:
"""
Unpause the specified schedule.


:param resume_from: the time to resume the schedules from, or ``'now'`` as a
shorthand for ``datetime.now(tz=UTC)`` or ``None`` to resume from where the
schedule left off which may cause it to misfire

"""
self._check_initialized()
schedule = await self.get_schedule(schedule_id)
await self.data_store.add_schedule(
schedule=attrs.evolve(
schedule,
paused=False,
next_fire_time=self._get_unpaused_next_fire_time(
schedule,
resume_from,
),
),
conflict_policy=ConflictPolicy.replace,
)

async def add_job(
self,
func_or_task_id: TaskType,
Expand Down
25 changes: 23 additions & 2 deletions src/apscheduler/_schedulers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
import threading
from collections.abc import MutableMapping, Sequence
from contextlib import ExitStack
from datetime import timedelta
from datetime import datetime, timedelta
from functools import partial
from logging import Logger
from types import TracebackType
from typing import Any, Callable, Iterable, Mapping, overload
from typing import Any, Callable, Iterable, Literal, Mapping, overload
from uuid import UUID

from anyio.from_thread import BlockingPortal, start_blocking_portal
Expand Down Expand Up @@ -238,6 +238,7 @@ def add_schedule(
id: str | None = None,
args: Iterable | None = None,
kwargs: Mapping[str, Any] | None = None,
paused: bool = False,
job_executor: str | UnsetValue = unset,
coalesce: CoalescePolicy = CoalescePolicy.latest,
misfire_grace_time: float | timedelta | None | UnsetValue = unset,
Expand All @@ -254,6 +255,7 @@ def add_schedule(
id=id,
args=args,
kwargs=kwargs,
paused=paused,
job_executor=job_executor,
coalesce=coalesce,
misfire_grace_time=misfire_grace_time,
Expand All @@ -275,6 +277,25 @@ def remove_schedule(self, id: str) -> None:
self._ensure_services_ready()
self._portal.call(self._async_scheduler.remove_schedule, id)

def pause_schedule(self, schedule_id: str) -> None:
self._ensure_services_ready()
self._portal.call(self._async_scheduler.pause_schedule, schedule_id)

def unpause_schedule(
self,
schedule_id: str,
*,
resume_from: datetime | Literal["now"] | None = None,
) -> None:
self._ensure_services_ready()
self._portal.call(
partial(
self._async_scheduler.unpause_schedule,
schedule_id,
resume_from=resume_from,
)
)

def add_job(
self,
func_or_task_id: TaskType,
Expand Down
2 changes: 2 additions & 0 deletions src/apscheduler/_structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class Schedule:
:var str task_id: unique identifier of the task to be run on this schedule
:var tuple args: positional arguments to pass to the task callable
:var dict[str, Any] kwargs: keyword arguments to pass to the task callable
:var bool paused: whether the schedule is paused
:var CoalescePolicy coalesce: determines what to do when processing the schedule if
multiple fire times have become due for this schedule since the last processing
:var ~datetime.timedelta | None misfire_grace_time: maximum number of seconds the
Expand Down Expand Up @@ -105,6 +106,7 @@ class Schedule:
kwargs: dict[str, Any] = attrs.field(
eq=False, order=False, converter=dict, default=()
)
paused: bool = attrs.field(eq=False, order=False, default=False)
coalesce: CoalescePolicy = attrs.field(
eq=False,
order=False,
Expand Down
6 changes: 5 additions & 1 deletion src/apscheduler/datastores/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,12 @@ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedul
schedules: list[Schedule] = []
for state in self._schedules:
if state.next_fire_time is None or state.next_fire_time > now:
# The schedule is either paused or not yet due
# The schedule is either exhausted or not yet due. There will be no
# schedules that are due after this one, so we can stop here.
break
elif state.schedule.paused:
# The schedule is paused
continue
elif state.acquired_by is not None:
if state.acquired_by != scheduler_id and now <= state.acquired_until:
# The schedule has been acquired by another scheduler and the
Expand Down
16 changes: 13 additions & 3 deletions src/apscheduler/datastores/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,19 @@ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedul
lambda: self._schedules.find(
{
"next_fire_time": {"$lte": now},
"$or": [
{"acquired_until": {"$exists": False}},
{"acquired_until": {"$lt": now}},
"$and": [
{
"$or": [
{"paused": {"$exists": False}},
{"paused": False},
]
},
{
"$or": [
{"acquired_until": {"$exists": False}},
{"acquired_until": {"$lt": now}},
]
},
],
WillDaSilva marked this conversation as resolved.
Show resolved Hide resolved
},
session=session,
Expand Down
9 changes: 8 additions & 1 deletion src/apscheduler/datastores/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from anyio import CancelScope, to_thread
from sqlalchemy import (
BigInteger,
Boolean,
Column,
DateTime,
Enum,
Expand All @@ -31,6 +32,7 @@
Uuid,
and_,
bindparam,
false,
or_,
select,
)
Expand Down Expand Up @@ -293,6 +295,7 @@ def get_table_definitions(self) -> MetaData:
Column("trigger", LargeBinary),
Column("args", LargeBinary),
Column("kwargs", LargeBinary),
Column("paused", Boolean, nullable=False, server_default=literal(False)),
Column("coalesce", Enum(CoalescePolicy), nullable=False),
Column("misfire_grace_time", interval_type),
Column("max_jitter", interval_type),
Expand Down Expand Up @@ -600,6 +603,7 @@ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedul
and_(
self._t_schedules.c.next_fire_time.isnot(None),
comparison,
self._t_schedules.c.paused == false(),
or_(
self._t_schedules.c.acquired_until.is_(None),
self._t_schedules.c.acquired_until < now,
Expand Down Expand Up @@ -752,7 +756,10 @@ async def get_next_schedule_run_time(self) -> datetime | None:

statenent = (
select(*columns)
.where(self._t_schedules.c.next_fire_time.isnot(None))
.where(
self._t_schedules.c.next_fire_time.isnot(None),
self._t_schedules.c.paused == false(),
)
.order_by(self._t_schedules.c.next_fire_time)
.limit(1)
)
Expand Down
26 changes: 24 additions & 2 deletions tests/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,10 @@ async def test_configure_task(self, raw_datastore: DataStore) -> None:
assert isinstance(event, TaskUpdated)
assert event.task_id == "mytask"

async def test_add_remove_schedule(
async def test_add_pause_unpause_remove_schedule(
self, raw_datastore: DataStore, timezone: ZoneInfo
) -> None:
send, receive = create_memory_object_stream[Event](3)
send, receive = create_memory_object_stream[Event](5)
async with AsyncScheduler(data_store=raw_datastore) as scheduler:
scheduler.subscribe(send.send)
now = datetime.now(timezone)
Expand All @@ -210,6 +210,16 @@ async def test_add_remove_schedule(
assert schedules[0].id == "foo"
assert schedules[0].task_id == f"{__name__}:dummy_async_job"

await scheduler.pause_schedule("foo")
schedule = await scheduler.get_schedule("foo")
assert schedule.paused
assert schedule.next_fire_time == now

await scheduler.unpause_schedule("foo")
schedule = await scheduler.get_schedule("foo")
assert not schedule.paused
assert schedule.next_fire_time == now

await scheduler.remove_schedule(schedule_id)
assert not await scheduler.get_schedules()

Expand All @@ -224,6 +234,18 @@ async def test_add_remove_schedule(
assert event.task_id == f"{__name__}:dummy_async_job"
assert event.next_fire_time == now

event = await receive.receive()
assert isinstance(event, ScheduleUpdated)
assert event.schedule_id == "foo"
assert event.task_id == f"{__name__}:dummy_async_job"
assert event.next_fire_time == now

event = await receive.receive()
assert isinstance(event, ScheduleUpdated)
assert event.schedule_id == "foo"
assert event.task_id == f"{__name__}:dummy_async_job"
assert event.next_fire_time == now

event = await receive.receive()
assert isinstance(event, ScheduleRemoved)
assert event.schedule_id == "foo"
Expand Down
Loading