Agent skill

apscheduler

Advanced Python Scheduler - Task scheduling and job queue system

Stars 232
Forks 15

Install this agent skill to your Project

npx add-skill https://github.com/aiskillstore/marketplace/tree/main/skills/bossjones/apscheduler

SKILL.md

APScheduler

APScheduler is a flexible task scheduling and job queue system for Python applications. It supports both synchronous and asynchronous execution with multiple scheduling mechanisms including cron-style, interval-based, and one-off scheduling.

Quick Start

Basic Synchronous Scheduler

python
from datetime import datetime
from apscheduler import Scheduler
from apscheduler.triggers.interval import IntervalTrigger

def tick():
    print(f"Tick: {datetime.now()}")

# Create and start scheduler with memory datastore
with Scheduler() as scheduler:
    scheduler.add_schedule(tick, IntervalTrigger(seconds=1))
    scheduler.run_until_stopped()

Async Scheduler with FastAPI

python
from contextlib import asynccontextmanager
from fastapi import FastAPI
from apscheduler import AsyncScheduler
from apscheduler.triggers.interval import IntervalTrigger

def cleanup_task():
    print("Running cleanup task...")

@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler = AsyncScheduler()
    async with scheduler:
        await scheduler.add_schedule(
            cleanup_task,
            IntervalTrigger(hours=1),
            id="cleanup"
        )
        await scheduler.start_in_background()
        yield

app = FastAPI(lifespan=lifespan)

Common Patterns

Schedulers

In-memory scheduler (development):

python
from apscheduler import AsyncScheduler

async def main():
    async with AsyncScheduler() as scheduler:
        # Jobs lost on restart
        await scheduler.add_schedule(my_task, trigger)
        await scheduler.run_until_stopped()

Persistent scheduler (production):

python
from sqlalchemy.ext.asyncio import create_async_engine
from apscheduler import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore

async def main():
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
    data_store = SQLAlchemyDataStore(engine)

    async with AsyncScheduler(data_store) as scheduler:
        # Jobs survive restarts
        await scheduler.add_schedule(my_task, trigger)
        await scheduler.run_until_stopped()

Distributed scheduler:

python
from apscheduler import AsyncScheduler, SchedulerRole
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker

# Scheduler node - creates jobs from schedules
async def scheduler_node():
    async with AsyncScheduler(
        data_store,
        event_broker,
        role=SchedulerRole.scheduler
    ) as scheduler:
        await scheduler.add_schedule(task, trigger)
        await scheduler.run_until_stopped()

# Worker node - executes jobs only
async def worker_node():
    async with AsyncScheduler(
        data_store,
        event_broker,
        role=SchedulerRole.worker
    ) as scheduler:
        await scheduler.run_until_stopped()

Jobs

Simple function jobs:

python
def send_daily_report():
    generate_report()
    email_report("admin@example.com")

scheduler.add_schedule(
    send_daily_report,
    CronTrigger(hour=9, minute=0)  # 9 AM daily
)

Jobs with arguments:

python
def process_data(source: str, destination: str, batch_size: int):
    # Data processing logic
    pass

scheduler.add_schedule(
    process_data,
    IntervalTrigger(hours=1),
    kwargs={
        'source': 's3://incoming',
        'destination': 's3://processed',
        'batch_size': 1000
    }
)

Async jobs:

python
async def fetch_external_api():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as resp:
            data = await resp.json()
            await save_to_database(data)

scheduler.add_schedule(
    fetch_external_api,
    IntervalTrigger(minutes=5)
)

Triggers

Interval trigger:

python
from apscheduler.triggers.interval import IntervalTrigger

# Every 30 seconds
IntervalTrigger(seconds=30)

# Every 2 hours and 15 minutes
IntervalTrigger(hours=2, minutes=15)

# Every 3 days
IntervalTrigger(days=3)

Cron trigger:

python
from apscheduler.triggers.cron import CronTrigger

# 9:00 AM Monday-Friday
CronTrigger(hour=9, minute=0, day_of_week='mon-fri')

# Every 15 minutes
CronTrigger(minute='*/15')

