Build a Distributed Cron Job Scheduler



System Design Deep Dive

Distributed Cron Job Scheduler

Guaranteeing exactly-once execution across a cluster when clocks drift, nodes crash, and maintenance windows leave a backlog of missed jobs


Unix cron has worked since 1975. For most of its life that was fine: cron jobs sent email digests, rotated logs, and ran batch imports on a single server. Then we moved to container orchestration, auto-scaling groups, and multi-region deployments. Now the same crontab is deployed to twelve nodes, and a job scheduled to run every minute fires twelve times a minute. The “solved” problem is suddenly very broken.

Think of a distributed cron scheduler like a postal sorting office with multiple shifts. Each shift is responsible for packages arriving during its window. If shifts overlap, two sorters grab the same package and deliver duplicates. If no one shows up for a shift because of a maintenance window, packages pile up and downstream customers wait. The engineering challenge isn’t delivering one package - it’s coordinating hand-offs across any number of shifts without gaps or double deliveries.

The naive multi-node approach fails because cron is stateless by design. Standard cron reads a schedule file and fires jobs independently on each host. Adding a second node doesn’t provide redundancy - it provides duplication. You need external state to coordinate which node fires a job at any given tick. That coordination introduces the classic distributed systems triad: leader election to designate one node as the active scheduler, distributed locking to prevent races during failover, and exactly-once semantics to ensure a job fires exactly once per schedule tick regardless of who’s running.

Scale adds another dimension. A single scheduler node can poll a database of 100,000 jobs every second without breaking a sweat. At 10 million jobs, a full table scan every second destroys your database. You need job partitioning - dividing the job space across multiple scheduler nodes so each node only watches its slice. And when any node goes down, the backlog of missed jobs needs catchup execution that replays them in order without triggering a thundering herd. We need to solve for all of this simultaneously.

Requirements and Constraints

Functional Requirements

  • Register jobs with a cron expression, target URL or queue topic, and metadata payload
  • Fire each job within 500ms of its scheduled time under normal load
  • Guarantee exactly-once execution per schedule tick - no duplicates, no skips under normal operation
  • Detect and replay jobs missed during scheduler downtime (configurable misfire threshold)
  • Support job cancellation, pause, and resume without dropping in-flight executions
  • Expose per-job execution history with status, start time, duration, and result

Non-Functional Requirements

  • Scale to 10 million registered jobs across the scheduler cluster
  • Sustain a peak of 10,000 job dispatches per minute (about 167 per second)
  • 99.9% scheduler uptime with automatic leader failover in under 30 seconds
  • Sub-second scheduling jitter under 10,000 jobs/minute load
  • Execution history retained for 90 days
  • Tolerate clock drift up to 200ms between nodes

Constraints and Assumptions

  • Workers that run job logic are separate services - the scheduler handles dispatch only
  • System clocks are NTP-synchronized within 100ms; DB clock is the authoritative time source
  • Jobs missed beyond the misfire threshold (default: 1 hour) are advanced to the next future tick, not replayed
  • Maximum registered jobs per tenant: 100,000

High-Level Architecture

Five components make up the system: an API layer for job registration and management, a Job Store (PostgreSQL) as the single source of truth for all job state, a Scheduler Cluster of nodes that compete for leadership and dispatch due jobs, a Lock Store (etcd or Redis) for distributed coordination, and a Worker Pool that executes the actual job logic.

Distributed cron scheduler architecture overview showing scheduler cluster, job store, task queue, and worker pool

Data flows like this: a client registers a job through the API, which writes a row to the Job Store with status=SCHEDULED and a computed next_run_at timestamp. The active scheduler leader polls the Job Store continuously for jobs where next_run_at <= NOW(), acquires a distributed lock per job to prevent concurrent dispatch, transitions the job to RUNNING, and pushes the job payload onto a task queue. Workers consume the queue, execute the job handler, then report back to the Job Store with COMPLETED or FAILED. For recurring jobs, the leader also computes the following next_run_at from the cron expression and resets status=SCHEDULED immediately after dispatch - not after execution completes.

Follower nodes in the scheduler cluster do almost nothing during normal operation. They hold a standby lease and watch for the leader’s lease to expire. When the leader crashes, whichever follower acquires the etcd or DB-backed lease first becomes the new leader and resumes dispatching from exactly where the old leader stopped, using the Job Store as the authoritative handoff point.

Key Insight

The Job Store is not just a persistence layer - it’s the coordination medium. Every scheduler node is stateless; the database holds all state for what has and hasn’t been dispatched. Leader transitions have zero data loss: the new leader simply continues polling the same table from the same point, with no handshake required.

The Job Store

The Job Store is the heart of the scheduler. Every scheduling decision flows through it: when a job is due, whether it’s already running, when it last executed, and what comes next. It must serve high-throughput poll queries with sub-10ms latency while handling concurrent writers that may race to claim the same job.

A single PostgreSQL table with the right indexes handles this at significant scale. The key design choice is using the database’s clock - NOW() in SQL - rather than the application server’s clock for all scheduling decisions. This eliminates clock skew as a correctness concern: even if two scheduler nodes have drifted clocks, both use the DB’s authoritative time when querying for due jobs.

-- Core job registry and execution state
CREATE TABLE scheduled_jobs (
  id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  tenant_id       UUID NOT NULL,
  name            VARCHAR(255) NOT NULL,
  cron_expression VARCHAR(128) NOT NULL,
  handler_url     TEXT NOT NULL,
  payload         JSONB DEFAULT '{}',
  status          VARCHAR(20) NOT NULL DEFAULT 'SCHEDULED',
  partition_key   SMALLINT NOT NULL,
  next_run_at     TIMESTAMPTZ NOT NULL,
  last_run_at     TIMESTAMPTZ,
  last_run_status VARCHAR(20),
  misfire_grace   INTERVAL NOT NULL DEFAULT '1 hour',
  max_retries     SMALLINT NOT NULL DEFAULT 3,
  retry_count     SMALLINT NOT NULL DEFAULT 0,
  lock_token      UUID,
  lock_expires_at TIMESTAMPTZ,
  concurrent_policy VARCHAR(20) NOT NULL DEFAULT 'SKIP',
  created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  CONSTRAINT status_check CHECK (
    status IN ('SCHEDULED','RUNNING','FAILED','PAUSED','CANCELLED')
  ),
  CONSTRAINT concurrent_policy_check CHECK (
    concurrent_policy IN ('SKIP','ALLOW')
  )
);

-- Primary dispatch index: only due SCHEDULED jobs, per partition
CREATE INDEX idx_jobs_dispatch ON scheduled_jobs (partition_key, next_run_at, status)
  WHERE status = 'SCHEDULED';

-- Stuck job recovery: find RUNNING jobs whose lock TTL has expired
CREATE INDEX idx_jobs_stuck ON scheduled_jobs (lock_expires_at)
  WHERE status = 'RUNNING';

-- Per-tenant listing
CREATE INDEX idx_jobs_tenant ON scheduled_jobs (tenant_id, created_at DESC);

-- Scheduler lease table for leader election
CREATE TABLE scheduler_leases (
  lease_name  VARCHAR(64) PRIMARY KEY,
  holder_id   VARCHAR(128) NOT NULL,
  expires_at  TIMESTAMPTZ NOT NULL,
  acquired_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

The partition_key column lets each scheduler node query only its slice of jobs rather than scanning the full table. The lock_token and lock_expires_at columns implement optimistic locking: before dispatching, the scheduler writes a UUID token with a TTL. Only the scheduler that writes the token first proceeds - any racing scheduler sees rowcount=0 from the conditional UPDATE and backs off.
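To make the compare-and-set concrete, here is a minimal sketch that uses Python's built-in sqlite3 module as a stand-in for PostgreSQL (an assumption for illustration, with a cut-down table). The hypothetical claim_job helper issues the same kind of conditional UPDATE. The two calls below run one after the other; in PostgreSQL a truly concurrent second UPDATE waits for the first to commit, re-checks its WHERE clause against the new row version, and likewise matches zero rows.

```python
import sqlite3
import uuid

# Stand-in Job Store: in-memory sqlite3 instead of PostgreSQL (illustration only)
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE scheduled_jobs (
        id         TEXT PRIMARY KEY,
        status     TEXT NOT NULL,
        lock_token TEXT
    )
""")
conn.execute("INSERT INTO scheduled_jobs VALUES ('job-1', 'SCHEDULED', NULL)")
conn.commit()

def claim_job(conn, job_id):
    """Hypothetical helper: conditional UPDATE that only succeeds while status is SCHEDULED."""
    token = str(uuid.uuid4())
    cur = conn.execute(
        "UPDATE scheduled_jobs SET status = 'RUNNING', lock_token = ? "
        "WHERE id = ? AND status = 'SCHEDULED'",
        (token, job_id),
    )
    conn.commit()
    # rowcount 1: this caller won the race; rowcount 0: another scheduler already claimed it
    return token if cur.rowcount == 1 else None

first = claim_job(conn, "job-1")   # scheduler A
second = claim_job(conn, "job-1")  # scheduler B, racing for the same row
print(first is not None, second is None)  # True True
```

The caller only needs the rowcount; no separate SELECT ... FOR UPDATE is required because the check and the write happen in a single statement.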

Watch Out

Polling interval is a hidden performance tradeoff. Polling every 100ms gives excellent jitter but costs 600 queries/minute per scheduler node; polling every second cuts that to 60 queries/minute but allows up to 1s of scheduling delay, which breaks the 500ms firing target. At 4 nodes each polling their partition every 500ms, you generate 480 queries/minute against the Job Store - manageable, but every query must hit the dispatch index efficiently. Use EXPLAIN ANALYZE to confirm the index is used; a query that falls back to a sequential scan will saturate your primary database at scale.
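The arithmetic in this note is easy to parameterize. A small sketch, assuming one dispatch query per node per poll; the hypothetical polling_cost helper ignores the per-job lock and reschedule writes that follow each poll.

```python
def polling_cost(nodes: int, poll_interval_s: float) -> dict:
    """Back-of-envelope polling load, assuming one dispatch query per node per poll."""
    per_node_per_min = 60 / poll_interval_s
    return {
        "queries_per_min_per_node": per_node_per_min,
        "queries_per_min_total": per_node_per_min * nodes,
        # A job that becomes due just after a poll waits up to one full interval
        "max_poll_delay_s": poll_interval_s,
    }

for interval in (0.1, 0.5, 1.0):
    print(interval, polling_cost(4, interval))
```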

Leader Election and Distributed Locking

Leader election and distributed locking solve different problems that are easy to conflate. Leader election determines which scheduler node is currently active - only the leader polls and dispatches. Distributed locking prevents double-execution for individual jobs, even during the brief failover window when a new leader starts while the old one hasn’t fully stopped yet.

Leader election uses a lease written to the scheduler_leases table. Each candidate attempts an INSERT ... ON CONFLICT DO UPDATE with its own holder_id and a new expires_at, which succeeds only if the existing row has expired or is already owned by that candidate. The winner renews every 10 seconds; followers retry every 5 seconds. If the leader crashes, its lease expires at most 30 seconds after its last successful renewal, and a follower takes over on its next retry.

import uuid
import psycopg2

LEASE_DURATION_SECS = 30
RENEW_INTERVAL_SECS = 10
NODE_ID = str(uuid.uuid4())

def try_acquire_lease(conn) -> bool:
    """Attempt to become leader by acquiring or renewing the scheduler lease."""
    with conn.cursor() as cur:
        # Win if no lease exists, the existing lease expired, or we already hold it.
        # expires_at comes from the database clock (NOW()), never the app server's clock.
        cur.execute("""
            INSERT INTO scheduler_leases (lease_name, holder_id, expires_at)
            VALUES ('primary', %s, NOW() + %s * INTERVAL '1 second')
            ON CONFLICT (lease_name) DO UPDATE
              SET holder_id = EXCLUDED.holder_id,
                  expires_at = EXCLUDED.expires_at
            WHERE scheduler_leases.expires_at < NOW()
               OR scheduler_leases.holder_id = %s
            RETURNING holder_id
        """, (NODE_ID, LEASE_DURATION_SECS, NODE_ID))
        row = cur.fetchone()
        conn.commit()
    return row is not None and row[0] == NODE_ID

def renew_lease(conn) -> bool:
    """Extend the leader lease. Returns False if another node now holds it."""
    with conn.cursor() as cur:
        cur.execute("""
            UPDATE scheduler_leases
            SET expires_at = NOW() + %s * INTERVAL '1 second'
            WHERE lease_name = 'primary' AND holder_id = %s
        """, (LEASE_DURATION_SECS, NODE_ID))
        conn.commit()
        return cur.rowcount == 1
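The acquisition rule is easiest to check with time made explicit. The sketch below is an in-memory model, not a database client: the hypothetical LeaseTable class mirrors the ON CONFLICT ... WHERE condition above, and db_now stands in for the database's NOW().

```python
from dataclasses import dataclass
from typing import Optional

LEASE_DURATION_SECS = 30

@dataclass
class LeaseRow:
    holder_id: str
    expires_at: float  # seconds on the authoritative (database) clock

class LeaseTable:
    """Hypothetical in-memory model of the single 'primary' row in scheduler_leases."""

    def __init__(self) -> None:
        self.row: Optional[LeaseRow] = None

    def try_acquire(self, node_id: str, db_now: float) -> bool:
        # Same rule as the SQL: no lease yet, lease expired, or we already hold it
        if self.row is None or self.row.expires_at < db_now or self.row.holder_id == node_id:
            self.row = LeaseRow(node_id, db_now + LEASE_DURATION_SECS)
            return True
        return False

lease = LeaseTable()
print(lease.try_acquire("node-a", db_now=0))   # True: first claimant, expires at t=30
print(lease.try_acquire("node-b", db_now=5))   # False: node-a's lease is still valid
print(lease.try_acquire("node-a", db_now=10))  # True: renewal pushes expiry to t=40
print(lease.try_acquire("node-b", db_now=41))  # True: node-a stopped renewing, lease expired
```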

Distributed locking for individual jobs uses a fencing token written into the scheduled_jobs row. Before dispatching a job, the scheduler updates lock_token = new_uuid and lock_expires_at = NOW() + 60s where status = 'SCHEDULED'. If two schedulers race to dispatch the same job (possible during a failover overlap), only one succeeds. The lock token is passed to the worker so it can validate ownership before writing results.

import uuid

def try_lock_job(conn, job_id: str) -> str | None:
    """Acquire a dispatch lock on a specific job. Returns token or None if already locked."""
    token = str(uuid.uuid4())
    with conn.cursor() as cur:
        # Conditional UPDATE: succeeds only if the job is still SCHEDULED and due.
        # The 60s lock TTL is computed on the database clock, not the app server's.
        cur.execute("""
            UPDATE scheduled_jobs
            SET status = 'RUNNING',
                lock_token = %s,
                lock_expires_at = NOW() + INTERVAL '60 seconds',
                last_run_at = NOW()
            WHERE id = %s
              AND status = 'SCHEDULED'
              AND next_run_at <= NOW()
        """, (token, job_id))
        conn.commit()
        return token if cur.rowcount == 1 else None
Real World

Kubernetes uses a nearly identical lease-based leader election pattern for its controller-manager and scheduler. The lease is stored as a Lease resource in the coordination.k8s.io API group; the holder periodically updates renewTime, and the lease is treated as expired if it is not renewed within leaseDurationSeconds. When the holder crashes, followers race to update holderIdentity - the first successful writer becomes the new leader. The closest equivalent of a fencing token is the resourceVersion field: Lease updates are optimistic-concurrency writes, so a stale writer holding an old resourceVersion cannot overwrite a newer lease state.

The Dispatch Engine

The dispatch engine is the leader’s main loop. It runs continuously while the lease is held, polling the Job Store for due jobs in its assigned partition, acquiring per-job locks, and pushing payloads onto the task queue. The loop must maintain sub-second scheduling jitter while staying gentle enough not to hammer the database.

import asyncio
import os
from typing import List

POLL_INTERVAL_SECS = 0.5
BATCH_SIZE = 100
MY_PARTITION = int(os.environ["PARTITION_ID"])

async def dispatch_loop(conn, task_queue_client):
    """Core scheduler loop: poll due jobs, lock, dispatch, schedule next tick."""
    while True:
        if not renew_lease(conn):
            raise RuntimeError("Lost scheduler lease - stepping down")

        due_jobs = fetch_due_jobs(conn, MY_PARTITION, BATCH_SIZE)
        for job in due_jobs:
            token = try_lock_job(conn, job["id"])
            if token is None:
                continue  # Race lost - another scheduler got there first
            await task_queue_client.push({"job": job, "lock_token": token})
            schedule_next_run(conn, job)  # Set next_run_at immediately after dispatch

        await asyncio.sleep(POLL_INTERVAL_SECS)

def fetch_due_jobs(conn, partition: int, limit: int) -> List[dict]:
    """Fetch jobs due now in this scheduler's partition."""
    with conn.cursor() as cur:
        cur.execute("""
            SELECT id, handler_url, payload, cron_expression,
                   max_retries, retry_count, concurrent_policy
            FROM scheduled_jobs
            WHERE partition_key = %s
              AND status = 'SCHEDULED'
              AND next_run_at <= NOW()
            ORDER BY next_run_at ASC
            LIMIT %s
        """, (partition, limit))
        cols = [d[0] for d in cur.description]
        return [dict(zip(cols, row)) for row in cur.fetchall()]

def schedule_next_run(conn, job: dict):
    """Compute the next fire time and reset the job row for the next tick."""
    from croniter import croniter
    from datetime import datetime, timezone
    cron = croniter(job["cron_expression"], datetime.now(timezone.utc))
    next_run = cron.get_next(datetime)
    with conn.cursor() as cur:
        # lock_token is deliberately kept: it is the fencing token the in-flight worker
        # presents to complete_job. The next dispatch overwrites it, making that worker stale.
        cur.execute("""
            UPDATE scheduled_jobs
            SET next_run_at = %s,
                status = 'SCHEDULED',
                lock_expires_at = NULL
            WHERE id = %s AND status = 'RUNNING'
        """, (next_run, job["id"]))
        conn.commit()
Key Insight

The schedule_next_run call happens immediately after dispatch, not after the worker completes. The next tick is therefore already recorded in the Job Store before the current execution finishes, so a slow job cannot push back every subsequent firing.
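A toy simulation shows the difference. It assumes a fixed 60-second interval in place of a real cron expression and a job that always runs for 90 seconds; fire_times is a hypothetical helper, not part of the design above.

```python
def fire_times(n_runs: int, interval_s: int, runtime_s: int,
               reschedule_on_completion: bool) -> list[int]:
    """Simulate start times for a fixed-interval job (a stand-in for a cron expression).

    reschedule_on_completion=False computes the next tick at dispatch time, as
    schedule_next_run does; True waits until the worker has finished.
    """
    starts = []
    next_run = 0
    for _ in range(n_runs):
        start = next_run
        starts.append(start)
        base = start + runtime_s if reschedule_on_completion else start
        # Next tick: first multiple of interval_s strictly after the base time
        next_run = (base // interval_s + 1) * interval_s
    return starts

print(fire_times(4, 60, 90, reschedule_on_completion=False))  # [0, 60, 120, 180]
print(fire_times(4, 60, 90, reschedule_on_completion=True))   # [0, 120, 240, 360]
```

Rescheduling at dispatch keeps every tick; rescheduling on completion loses every other one. The price of the first approach is that executions overlap whenever the runtime exceeds the interval.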

Job Partitioning

At 10 million scheduled jobs, a single scheduler can’t watch them all. One node would have to lock, push, and reschedule every due job in the cluster, and any poll query that falls back to a full table scan would read millions of rows every 500ms. Job partitioning divides the job space into N buckets, each owned by a dedicated scheduler node.

Job partitioning across 4 scheduler nodes using 256 virtual partitions

The partition key is computed at job registration time: partition_key = hash(job_id) % NUM_VIRTUAL_PARTITIONS. Using 256 virtual partitions mapped to physical nodes via a coordinator assignment table means adding a new scheduler node only requires redistributing partition ownership - no job rows need to be updated. This is the same over-partitioning strategy Kafka uses with partition-to-broker assignment.

import hashlib

NUM_VIRTUAL_PARTITIONS = 256

def compute_partition_key(job_id: str) -> int:
    """Deterministic partition assignment using a stable hash."""
    digest = hashlib.sha256(job_id.encode()).digest()
    # Use first 4 bytes as uint32, mod by virtual partition count
    bucket = int.from_bytes(digest[:4], "big") % NUM_VIRTUAL_PARTITIONS
    return bucket

def get_physical_node(virtual_partition: int, assignment_table: dict) -> str:
    """Look up which physical scheduler node owns this virtual partition."""
    return assignment_table[virtual_partition]

Each scheduler node queries its assigned virtual partitions in a single query using WHERE partition_key IN (0, 1, 2, ..., 63). When a node goes down, the coordinator reassigns its virtual partitions to surviving nodes without any job data migration.
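A sketch of the node side, assuming a simple coordinator policy that gives each node a contiguous block of virtual partitions. build_assignment, owned_partitions, and dispatch_query are hypothetical helpers, and the SQL uses psycopg2-style %s placeholders.

```python
NUM_VIRTUAL_PARTITIONS = 256

def build_assignment(nodes: list[str]) -> dict[int, str]:
    """Hypothetical coordinator policy: contiguous blocks of virtual partitions per node."""
    return {vp: nodes[vp * len(nodes) // NUM_VIRTUAL_PARTITIONS]
            for vp in range(NUM_VIRTUAL_PARTITIONS)}

def owned_partitions(node: str, assignment: dict[int, str]) -> list[int]:
    return sorted(vp for vp, owner in assignment.items() if owner == node)

def dispatch_query(partitions: list[int], limit: int = 100) -> tuple[str, list]:
    """Build the per-node poll query with one %s placeholder per owned partition."""
    placeholders = ", ".join(["%s"] * len(partitions))
    sql = (
        "SELECT id FROM scheduled_jobs "
        f"WHERE partition_key IN ({placeholders}) "
        "AND status = 'SCHEDULED' AND next_run_at <= NOW() "
        "ORDER BY next_run_at ASC LIMIT %s"
    )
    return sql, [*partitions, limit]

assignment = build_assignment(["sched-1", "sched-2", "sched-3", "sched-4"])
mine = owned_partitions("sched-1", assignment)
print(len(mine), mine[:3])  # 64 [0, 1, 2]
sql, params = dispatch_query(mine)
```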

Watch Out

Static partition assignment by node count (hash % node_count) creates a rebalancing problem: adding a 5th scheduler to a 4-node cluster moves roughly 80% of jobs to a different node, and every moved job’s row must be rewritten. Over-partition upfront using 256 virtual partitions mapped to physical nodes - adding nodes only requires reassigning virtual partition ownership in the coordinator, touching zero job rows.
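The difference can be measured. This sketch compares the naive hash % node_count scheme, where every job whose node changes needs its row rewritten, against a hypothetical minimal-move rebalance of the 256-entry virtual partition map. The hash mirrors compute_partition_key; the rebalance policy is an assumption, one of many a coordinator could use.

```python
import hashlib

NUM_VIRTUAL_PARTITIONS = 256

def stable_hash(job_id: str) -> int:
    # Same idea as compute_partition_key: first 4 bytes of SHA-256 as an unsigned int
    return int.from_bytes(hashlib.sha256(job_id.encode()).digest()[:4], "big")

def moved_fraction_modulo(job_ids: list[str], old_n: int, new_n: int) -> float:
    """Naive scheme: node = hash % node_count. Each job whose node changes needs a row update."""
    moved = sum(1 for j in job_ids if stable_hash(j) % old_n != stable_hash(j) % new_n)
    return moved / len(job_ids)

def rebalance(assignment: dict[int, str], new_node: str) -> dict[int, str]:
    """Hypothetical minimal-move policy: the new node takes partitions from owners above
    the even share until it reaches that share. Only this map changes, never job rows."""
    result = dict(assignment)
    target = NUM_VIRTUAL_PARTITIONS // (len(set(assignment.values())) + 1)
    counts: dict[str, int] = {}
    for owner in result.values():
        counts[owner] = counts.get(owner, 0) + 1
    for vp in sorted(result):
        if counts.get(new_node, 0) >= target:
            break
        owner = result[vp]
        if counts[owner] > target:
            result[vp] = new_node
            counts[owner] -= 1
            counts[new_node] = counts.get(new_node, 0) + 1
    return result

jobs = [f"job-{i}" for i in range(10_000)]
print(round(moved_fraction_modulo(jobs, 4, 5), 2))
old = {vp: f"sched-{vp * 4 // 256 + 1}" for vp in range(256)}
new = rebalance(old, "sched-5")
print(sum(old[vp] != new[vp] for vp in old))  # 51
```

With the naive scheme a job stays put only when hash % 4 equals hash % 5, which holds for 4 of every 20 residues, so about 80% of jobs move. With virtual partitions, 51 map entries change owner and no job row is touched.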

Data Model

Every job moves through a deterministic state machine. Implementing exactly-once semantics correctly requires understanding exactly which transitions are valid and enforcing them with conditional SQL updates.

SCHEDULED -> RUNNING      (scheduler acquires lock via conditional UPDATE)
RUNNING -> SCHEDULED      (schedule_next_run resets row after dispatch, or stuck job recovery)
RUNNING -> FAILED         (worker reports failure; retry_count < max_retries)
FAILED -> SCHEDULED       (retry with exponential backoff)
FAILED -> CANCELLED       (max_retries exceeded)
SCHEDULED -> PAUSED       (operator action)
PAUSED -> SCHEDULED       (operator resume)
Job state machine showing all valid state transitions and stuck job recovery
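The transitions listed above can be encoded as a lookup table so that code paths fail fast before issuing an update the state machine does not allow. A minimal sketch; VALID_TRANSITIONS, check_transition, and transition_sql are hypothetical names, and the table encodes only the transitions listed above.

```python
# Allowed transitions, copied from the list above
VALID_TRANSITIONS: dict[str, set[str]] = {
    "SCHEDULED": {"RUNNING", "PAUSED"},
    "RUNNING": {"SCHEDULED", "FAILED"},
    "FAILED": {"SCHEDULED", "CANCELLED"},
    "PAUSED": {"SCHEDULED"},
    "CANCELLED": set(),  # no outgoing transitions in the list above
}

def check_transition(current: str, target: str) -> None:
    """Raise before issuing an UPDATE the state machine does not allow."""
    if target not in VALID_TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition {current} -> {target}")

def transition_sql(job_id: str, current: str, target: str) -> tuple[str, tuple[str, str, str]]:
    """Build the conditional UPDATE for a validated transition (psycopg2-style placeholders)."""
    check_transition(current, target)
    sql = "UPDATE scheduled_jobs SET status = %s WHERE id = %s AND status = %s"
    return sql, (target, job_id, current)

print(transition_sql("job-42", "SCHEDULED", "PAUSED")[1])  # ('PAUSED', 'job-42', 'SCHEDULED')
```

The Python check only catches programming errors. Under concurrency, the AND status = %s clause in the UPDATE is what actually enforces the transition.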

The execution history table is separate from the live jobs table to keep the hot table small and its indexes cache-efficient:

-- Append-only execution log: one row per job fire attempt
CREATE TABLE job_executions (
  id            UUID NOT NULL DEFAULT gen_random_uuid(),
  job_id        UUID NOT NULL REFERENCES scheduled_jobs(id),
  lock_token    UUID NOT NULL,
  started_at    TIMESTAMPTZ NOT NULL,
  completed_at  TIMESTAMPTZ,
  status        VARCHAR(20) NOT NULL,
  result_code   SMALLINT,
  error_message TEXT,
  worker_id     VARCHAR(128),
  duration_ms   INTEGER,
  -- A partitioned table's primary key must include the partition column
  PRIMARY KEY (id, started_at),
  CONSTRAINT exec_status_check CHECK (
    status IN ('RUNNING','COMPLETED','FAILED','STALE')
  )
) PARTITION BY RANGE (started_at);

-- Create monthly partitions - drop partitions older than 90 days
CREATE TABLE job_executions_2026_06
  PARTITION OF job_executions
  FOR VALUES FROM ('2026-06-01') TO ('2026-07-01');

CREATE INDEX idx_executions_job_time
  ON job_executions (job_id, started_at DESC);

Workers write to job_executions and must validate the lock_token against the current value in scheduled_jobs. If the token has changed (because stuck job recovery dispatched the job again), the worker marks its execution row as STALE rather than overwriting the newer execution’s state.

Key Algorithms and Protocols

Exactly-Once Semantics

Pure exactly-once delivery is impossible under network partition - you must choose between at-least-once and at-most-once. Cron schedulers use at-least-once dispatch with fencing: the scheduler may dispatch a job twice in edge cases (failover overlap), and workers use the fencing token to discard results from a superseded dispatch. The token protects the job’s state rows, not external side effects, so job handlers should still be idempotent.

def complete_job(conn, job_id: str, lock_token: str, result_code: int) -> bool:
    """Worker reports completion. Returns False if token is stale (job was re-dispatched)."""
    with conn.cursor() as cur:
        # Match on the token alone: schedule_next_run has usually reset status to SCHEDULED
        # already, but any newer dispatch would have replaced lock_token with a new value.
        cur.execute("""
            UPDATE scheduled_jobs
            SET last_run_status = 'COMPLETED',
                lock_token = NULL
            WHERE id = %s
              AND lock_token = %s
        """, (job_id, lock_token))
        conn.commit()
        if cur.rowcount == 0:
            # Token mismatch: stale worker, discard result
            cur.execute("""
                UPDATE job_executions
                SET status = 'STALE', completed_at = NOW()
                WHERE job_id = %s AND lock_token = %s
            """, (job_id, lock_token))
            conn.commit()
            return False

        cur.execute("""
            UPDATE job_executions
            SET status = 'COMPLETED',
                completed_at = NOW(),
                result_code = %s,
                -- Multiply before casting so sub-second precision is not truncated
                duration_ms = (EXTRACT(EPOCH FROM (NOW() - started_at)) * 1000)::int
            WHERE job_id = %s AND lock_token = %s
        """, (result_code, job_id, lock_token))
        conn.commit()
    return True
Key Insight

The lock token acts as a generation identifier. Each dispatch writes a new UUID, so a worker from a prior dispatch can’t overwrite the state of the current dispatch: its stale token no longer matches the one stored in the row. This borrows from Kleppmann’s fencing tokens in distributed locks, with one difference: his tokens are monotonically increasing numbers that a storage service compares, while here the check is an equality test against the current value in the same row, so a random UUID is sufficient.
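The generation check can be exercised without a real cluster. This sketch again uses sqlite3 in place of PostgreSQL, with hypothetical dispatch and report_completion helpers and a cut-down table: a re-dispatch during failover overwrites the token, so the original worker’s report matches no row.

```python
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE scheduled_jobs (id TEXT PRIMARY KEY, lock_token TEXT, last_run_status TEXT)")
conn.execute("INSERT INTO scheduled_jobs (id) VALUES ('job-1')")

def dispatch(job_id: str) -> str:
    """Each dispatch writes a fresh token, superseding any earlier generation."""
    token = str(uuid.uuid4())
    conn.execute("UPDATE scheduled_jobs SET lock_token = ? WHERE id = ?", (token, job_id))
    conn.commit()
    return token

def report_completion(job_id: str, token: str) -> bool:
    """Accept a result only if the caller still holds the current generation's token."""
    cur = conn.execute(
        "UPDATE scheduled_jobs SET last_run_status = 'COMPLETED', lock_token = NULL "
        "WHERE id = ? AND lock_token = ?",
        (job_id, token),
    )
    conn.commit()
    return cur.rowcount == 1

old_token = dispatch("job-1")  # original leader
new_token = dispatch("job-1")  # new leader re-dispatches during failover overlap
print(report_completion("job-1", old_token))  # False: stale generation, result discarded
print(report_completion("job-1", new_token))  # True
```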

Catchup Execution

When the scheduler restarts after downtime, jobs have accumulated in SCHEDULED state with next_run_at timestamps in the past. Naively firing all of them immediately would hammer the worker pool. Catchup execution distinguishes what to replay from what to skip.

from datetime import datetime, timezone, timedelta
from croniter import croniter

def catchup_missed_jobs(conn, partition: int):
    """
    On startup: replay recently missed jobs, advance stale ones to next future tick.
    Called once before the main dispatch loop starts.
    """
    with conn.cursor() as cur:
        # Lateness is computed on the DB clock, like every other scheduling comparison
        cur.execute("""
            SELECT id, cron_expression, NOW() - next_run_at AS age, misfire_grace
            FROM scheduled_jobs
            WHERE partition_key = %s
              AND status = 'SCHEDULED'
              AND next_run_at < NOW()
        """, (partition,))
        missed = cur.fetchall()

    now = datetime.now(timezone.utc)
    replay_ids = []
    skip_updates = []

    for job_id, cron_expr, age, misfire_grace in missed:
        if age <= misfire_grace:
            replay_ids.append(job_id)
        else:
            # Job is too old to replay - advance to next future tick
            cron = croniter(cron_expr, now)
            future_next = cron.get_next(datetime)
            skip_updates.append((future_next, job_id))

    if skip_updates:
        with conn.cursor() as cur:
            cur.executemany("""
                UPDATE scheduled_jobs
                SET next_run_at = %s
                WHERE id = %s AND status = 'SCHEDULED'
            """, skip_updates)
            conn.commit()

    # Return replay_ids for dispatch via normal try_lock_job path
    return replay_ids

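To avoid a thundering herd when replaying, rate-limit catchup dispatch to 2x the normal dispatch rate. New ticks keep arriving at 1x during catchup, so only the remaining 1x of capacity drains the backlog: clearing the jobs missed during an outage takes roughly as long as the outage itself (bounded by the misfire grace window), and worker load peaks at about twice normal while it does.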
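The drain-time arithmetic can be sketched as follows, assuming a steady dispatch rate and treating outage × rate as an upper bound on the backlog (a job that missed several ticks is replayed once, because schedule_next_run computes the next tick from the current time). catchup_drain_seconds and paced_batches are hypothetical helpers.

```python
def catchup_drain_seconds(outage_s: float, normal_rate: float, cap_multiplier: float = 2.0) -> float:
    """Upper bound on the time to clear an outage backlog when total dispatch is capped.

    normal_rate is steady-state dispatches per second. New ticks keep arriving at
    normal_rate during catchup, so only (cap_multiplier - 1) * normal_rate is spare.
    """
    backlog = outage_s * normal_rate
    spare = (cap_multiplier - 1) * normal_rate
    if spare <= 0:
        raise ValueError("cap must exceed the normal rate or the backlog never drains")
    return backlog / spare

def paced_batches(job_ids: list[str], per_second: int) -> list[list[str]]:
    """Split replay work into one batch per second so catchup never exceeds per_second."""
    return [job_ids[i:i + per_second] for i in range(0, len(job_ids), per_second)]

print(catchup_drain_seconds(600, 167))       # 600.0
print(catchup_drain_seconds(600, 167, 3.0))  # 300.0
```

For a 10-minute outage at 167 dispatches/second, a 2x cap drains the backlog in at most 600 seconds; raising the cap to 3x halves that to 300 seconds, at the price of three times the normal worker load.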

Clock Skew Handling

Clock skew between scheduler nodes is a silent correctness threat. If Node A’s clock is 200ms ahead of Node B’s, A might fire a job 200ms early. The architectural fix: always use the database clock for all scheduling comparisons, never application server time.

-- All scheduling queries use NOW() as evaluated by PostgreSQL, never app-side timestamps
SELECT id FROM scheduled_jobs
WHERE partition_key = $1
  AND status = 'SCHEDULED'
  AND next_run_at <= NOW()
ORDER BY next_run_at ASC
LIMIT 100;

The same principle applies to lease expiry checks - compute expires_at = NOW() + interval '30 seconds' inside the SQL statement, not in application code. An application-computed timestamp passed as a parameter inherits the application server’s potentially drifted clock.

Watch Out

Clock skew becomes dangerous specifically during leader election. Suppose the leader’s host clock runs 30 seconds behind the database and it renews with expires_at = datetime.now() + 30s. The row it writes is already expired by the database’s clock, so a follower’s acquisition query sees expires_at < NOW() and takes the lease, while the old leader still believes it holds the lease for another 30 seconds: two nodes dispatch simultaneously. Always compute expires_at inside the SQL using NOW() + INTERVAL, never pass a Python datetime.now() value as the expiry timestamp.
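A small simulation of that failure, using plain datetimes rather than a live database. The fixed timestamp and the exact 30-second offset are illustrative assumptions; lease_expired_for_db is a hypothetical helper that mirrors the acquisition query’s expires_at < NOW() condition.

```python
from datetime import datetime, timedelta, timezone

LEASE = timedelta(seconds=30)

def lease_expired_for_db(expires_at: datetime, db_now: datetime) -> bool:
    # Mirrors the acquisition condition: scheduler_leases.expires_at < NOW()
    return expires_at < db_now

db_now = datetime(2026, 6, 1, 12, 0, 0, tzinfo=timezone.utc)
leader_clock = db_now - timedelta(seconds=30)  # leader's host clock runs 30 s behind the DB

app_expiry = leader_clock + LEASE  # expires_at = datetime.now() + 30 s (the bug)
sql_expiry = db_now + LEASE        # expires_at = NOW() + INTERVAL '30 seconds'

# One second after the renewal, a follower checks the lease using DB time
follower_check = db_now + timedelta(seconds=1)
print(lease_expired_for_db(app_expiry, follower_check))  # True: follower takes over, two leaders
print(lease_expired_for_db(sql_expiry, follower_check))  # False: lease still valid
```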

Scaling and Performance

The scheduler scales independently on three axes: scheduler nodes (job capacity), worker nodes (execution throughput), and database (query throughput at extreme scale).

Horizontal scaling showing independent scaling of scheduler nodes, task queue, and worker pool

Scheduler node scaling directly increases job capacity. Each added node takes ownership of a slice of virtual partitions, reducing per-node query load proportionally. A 4-node cluster handles 4x more jobs than a single node with no coordination overhead per job - each node simply queries a smaller partition slice.

Worker node scaling affects execution throughput and is fully independent. Workers are stateless consumers of the task queue. Auto-scale workers based on task queue depth using CloudWatch or Kafka consumer lag metrics. Adding workers never requires any scheduler-side changes.

Database scaling - at 10 million jobs and 10,000 dispatches/minute, a single PostgreSQL primary handles this comfortably. Add read replicas for the polling queries if needed (dispatch writes still go to primary). Beyond 100 million jobs, use PostgreSQL declarative table partitioning by partition_key so each partition has its own physical storage and index.

Capacity estimation:
  Registered jobs: 10,000,000
  Dispatch rate:   10,000 jobs/minute ≈ 167 jobs/second

  Worker sizing:
    Avg execution time: 5 seconds
    Workers needed: 167 * 5 = 835 concurrent workers

  Scheduler DB load (4 nodes, 500ms poll interval):
    = 4 * 2 queries/sec = 8 queries/sec to Job Store primary
    Dispatch writes: 167 * 2 ≈ 334 UPDATEs/sec (lock + reschedule per job)
    Each node owns ~2,500,000 jobs (10M / 4 nodes), but the dispatch index
    range scan touches only rows already due: ~167 / 4 / 2 ≈ 21 rows per poll
    P99 query time target: < 5ms

  Execution history storage:
    = 10,000 executions/min * 60 * 24 = 14.4M rows/day
    = 14.4M * 90 days ≈ 1.3B rows retained
    = 260 bytes/row * 1.3B ≈ 340 GB steady state
    Monthly partitions are dropped only once entirely older than 90 days,
    so up to ~120 days (~450 GB) is on disk just before each drop
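The same estimate as a parameterized function, so the inputs can be varied. The defaults are the assumptions stated above; capacity is a hypothetical helper name, and the peak figure assumes whole monthly partitions are dropped only once they fall entirely outside the 90-day window.

```python
import math

def capacity(jobs=10_000_000, dispatch_per_min=10_000, avg_exec_s=5,
             nodes=4, poll_s=0.5, row_bytes=260, retention_days=90):
    """Reproduce the capacity estimate; every default is one of the stated assumptions."""
    dispatch_per_sec = math.ceil(dispatch_per_min / 60)  # 166.7 rounded up to 167
    rows_per_day = dispatch_per_min * 60 * 24            # one history row per execution
    retained_rows = rows_per_day * retention_days
    return {
        "dispatch_per_sec": dispatch_per_sec,
        # Little's law: concurrency = arrival rate x time in system
        "workers": dispatch_per_sec * avg_exec_s,
        "queries_per_sec": nodes / poll_s,
        "jobs_per_node": jobs // nodes,
        # Due rows each poll returns when dispatch load is spread evenly across nodes
        "rows_per_poll": dispatch_per_sec / nodes * poll_s,
        "retained_rows": retained_rows,
        "steady_state_gb": retained_rows * row_bytes / 1e9,
        # Up to ~30 extra days sit on disk before the oldest monthly partition is dropped
        "peak_gb_monthly_partitions": rows_per_day * (retention_days + 30) * row_bytes / 1e9,
    }

for name, value in capacity().items():
    print(f"{name:>28}: {value:,.1f}")
```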
Real World

Apache Airflow uses a similar DB-backed scheduler architecture. The Airflow scheduler polls its PostgreSQL or MySQL metadata database for task instances that are ready to run and hands them to an executor, such as Celery workers fed through a message broker or one Kubernetes pod per task. Before Airflow 2.0, the single scheduler process was the main scaling bottleneck. Airflow 2.0 made the scheduler horizontally scalable: several scheduler instances run active-active and coordinate through row-level locks in the metadata database (SELECT ... FOR UPDATE with SKIP LOCKED), closely mirroring the partitioned multi-node design described here.

Failure Modes and Recovery

Failure | Detection | Impact | Recovery
--- | --- | --- | ---
Scheduler leader crash | Lease expires after 30s | All owned partitions dormant for up to 30s | Follower acquires lease; catchup execution handles the backlog on startup
Failover overlap (old + new leader both active) | Lock token mismatch on second dispatch | Potential double dispatch in 30s window | Fencing token check discards the result from the superseded dispatch
Database connection failure | Exception on poll | Dispatch pauses until DB recovers | Retry with exponential backoff; jobs queue as SCHEDULED in DB
Worker crashes mid-execution | Lock TTL expires (60s default) | Job stuck in RUNNING past TTL | Stuck job recovery resets to SCHEDULED; next dispatch fires fresh
Clock skew exceeds NTP drift | Timestamp ordering violations in query results | Job fires slightly early or late | Use DB NOW() for all comparisons; accept up to 500ms jitter
Misconfigured cron expression | Parser exception at dispatch | Job never fires; status=FAILED | Validate expression at registration; alert on validation failure

The stuck job recovery loop runs every 60 seconds on each scheduler node for its own partitions:

def recover_stuck_jobs(conn, partition: int):
    """Reset RUNNING jobs whose lock TTL has expired back to SCHEDULED."""
    with conn.cursor() as cur:
        # Retry-eligible: reset to SCHEDULED with incremented retry_count
        cur.execute("""
            UPDATE scheduled_jobs
            SET status = 'SCHEDULED',
                lock_token = NULL,
                lock_expires_at = NULL,
                retry_count = retry_count + 1
            WHERE partition_key = %s
              AND status = 'RUNNING'
              AND lock_expires_at < NOW()
              AND retry_count < max_retries
        """, (partition,))

        # Permanently failed: exhausted retries
        cur.execute("""
            UPDATE scheduled_jobs
            SET status = 'FAILED',
                last_run_status = 'FAILED',
                lock_token = NULL,
                lock_expires_at = NULL
            WHERE partition_key = %s
              AND status = 'RUNNING'
              AND lock_expires_at < NOW()
              AND retry_count >= max_retries
        """, (partition,))
        conn.commit()
Watch Out

The most common operational mistake is setting the lock TTL shorter than the actual p99 job execution time. If a job takes 90 seconds but the lock TTL is 60 seconds, stuck job recovery resets it to SCHEDULED while the original execution is still running - causing a second dispatch and double execution. Set lock_expires_at = NOW() + max(expected_duration * 3, 5 minutes) and alert when actual execution duration exceeds the expected duration by 2x.
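The sizing rule translates directly into a helper. A minimal sketch with hypothetical names lock_ttl and should_alert; expected_duration is assumed to come from somewhere like the job’s own execution history in job_executions.

```python
from datetime import timedelta

def lock_ttl(expected_duration: timedelta) -> timedelta:
    """Lock TTL rule from the note above: max(expected_duration * 3, 5 minutes)."""
    return max(expected_duration * 3, timedelta(minutes=5))

def should_alert(actual: timedelta, expected: timedelta) -> bool:
    """Alert when an execution runs longer than 2x its expected duration."""
    return actual > expected * 2

print(lock_ttl(timedelta(seconds=90)))  # 0:05:00
print(lock_ttl(timedelta(minutes=10)))  # 0:30:00
```

A 90-second job gets the 5-minute floor; a 10-minute job gets 30 minutes. The TTL can still be applied on the database clock, for example lock_expires_at = NOW() + %s * INTERVAL '1 second' with ttl.total_seconds() as the parameter.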

Comparison of Approaches

Approach | Scheduling Latency | Complexity | Failure Mode | Best Fit
--- | --- | --- | --- | ---
DB polling (this design) | ~500ms | Low | DB as single point of failure | < 10M jobs, existing PostgreSQL infra
Push-based (Kafka topic per schedule) | ~50ms | High | Consumer lag during backpressure | High-frequency jobs, event-driven arch
Redis Lua atomic scripts | ~50ms | Medium | Lock loss on Redis failover; asynchronous replication, optional persistence | Low durability requirements, high throughput
Off-the-shelf scheduler (Quartz, Sidekiq-cron) | ~100ms | Low (off-the-shelf) | Bound to one framework’s runtime and clustering model | Single-app scheduling, < 100K jobs
etcd-native distributed lock | ~20ms | Very High | etcd cluster issues, operational complexity | Multi-region, strong consistency required
Consensus-based (Raft via CockroachDB) | ~100ms | Very High | Complex failure modes during partition | Financial-grade exactly-once, global

The DB-polling approach wins for most teams because it reuses existing PostgreSQL infrastructure, the mental model is simple (it’s just table rows), and it survives failures using the same WAL-based durability you already rely on for your application data. The penalty is ~500ms jitter and incremental load on your primary database. For jobs requiring sub-100ms precision, a Redis-backed approach with Lua atomic scripts delivers better latency at the cost of the WAL-backed durability PostgreSQL provides. Choose based on your SLA: most business logic tolerates 500ms delay; financial settlement jobs may not.

Key Takeaways

  • Leader election solves availability: one scheduler node dispatches at a time, followers stand ready to take over within 30 seconds when the leader’s lease expires.
  • Distributed locking solves races: the conditional UPDATE on lock_token ensures only one scheduler can transition a job to RUNNING, even during the failover overlap window.
  • Fencing tokens solve stale workers: workers validate their lock token before writing results, discarding execution outcomes from pre-failover dispatches that no longer own the job.
  • Database clock authority solves clock skew: all time comparisons happen inside SQL using NOW(), making the database the single authoritative clock regardless of application server drift.
  • Catchup execution solves missed jobs: restart recovery fires recently missed jobs (within grace period) and advances stale ones (beyond grace period) to the next future tick, preventing thundering herds.
  • Job partitioning solves scale: dividing the job space across scheduler nodes reduces per-node DB query load proportionally, scaling to tens of millions of jobs.
  • Over-partitioning solves rebalancing: 256 virtual partitions mapped to physical nodes means adding scheduler capacity only requires reassigning partition ownership, touching zero job rows.
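Over-partitioning can be sketched as follows. The sketch assumes SHA-256 of the job ID as a stable hash and simple round-robin ownership. Both choices and the helper names are illustrative, not the article's implementation. The key property is that partition_key is computed once and stored on the job row, so changing the node list only changes the ownership map.

```python
import hashlib

NUM_VIRTUAL_PARTITIONS = 256


def partition_key(job_id: str) -> int:
    """Stable job -> virtual partition mapping; stored once on the job row."""
    digest = hashlib.sha256(job_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % NUM_VIRTUAL_PARTITIONS


def assign_partitions(nodes: list[str]) -> dict[int, str]:
    """Round-robin ownership of every virtual partition across scheduler nodes."""
    if not nodes:
        raise ValueError("need at least one scheduler node")
    ordered = sorted(nodes)
    return {p: ordered[p % len(ordered)] for p in range(NUM_VIRTUAL_PARTITIONS)}


def owned_partitions(assignment: dict[int, str], node: str) -> list[int]:
    """Partitions a node polls, e.g. via WHERE partition_key = ANY(...)."""
    return sorted(p for p, owner in assignment.items() if owner == node)


if __name__ == "__main__":
    three = assign_partitions(["node-a", "node-b", "node-c"])
    four = assign_partitions(["node-a", "node-b", "node-c", "node-d"])
    print(partition_key("job-42"))                 # never changes
    print(len(owned_partitions(three, "node-a")))  # 86 of 256
    print(len(owned_partitions(four, "node-d")))   # 64 of 256
```

Going from three to four nodes changes which partitions node-a through node-c own, but no job row is rewritten. A production assignment would likely move only the minimum number of partitions rather than recomputing round-robin from scratch.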

The surprising lesson from this design is that cron, a 50-year-old concept, becomes genuinely hard precisely because its contract is so simple: fire this job at this time, exactly once. That “exactly” is what forces the entire distributed coordination stack. Systems that relax to at-least-once (most event-driven pipelines) or at-most-once (metrics aggregation) avoid most of this complexity. Cron’s contract, neither late nor duplicated, is the one that demands the most from its scheduler.

Frequently Asked Questions

Q: Why use PostgreSQL for the job store instead of a dedicated tool like etcd or Redis?

A: PostgreSQL gives you durable storage, ACID transactions, and SQL queries out of the box. etcd is built for small configuration data. Requests are capped at 1.5 MiB by default, the default storage quota is 2 GB, and 8 GB is the suggested maximum. That makes it a poor fit for millions of job rows with 90 days of execution history. Redis provides speed, but its persistence is weaker than a WAL-backed database's and it lacks rich query patterns. A SQL database lets you query missed jobs by time range, filter by tenant, and join against execution history - operations that would require additional infrastructure on Redis or etcd.

Q: Why not use Kubernetes CronJob objects for this?

A: Kubernetes CronJobs handle single-cluster scheduling but don’t provide multi-tenant job registration via an API, per-job fencing tokens, or fine-grained execution history. The Kubernetes documentation also warns that the CronJob controller creates a Job approximately once per scheduled time. In some circumstances it can create two Jobs, or none, so Jobs must be idempotent - exactly the double-execution scenario this design prevents. For application-level scheduling that needs an API for job registration, per-tenant isolation, and strict execution history, you need an application-layer scheduler.

Q: How do you handle jobs that take longer than their cron interval?

A: Two strategies controlled by the concurrent_policy column: SKIP (default) skips the next scheduled fire if the previous execution is still running; ALLOW lets multiple instances of the same job run simultaneously with a configurable max_concurrent cap. Most business jobs aren’t designed for concurrency and should use SKIP. Check concurrent_policy in try_lock_job and skip dispatch if a RUNNING row exists when policy is SKIP.
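The concurrent_policy check can be sketched as a pure function. The sketch assumes running_count comes from counting RUNNING executions for the job, for example in a hypothetical job_executions table, and that SKIP behaves like ALLOW with a cap of one. It illustrates the decision only; it is not the article's try_lock_job.

```python
from enum import Enum


class ConcurrentPolicy(Enum):
    SKIP = "SKIP"    # default: never overlap executions of the same job
    ALLOW = "ALLOW"  # overlap permitted up to max_concurrent


def should_dispatch(policy: ConcurrentPolicy, running_count: int,
                    max_concurrent: int = 1) -> bool:
    """Decide whether this tick may dispatch, given executions still RUNNING.

    running_count is assumed to come from something like:
      SELECT count(*) FROM job_executions          -- hypothetical table
      WHERE job_id = %s AND status = 'RUNNING'
    """
    if policy is ConcurrentPolicy.SKIP:
        return running_count == 0
    return running_count < max_concurrent


if __name__ == "__main__":
    print(should_dispatch(ConcurrentPolicy.SKIP, running_count=1))  # False: skip tick
    print(should_dispatch(ConcurrentPolicy.ALLOW, running_count=2, max_concurrent=3))
```

When the function returns False under SKIP, a reasonable companion rule is to still advance next_run_at to the following tick. Otherwise the skipped fire is retried on every poll.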

Q: How does the scheduler handle DST (Daylight Saving Time) transitions?

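A: All timestamps are stored in UTC. Cron expressions that reference local time (for example, “2am every day”) are evaluated in the job’s specified timezone each time next_run_at is computed, and the result is stored as UTC. The croniter library is designed to handle DST transitions when given a timezone-aware start datetime. Two edge cases need an explicit rule:
- Fall back: one local hour repeats (01:00-02:00 in US zones, 02:00-03:00 in most of Europe). Resolve a repeated time to the earlier UTC instant so the job fires once rather than twice.
- Spring forward: one local hour is skipped, so a 2am job in a US zone has no valid instant that day. Shift it to just after the gap instead of dropping it, to avoid missing an execution entirely.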
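Both DST rules can be sketched with Python's standard zoneinfo module (Python 3.9+; it needs the system timezone database or the tzdata package) rather than croniter. The function resolve_local_tick is hypothetical. It takes a wall-clock tick already produced by the cron expression and maps it to exactly one UTC instant. It uses the PEP 495 fold attribute to tell repeated local times from skipped ones.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def resolve_local_tick(local_naive: datetime, tz_name: str) -> tuple[datetime, str]:
    """Map a wall-clock cron tick in tz_name to a single UTC instant.

    Returns (utc_instant, kind), where kind is:
      "normal"    - the wall time exists exactly once
      "ambiguous" - fall-back repeat; the earlier UTC instant is chosen
      "skipped"   - spring-forward gap; the tick lands just after the gap
    """
    tz = ZoneInfo(tz_name)
    utc_fold0 = local_naive.replace(tzinfo=tz, fold=0).astimezone(timezone.utc)
    utc_fold1 = local_naive.replace(tzinfo=tz, fold=1).astimezone(timezone.utc)
    if utc_fold0 == utc_fold1:
        return utc_fold0, "normal"
    # The offsets differ, so this wall time sits on a DST transition.
    # If fold=0 round-trips to the same wall time, the time occurs twice
    # (fall back), and fold=0 is the earlier of the two instants.
    if utc_fold0.astimezone(tz).replace(tzinfo=None) == local_naive:
        return utc_fold0, "ambiguous"
    # Otherwise the wall time never occurs (spring forward). fold=0 applies the
    # pre-transition offset, which shifts the tick forward by the gap length.
    return utc_fold0, "skipped"


if __name__ == "__main__":
    for tick in (datetime(2024, 11, 3, 1, 30), datetime(2024, 3, 10, 2, 30)):
        print(tick, resolve_local_tick(tick, "America/New_York"))
```

In America/New_York, 01:30 on 3 November 2024 occurs twice and resolves to 05:30 UTC, the EDT occurrence. 02:30 on 10 March 2024 never exists and resolves to 07:30 UTC, which is 03:30 EDT.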

Q: What happens if the scheduled_jobs table becomes a bottleneck at extreme scale?

A: Use PostgreSQL declarative table partitioning by partition_key. Each virtual partition becomes a separate physical child table with its own index, allowing parallel index scans across partitions. Combined with read replicas for polling queries (dispatch writes still go to primary), this scales dispatch throughput to well over 100,000 jobs/minute. Beyond that, consider a two-level architecture: a small “hot” table holding only jobs due in the next 5 minutes (promoted from a larger “cold” table), keeping the hot table small enough to fit in the buffer cache.
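The hot/cold promotion step might look like the following sketch. It uses sqlite3 from the standard library so it runs standalone, stores next_run_at as epoch seconds, and invents the table names cold_jobs and hot_jobs. On PostgreSQL the same move could be written as one statement with a data-modifying CTE (WITH moved AS (DELETE ... RETURNING ...) INSERT ...).

```python
import sqlite3

HOT_WINDOW_SECONDS = 300  # "jobs due in the next 5 minutes"

SCHEMA = """
CREATE TABLE cold_jobs (job_id TEXT PRIMARY KEY, next_run_at INTEGER NOT NULL);
CREATE TABLE hot_jobs  (job_id TEXT PRIMARY KEY, next_run_at INTEGER NOT NULL);
CREATE INDEX cold_next_run ON cold_jobs (next_run_at);
CREATE INDEX hot_next_run  ON hot_jobs (next_run_at);
"""


def promote_due_jobs(conn: sqlite3.Connection, now: int) -> int:
    """Move jobs due within the hot window from cold_jobs to hot_jobs.

    The copy and the delete run in one transaction, so a job is never
    visible in both tables (or in neither) to another connection.
    """
    horizon = now + HOT_WINDOW_SECONDS
    with conn:
        conn.execute(
            "INSERT INTO hot_jobs (job_id, next_run_at) "
            "SELECT job_id, next_run_at FROM cold_jobs WHERE next_run_at <= ?",
            (horizon,),
        )
        cur = conn.execute("DELETE FROM cold_jobs WHERE next_run_at <= ?", (horizon,))
    return cur.rowcount


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.executemany("INSERT INTO cold_jobs VALUES (?, ?)",
                     [("a", 1000), ("b", 1200), ("c", 5000)])
    conn.commit()
    print(promote_due_jobs(conn, now=1000))  # a and b move; c stays cold
```

The dispatcher would then poll only hot_jobs, whose index stays small no matter how many jobs are registered.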

Q: How do you handle multi-region deployments where jobs should fire in the region closest to their data?

A: Add a home_region column to scheduled_jobs and a region-aware partition assignment that co-locates each job’s virtual partitions with its home region’s scheduler cluster. Each regional scheduler cluster has its own leader that dispatches only the partitions assigned to its region. The primary Job Store lives in one region with read replicas for other regions; dispatch writes cross regions via synchronous replication. This trades some latency on dispatch writes for regional execution locality. Name the CAP tradeoff explicitly: during a regional partition, remote schedulers may not see lease updates from the primary region and could briefly have two active leaders for the same partition.

Interview Questions

Q: Walk me through how you guarantee a job fires exactly once even when the leader crashes during dispatch.

Expected depth: Explain the two-phase approach: (1) the leader writes a fencing token atomically via conditional UPDATE, transitioning status to RUNNING; (2) the worker validates the token before committing results. Clarify that pure exactly-once is approximated through at-least-once dispatch plus idempotent deduplication at the worker layer. Name the 30-second failover gap and explain why the fencing token prevents double-execution: the second dispatcher generates a new token, and the stale worker’s complete_job call sees rowcount=0 because the token no longer matches.
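The fencing-token sequence can be demonstrated end to end with an in-memory SQLite database standing in for PostgreSQL. The column set is trimmed to the minimum, and recover_stuck_job is a simplified, hypothetical stand-in for lease-expiry recovery. The point is that complete_job's WHERE clause includes the token, so a stale worker's write matches zero rows.

```python
import sqlite3
import uuid
from typing import Optional

SCHEMA = """
CREATE TABLE scheduled_jobs (
    job_id     TEXT PRIMARY KEY,
    status     TEXT NOT NULL,
    lock_token TEXT
);
"""


def try_lock_job(conn: sqlite3.Connection, job_id: str) -> Optional[str]:
    """Conditional UPDATE: only one caller can move SCHEDULED -> RUNNING."""
    token = uuid.uuid4().hex  # fresh fencing token per dispatch
    with conn:
        cur = conn.execute(
            "UPDATE scheduled_jobs SET status = 'RUNNING', lock_token = ? "
            "WHERE job_id = ? AND status = 'SCHEDULED'",
            (token, job_id),
        )
    return token if cur.rowcount == 1 else None


def recover_stuck_job(conn: sqlite3.Connection, job_id: str) -> None:
    """Failover path (simplified): hand a RUNNING job back to the scheduler."""
    with conn:
        conn.execute(
            "UPDATE scheduled_jobs SET status = 'SCHEDULED', lock_token = NULL "
            "WHERE job_id = ? AND status = 'RUNNING'",
            (job_id,),
        )


def complete_job(conn: sqlite3.Connection, job_id: str, token: str) -> bool:
    """Worker commit: succeeds only if this token still owns the job."""
    with conn:
        cur = conn.execute(
            "UPDATE scheduled_jobs SET status = 'SCHEDULED', lock_token = NULL "
            "WHERE job_id = ? AND lock_token = ?",
            (job_id, token),
        )
    return cur.rowcount == 1


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.execute("INSERT INTO scheduled_jobs VALUES ('j1', 'SCHEDULED', NULL)")
    conn.commit()
    stale = try_lock_job(conn, "j1")     # old leader dispatches
    recover_stuck_job(conn, "j1")        # lease expires, new leader recovers
    fresh = try_lock_job(conn, "j1")     # second dispatch, new token
    print(complete_job(conn, "j1", stale))  # False: rowcount == 0
    print(complete_job(conn, "j1", fresh))  # True
```

In the real system complete_job would also advance next_run_at and record the execution; those columns are omitted here.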

Q: How would you scale this scheduler to 100 million registered jobs?

Expected depth: Discuss three axes: more scheduler nodes (each owning a partition slice, reducing per-node query rows), PostgreSQL declarative table partitioning by partition_key (each partition has independent index maintenance), and a two-level hot/cold table design (hot table has only jobs due in the next few minutes, dramatically reducing index size on the hot path). Mention that over-partitioning with 256 virtual partitions means no job row migration when adding nodes.

Q: After a 6-hour maintenance window ending at 3am, should the 9pm, 10pm, 11pm, 12am, 1am, 2am, and 3am job ticks all fire? How do you prevent a thundering herd?

Expected depth: Discuss the misfire grace period. Jobs within 1 hour of their scheduled time fire on restart, and jobs older than 1 hour get advanced to the next future tick. For the batch within grace, rate-limit catchup replay, for example by capping total dispatch at 2x normal_dispatch_rate. Worker load then never exceeds double its usual level. Because new ticks keep arriving at 1x, the spare 1x of capacity drains a backlog that took T to accumulate in roughly T. Explain the tradeoff: setting the grace period to 6 hours replays all 7 missed ticks, which may cause data consistency issues downstream (for example, seven hourly billing runs firing back-to-back). Operators should configure the grace period based on whether job logic is safe to replay multiple times.
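The grace-period split and the rate-limited replay can be sketched with plain datetime arithmetic. The sketch assumes a fixed-interval (hourly) schedule, so the next future tick can be computed without a cron parser. It treats each missed tick separately, matching the 7-tick example above. plan_catchup, pace, and next_future_tick are hypothetical helpers, and a real rate limit would be shared across all jobs rather than applied per job.

```python
from datetime import datetime, timedelta


def plan_catchup(missed: list[datetime], now: datetime,
                 grace: timedelta) -> tuple[list[datetime], list[datetime]]:
    """Split missed ticks: replay those within the grace period, drop the rest."""
    replay = [t for t in missed if now - t <= grace]
    dropped = [t for t in missed if now - t > grace]
    return replay, dropped


def pace(ticks: list[datetime], start: datetime,
         max_per_minute: int) -> list[tuple[datetime, datetime]]:
    """Spread replays evenly so catchup never exceeds max_per_minute dispatches."""
    interval = timedelta(minutes=1) / max_per_minute
    return [(start + i * interval, tick) for i, tick in enumerate(ticks)]


def next_future_tick(last_tick: datetime, step: timedelta, now: datetime) -> datetime:
    """First tick of a fixed-interval schedule strictly after now."""
    if now < last_tick:
        return last_tick
    return last_tick + ((now - last_tick) // step + 1) * step


if __name__ == "__main__":
    ticks = [datetime(2024, 1, 1, 21) + timedelta(hours=h) for h in range(7)]  # 9pm..3am
    restart = datetime(2024, 1, 2, 3)
    replay, dropped = plan_catchup(ticks, restart, grace=timedelta(hours=1))
    print(replay)                      # 2am and 3am only
    print(pace(replay, restart, 30))   # one dispatch every 2 seconds
    print(next_future_tick(ticks[-1], timedelta(hours=1), restart))  # 4am
```

With a 1-hour grace and a 03:00 restart, only the 02:00 and 03:00 ticks replay. With a 6-hour grace, all seven do, which is exactly the billing hazard described above.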

Q: How do you prevent the scheduler from dispatching a job that is currently paused by an operator?

Expected depth: Paused jobs have status = 'PAUSED' in the Job Store. The dispatch query filters on WHERE status = 'SCHEDULED', so paused jobs are never selected. The tricky case is a job already in RUNNING state when the operator pauses it. The pause should transition it from RUNNING to PAUSED only after the current execution completes, or immediately, marking the in-progress execution as superseded. Discuss the race where the scheduler dispatches a SCHEDULED job at the same instant the operator pauses it. Both UPDATEs contend for the same row lock, so PostgreSQL serializes them:
- If try_lock_job commits first, the job is RUNNING, the current tick fires, and the pause takes effect from the next tick.
- If the pause commits first, the conditional UPDATE in try_lock_job matches zero rows and nothing is dispatched.
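The deferred-pause behavior can be sketched against SQLite. The pause_requested column is a hypothetical addition, not part of the article's schema. Pausing a SCHEDULED job takes effect immediately. Pausing a RUNNING job only sets the flag, and the completion step then moves the job to PAUSED instead of back to SCHEDULED.

```python
import sqlite3

SCHEMA = """
CREATE TABLE scheduled_jobs (
    job_id          TEXT PRIMARY KEY,
    status          TEXT NOT NULL,               -- SCHEDULED | RUNNING | PAUSED
    pause_requested INTEGER NOT NULL DEFAULT 0   -- hypothetical flag
);
"""


def pause_job(conn: sqlite3.Connection, job_id: str) -> str:
    """Operator pause: immediate for idle jobs, deferred for running ones."""
    with conn:
        cur = conn.execute(
            "UPDATE scheduled_jobs SET status = 'PAUSED' "
            "WHERE job_id = ? AND status = 'SCHEDULED'",
            (job_id,),
        )
        if cur.rowcount == 1:
            return "PAUSED"
        # Not SCHEDULED: if RUNNING, defer the pause until the execution finishes.
        cur = conn.execute(
            "UPDATE scheduled_jobs SET pause_requested = 1 "
            "WHERE job_id = ? AND status = 'RUNNING'",
            (job_id,),
        )
        return "PAUSE_PENDING" if cur.rowcount == 1 else "NOOP"


def finish_execution(conn: sqlite3.Connection, job_id: str) -> None:
    """Completion step: honor a pending pause instead of rescheduling.

    SET expressions read the pre-update row, so the CASE sees the old flag.
    """
    with conn:
        conn.execute(
            "UPDATE scheduled_jobs SET "
            "status = CASE WHEN pause_requested = 1 THEN 'PAUSED' ELSE 'SCHEDULED' END, "
            "pause_requested = 0 "
            "WHERE job_id = ? AND status = 'RUNNING'",
            (job_id,),
        )
```

Because the dispatch query only selects status = 'SCHEDULED', a job that finish_execution moves to PAUSED is never picked up again until an operator resumes it.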

Q: Design the API for registering a job that should fire “at 2pm on the first Monday of every month in UTC.”

Expected depth: Show that the cron expression 0 14 1-7 * 1 doesn’t capture “first Monday.” When both day-of-month and day-of-week are restricted, standard cron fires if either field matches. This expression therefore runs at 14:00 on the 1st through the 7th and on every Monday. Discuss parsers that support # notation, such as croniter (0 14 * * 1#1) or Quartz. Quartz expressions carry a leading seconds field and number Sunday as 1, giving 0 0 14 ? * 2#1. Name the API fields: cron_expression (string), timezone (IANA name, default UTC), handler_url (target endpoint), payload (JSON), misfire_grace (duration), concurrent_policy (SKIP or ALLOW). Explain how the scheduler validates the expression at registration time and computes the first next_run_at using croniter.get_next() from NOW().
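The OR-versus-AND difference can be checked with the standard library alone. This sketch is illustrative. cron_day_matches evaluates only the two day fields, and next_first_monday_14utc is a hypothetical brute-force search, not croniter's algorithm. croniter documents a day_or=False option that switches to AND semantics; verify it against the version you deploy.

```python
from datetime import date, datetime, timedelta, timezone


def cron_day_matches(d: date, dom_range: range, dow: int, day_or: bool) -> bool:
    """Day check for a cron line where BOTH day fields are restricted.

    dow uses cron numbering (0 = Sunday ... 6 = Saturday);
    isoweekday() % 7 converts Python's Monday=1..Sunday=7 to that scheme.
    day_or=True is standard cron behavior; day_or=False requires both fields.
    """
    dom_ok = d.day in dom_range
    dow_ok = d.isoweekday() % 7 == dow
    return (dom_ok or dow_ok) if day_or else (dom_ok and dow_ok)


def next_first_monday_14utc(after: datetime) -> datetime:
    """Next 14:00 UTC on the first Monday of a month, strictly after `after`.

    `after` must be timezone-aware.
    """
    d = after.astimezone(timezone.utc).date()
    while True:
        if cron_day_matches(d, range(1, 8), 1, day_or=False):
            candidate = datetime(d.year, d.month, d.day, 14, tzinfo=timezone.utc)
            if candidate > after:
                return candidate
        d += timedelta(days=1)


if __name__ == "__main__":
    january = [date(2024, 1, n) for n in range(1, 32)]
    print(sum(cron_day_matches(d, range(1, 8), 1, day_or=True) for d in january))   # 11 days
    print(sum(cron_day_matches(d, range(1, 8), 1, day_or=False) for d in january))  # 1 day
    print(next_first_monday_14utc(datetime(2024, 1, 1, 14, tzinfo=timezone.utc)))   # 2024-02-05
```

In January 2024 the OR reading of 0 14 1-7 * 1 fires on 11 days: the 1st through 7th plus the Mondays on the 8th, 15th, 22nd, and 29th. The intended reading fires only on the 1st.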
