Extending APScheduler

This document explains how to develop custom triggers, job executors and data stores.

Custom triggers

The built-in triggers cover the needs of most users, especially when combined using AndTrigger and OrTrigger. However, some users may need specialized scheduling logic, which can be implemented in a custom trigger class.

To implement your scheduling logic, create a new class that inherits from the Trigger interface class:

from __future__ import annotations

from datetime import datetime

from apscheduler.abc import Trigger

class MyCustomTrigger(Trigger):
    def next(self) -> datetime | None:
        ... # Your custom logic here

    def __getstate__(self):
        ... # Return the serializable state here

    def __setstate__(self, state):
        ... # Restore the state from the return value of __getstate__()

Requirements and constraints for trigger classes:

  • next() must always return either a timezone-aware datetime object, or None if a new run time cannot be calculated

  • next() must never return the same datetime twice and never one that is earlier than the previously returned one

  • __setstate__() must accept the return value of __getstate__() and restore the trigger to the functionally same state as the original

  • __getstate__() may only return an object containing types serializable by Serializer

Triggers are stateful objects. The next() method is where you determine the next run time based on the current state of the trigger. The trigger’s internal state needs to be updated before returning to ensure that the trigger won’t return the same datetime on the next call. The trigger code does not need to be thread-safe.
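
The requirements above can be illustrated with a minimal sketch. FixedIntervalTrigger and its parameters are hypothetical names chosen for this example, and the class deliberately does not inherit from Trigger so that it runs without APScheduler installed; a real trigger would subclass apscheduler.abc.Trigger. The state is reduced to strings and numbers on the assumption that any serializer can handle those.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


class FixedIntervalTrigger:
    """Hypothetical trigger; a real one would inherit from apscheduler.abc.Trigger."""

    def __init__(
        self, start: datetime, interval: timedelta, end: datetime | None = None
    ) -> None:
        if start.tzinfo is None:
            raise ValueError("start must be timezone-aware")
        if interval <= timedelta(0):
            raise ValueError("interval must be positive")
        self._interval = interval
        self._end = end
        self._next_time = start

    def next(self) -> datetime | None:
        if self._end is not None and self._next_time > self._end:
            return None  # no further run time can be calculated
        fire_time = self._next_time
        # Advance the state before returning so the same value is never repeated
        self._next_time = fire_time + self._interval
        return fire_time

    def __getstate__(self) -> dict:
        # Only plain types: ISO 8601 strings, a float and None
        return {
            "interval": self._interval.total_seconds(),
            "end": self._end.isoformat() if self._end else None,
            "next_time": self._next_time.isoformat(),
        }

    def __setstate__(self, state: dict) -> None:
        self._interval = timedelta(seconds=state["interval"])
        self._end = datetime.fromisoformat(state["end"]) if state["end"] else None
        self._next_time = datetime.fromisoformat(state["next_time"])


# Usage: each call yields a strictly later, timezone-aware run time
trigger = FixedIntervalTrigger(
    datetime(2024, 1, 1, tzinfo=timezone.utc), timedelta(minutes=5)
)
first, second = trigger.next(), trigger.next()
```

Because next() mutates the trigger, restoring a saved state with __setstate__() resumes the sequence from where the original left off rather than from the start.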

Custom job executors

If you need to use third-party frameworks or services to handle the actual execution of jobs, you will need a custom job executor.

A job executor needs to inherit from JobExecutor. This interface contains one abstract method you’re required to implement: run_job(). This method is called with two arguments:

  1. func: the callable you’re supposed to call

  2. job: the Job instance

The run_job() implementation needs to call func with the positional and keyword arguments attached to the job (job.args and job.kwargs, respectively). The return value of the callable must be returned from the method.

Here’s an example of a simple job executor that runs a (synchronous) callable in a thread:

from collections.abc import Callable
from functools import partial
from typing import Any

from anyio import to_thread
from apscheduler import Job
from apscheduler.abc import JobExecutor

class ThreadJobExecutor(JobExecutor):
    async def run_job(self, func: Callable[..., Any], job: Job) -> Any:
        wrapped = partial(func, *job.args, **job.kwargs)
        return await to_thread.run_sync(wrapped)

If you need to initialize some underlying services, you can override the start() method. For example, the executor above could be improved to take a maximum number of threads and create an AnyIO CapacityLimiter:

from collections.abc import Callable
from contextlib import AsyncExitStack
from functools import partial
from typing import Any

from anyio import CapacityLimiter, to_thread
from apscheduler import Job
from apscheduler.abc import JobExecutor

class ThreadJobExecutor(JobExecutor):
    _limiter: CapacityLimiter

    def __init__(self, max_threads: int):
        self.max_threads = max_threads

    async def start(self, exit_stack: AsyncExitStack) -> None:
        self._limiter = CapacityLimiter(self.max_threads)

    async def run_job(self, func: Callable[..., Any], job: Job) -> Any:
        wrapped = partial(func, *job.args, **job.kwargs)
        return await to_thread.run_sync(wrapped, limiter=self._limiter)

Custom data stores

If you want to make use of some external service to store the scheduler data, and it’s not covered by a built-in data store implementation, you may want to create a custom data store class.

A data store implementation needs to inherit from DataStore and implement its abstract methods.

The start() method is where your implementation can perform any initialization, including starting any background tasks. This method is called with two arguments:

  1. exit_stack: an AsyncExitStack object that can be used to work with context managers

  2. event_broker: the event broker that the store should be using to send events to other components of the system (including other schedulers)

A minimal skeleton of such a class:

from contextlib import AsyncExitStack

from apscheduler.abc import DataStore, EventBroker

class MyCustomDataStore(DataStore):
    _event_broker: EventBroker

    async def start(self, exit_stack: AsyncExitStack, event_broker: EventBroker) -> None:
        # Save the event broker in a member attribute and initialize the store
        self._event_broker = event_broker

    # See the interface class for the rest of the abstract methods
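
To show how the exit_stack argument can be used, here is a standalone sketch that needs only the standard library. SketchDataStore and open_connection() are hypothetical stand-ins (the former for a DataStore subclass, the latter for a client library's connection context manager), and the scheduler's role of creating and unwinding the exit stack is played by main().

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events: list[str] = []  # records the order of operations for illustration


@asynccontextmanager
async def open_connection(url: str):
    # Hypothetical stand-in for a real client library's connection manager
    events.append(f"connected to {url}")
    try:
        yield {"url": url}
    finally:
        events.append("disconnected")


class SketchDataStore:
    """Stand-in for a DataStore subclass; only start() is shown."""

    def __init__(self, url: str) -> None:
        self.url = url

    async def start(self, exit_stack: AsyncExitStack, event_broker: object) -> None:
        self._event_broker = event_broker
        # Entering the context manager through the exit stack means the
        # connection is closed when the caller unwinds the stack
        self._connection = await exit_stack.enter_async_context(
            open_connection(self.url)
        )


async def main() -> None:
    # Here main() plays the part of the component that owns the exit stack
    async with AsyncExitStack() as exit_stack:
        store = SketchDataStore("db://example")
        await store.start(exit_stack, event_broker=None)
        events.append("store in use")


asyncio.run(main())
```

Registering resources this way keeps the cleanup logic next to the setup logic, so the store does not need a separate shutdown method for them.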

Handling temporary failures

If you plan to publish your data store implementation, it is strongly recommended that you make it tolerate the loss of connectivity to the backing store. The built-in stores use the Tenacity library to retry operations after a disconnection. If you use it to retry operations when exceptions are raised, only do so for temporary errors such as connectivity loss, and not for permanent ones such as authentication failure or a missing database. See the built-in data store implementations and the Tenacity documentation for more information on how to pick the exceptions on which to retry the operations.
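
The distinction between temporary and permanent errors can be sketched as follows. This example does not use Tenacity; it is a hand-rolled retry loop that only illustrates the principle using the standard library. The exception classes and retry_temporary() are hypothetical names, and a real store would retry on the specific exceptions raised by its client library.

```python
import asyncio


class TemporaryStoreError(Exception):
    """Hypothetical: a transient problem such as a dropped connection."""


class PermanentStoreError(Exception):
    """Hypothetical: a problem retrying cannot fix, such as bad credentials."""


async def retry_temporary(operation, *, attempts: int = 3, delay: float = 0.01):
    """Await operation(), retrying only when TemporaryStoreError is raised."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except TemporaryStoreError:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the error
            await asyncio.sleep(delay)
        # Any other exception, including PermanentStoreError, is not caught
        # here and therefore propagates on the first failure


calls = 0


async def flaky_fetch() -> str:
    # Simulates a connection that recovers on the third attempt
    global calls
    calls += 1
    if calls < 3:
        raise TemporaryStoreError("connection lost")
    return "ok"


result = asyncio.run(retry_temporary(flaky_fetch))
```

Retrying only on an explicit allow-list of exception types means that a misconfiguration surfaces immediately instead of being hidden behind repeated attempts.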