# Last day of month at midnight
CronTrigger(day='last', hour=0, minute=0)

# Using crontab syntax
CronTrigger.from_crontab('0 9 * * 1-5')  # 9 AM weekdays

Date trigger (one-time):

python
from datetime import datetime, timedelta
from apscheduler.triggers.date import DateTrigger

# 5 minutes from now
run_time = datetime.now() + timedelta(minutes=5)
DateTrigger(run_time=run_time)

# Specific datetime
DateTrigger(run_time=datetime(2024, 12, 31, 23, 59, 59))

Calendar interval:

python
from apscheduler.triggers.calendarinterval import CalendarIntervalTrigger

# First day of every month at 9 AM
CalendarIntervalTrigger(months=1, hour=9, minute=0)

# Every Monday at 10 AM
CalendarIntervalTrigger(weeks=1, day_of_week='mon', hour=10, minute=0)

Persistence

SQLite:

python
engine = create_async_engine("sqlite+aiosqlite:///scheduler.db")
data_store = SQLAlchemyDataStore(engine)

PostgreSQL:

python
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)

Redis (event broker):

python
from apscheduler.eventbrokers.redis import RedisEventBroker

event_broker = RedisEventBroker.from_url("redis://localhost:6379")

Job Management

Get job results:

python
async def main():
    async with AsyncScheduler() as scheduler:
        await scheduler.start_in_background()

        # Add job with result retention
        job_id = await scheduler.add_job(
            calculate_result,
            args=(10, 20),
            result_expiration_time=timedelta(hours=1)
        )

        # Wait for result
        result = await scheduler.get_job_result(job_id, wait=True)
        print(f"Result: {result.return_value}")

Schedule management:

python
# Pause schedule
await scheduler.pause_schedule("my_schedule")

# Resume schedule
await scheduler.unpause_schedule("my_schedule")

# Remove schedule
await scheduler.remove_schedule("my_schedule")

# Get schedule info
schedule = await scheduler.get_schedule("my_schedule")
print(f"Next run: {schedule.next_fire_time}")

Event handling:

python
from apscheduler import JobAdded, JobReleased

def on_job_completed(event: JobReleased):
    if event.outcome == Outcome.success:
        print(f"Job {event.job_id} completed successfully")
    else:
        print(f"Job {event.job_id} failed: {event.exception}")

scheduler.subscribe(on_job_completed, JobReleased)

Configuration

Task defaults:

python
from apscheduler import TaskDefaults

task_defaults = TaskDefaults(
    job_executor='threadpool',
    max_running_jobs=3,
    misfire_grace_time=timedelta(minutes=5)
)

scheduler = AsyncScheduler(task_defaults=task_defaults)

Job execution options:

python
# Configure task behavior
await scheduler.configure_task(
    my_function,
    job_executor='processpool',
    max_running_jobs=5,
    misfire_grace_time=timedelta(minutes=10)
)

# Override per schedule
await scheduler.add_schedule(
    my_function,
    trigger,
    job_executor='threadpool',  # Override default
    coalesce=CoalescePolicy.latest
)

Requirements

bash
# Core package
uv add apscheduler

# Database backends
uv add "apscheduler[postgresql]"  # PostgreSQL
uv add "apscheduler[mongodb]"     # MongoDB
uv add "apscheduler[sqlite]"      # SQLite

# Event brokers
uv add "apscheduler[redis]"       # Redis
uv add "apscheduler[mqtt]"        # MQTT

Dependencies by use case:

  • Basic scheduling: apscheduler
  • PostgreSQL persistence: asyncpg, sqlalchemy
  • Redis distributed: redis
  • MongoDB: motor
  • SQLite: aiosqlite

Best Practices

  1. Use persistent storage for production to survive restarts
  2. Set appropriate misfire_grace_time to handle system delays
  3. Use conflict policies when updating schedules
  4. Subscribe to events for monitoring and debugging
  5. Choose appropriate executors (threadpool for I/O, processpool for CPU)
  6. Implement error handling in job functions
  7. Use unique schedule IDs for management operations
  8. Set result expiration to prevent memory leaks

Didn't find tool you were looking for?

Be as detailed as possible for better results