Build a CI/CD Pipeline Orchestrator



System Design Deep Dive


How do you coordinate 10,000 concurrent build jobs, stream logs in real-time, and guarantee zero job loss when workers crash?


Every time a developer pushes a commit, a chain of events kicks off: clone the repository, install dependencies, run tests across dozens of shards, build container images, push artifacts, and finally deploy. That chain is a CI/CD pipeline - and for a company with hundreds of engineers, thousands of those chains are starting simultaneously. At 10,000 concurrent jobs, the coordination problem dwarfs the execution problem. The hard part is not running a shell script on a machine. The hard part is deciding which machine runs it, what happens when that machine dies mid-run, and how the engineer watching their terminal sees log output appear line by line in near-real-time.

Think of a CI/CD orchestrator like a construction dispatch center. Thousands of job orders arrive every minute. Each job has a priority, a set of dependencies on prior jobs, and a requirement for a specific type of equipment. Dispatchers assign each order to a crew. If a crew’s van breaks down mid-job, the dispatcher must detect the failure, reassign the partially-done work, and ensure nothing is billed twice. The dispatcher does not swing a hammer - it keeps everything organized so the crews can work without stepping on each other.

The naive approach - a single process that polls a database table, fires off shell commands, and tails log files - collapses quickly. At 100 concurrent jobs, a single coordinator becomes a bottleneck. At 1,000, the log-tailing I/O alone saturates the network interface. At 10,000, a single coordinator crash drops all in-flight state. Distributed systems theory gives us a vocabulary for these failure modes: the coordinator needs to be stateless, job ownership needs to be enforced with leases, and log delivery needs a separate fanout path from the job execution path.

Three architectural decisions define every serious CI/CD orchestrator. First, the job queue must provide at-least-once delivery with visibility timeouts, so a job is not considered done until a worker explicitly marks it complete. Second, every worker must send heartbeats and lose its lease when they stop, so jobs return to the queue when a worker dies. Third, log streaming must be decoupled from job execution - workers write to a side channel, and clients subscribe to that channel independently, so neither slow log consumers nor network hiccups can block job progress.

Requirements and Constraints

Functional Requirements

  • Accept pipeline definitions as YAML or JSON, describing a DAG (directed acyclic graph) of stages and jobs
  • Queue and dispatch up to 10,000 concurrent build and test jobs across a heterogeneous worker pool
  • Stream job logs to clients in real-time with per-line timestamps and ordering guarantees
  • Store build artifacts (binaries, container images, test reports) with content-addressable references
  • Support job retries with configurable backoff, max attempts, and per-step idempotency keys
  • Expose REST and WebSocket APIs for job submission, status queries, log tailing, and artifact download
  • Cancel in-flight jobs and propagate cancellation signals to running worker processes

Non-Functional Requirements

  • Throughput: 10,000 concurrent jobs; peak intake of 5,000 new jobs per minute
  • Latency: job dispatch within 500ms of submission for standard-priority jobs; log lines visible within 200ms of emission
  • Durability: zero job loss on coordinator or worker crash; at-least-once execution per job
  • Availability: 99.9% uptime for the dispatch and queuing layer; graceful degradation if artifact storage is temporarily unavailable
  • Scalability: worker pool auto-scales from 100 to 5,000 nodes within 90 seconds based on queue depth
  • Observability: per-job execution traces, queue depth metrics, worker utilization dashboards

Constraints

  • Workers are untrusted - they run arbitrary user code in isolated containers or VMs
  • Build artifacts can be large (up to 10 GB per job); they must not flow through the coordinator
  • Log volume can reach 50 MB per job and 500 GB per hour at peak; logs must be queryable after job completion
  • The system must support both ephemeral cloud workers and persistent on-premise runners

High-Level Architecture

The system decomposes into six layers, each with a clear ownership boundary.

The API Gateway accepts webhook events from Git providers and REST calls from developers. It validates pipeline YAML, resolves the DAG structure, and enqueues individual jobs. It is stateless and horizontally scalable behind a load balancer.

The Job Queue is the durable backbone of the system. Each job becomes a message with a visibility timeout. Workers acquire jobs by dequeuing messages. If the visibility timeout expires before the worker sends a heartbeat extension, the message becomes visible again and another worker picks it up.

The Worker Pool consists of ephemeral nodes that poll the job queue, execute the assigned job in an isolated environment, and publish results. Workers never share state with each other - all coordination flows through the queue and the state store.

The State Store (a PostgreSQL cluster with a Redis cache) tracks every job’s lifecycle: PENDING, QUEUED, ASSIGNED, RUNNING, SUCCESS, FAILED, RETRYING, and CANCELLED. The state store is the single source of truth for pipeline progress.
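Because the state store is the single source of truth, it helps to make the legal lifecycle transitions explicit and reject everything else. A minimal sketch: the eight states come from the text, but the transition edges are an assumption inferred from the rest of this design (for example, RUNNING may fall back to QUEUED when the reaper reclaims a job).

```python
from enum import Enum


class JobState(str, Enum):
    PENDING = "PENDING"
    QUEUED = "QUEUED"
    ASSIGNED = "ASSIGNED"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"
    RETRYING = "RETRYING"
    CANCELLED = "CANCELLED"


S = JobState  # short alias for the table below

# Hypothetical transition map: for each state, the states it may move to next.
ALLOWED = {
    S.PENDING: {S.QUEUED, S.CANCELLED},
    S.QUEUED: {S.ASSIGNED, S.CANCELLED},
    S.ASSIGNED: {S.RUNNING, S.QUEUED, S.CANCELLED},  # QUEUED if the lease lapses before start
    S.RUNNING: {S.SUCCESS, S.FAILED, S.RETRYING, S.QUEUED, S.CANCELLED},  # QUEUED = reaper reclaim
    S.RETRYING: {S.QUEUED, S.CANCELLED},
    S.SUCCESS: set(),    # terminal
    S.FAILED: set(),     # terminal
    S.CANCELLED: set(),  # terminal
}


def can_transition(current: JobState, new: JobState) -> bool:
    """True if moving a job from `current` to `new` is a legal lifecycle step."""
    return new in ALLOWED[current]
```

In SQL, the same rule becomes a guarded UPDATE such as `... WHERE status = 'PENDING'`, which only succeeds when the row is still in the expected prior state.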

The Log Aggregator receives structured log lines from workers over a side channel (typically a lightweight gRPC stream, or a shared Kafka topic keyed by job ID). It persists logs to object storage and provides a streaming read API so clients can tail in real-time without the coordinator being in the hot path.

The Artifact Store is a content-addressable object storage layer (S3-compatible). Workers upload artifacts directly using pre-signed URLs issued by the API Gateway, so large binaries never flow through any coordinator service.

[Figure: CI/CD Pipeline Orchestrator architecture overview]

Key Insight

The coordinator is a traffic director, not a data conduit. Logs, artifacts, and source code must all flow on side channels. If any large payload passes through the coordinator, it becomes the bottleneck as concurrency scales.

Component Deep Dives

Job Queue

The job queue is the most critical component for durability. We model it on the visibility timeout pattern made famous by Amazon SQS. When a worker polls the queue, the message is not deleted - it becomes invisible for a configurable period (say, 5 minutes). The worker must extend that timeout by sending heartbeats every 30 seconds. If heartbeats stop, the message reappears and another worker can claim it.

This is like a library’s checkout system. Borrowing a book does not destroy the library’s record - it just marks the book as checked out. If you never return it, the library eventually calls you, and if you still do not respond, the record resets so someone else can borrow it.

We implement priority lanes using three separate queues: CRITICAL (for manual deploys and hotfix branches), HIGH (for main branch merges), and NORMAL (for feature branch builds). Workers check queues in priority order with a short exponential backoff when all queues are empty.

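import redis

r = redis.Redis()

QUEUES = ["queue:critical", "queue:high", "queue:normal"]
PROCESSING = "queue:processing"  # jobs claimed by a worker but not yet completed
LEASE_TTL = 300  # seconds
HEARTBEAT_INTERVAL = 30  # seconds; workers call extend_lease this often

# Compare-and-extend in one atomic step: only the current owner may refresh.
EXTEND_LEASE = r.register_script("""
if redis.call('GET', KEYS[1]) == ARGV[1] then
    return redis.call('EXPIRE', KEYS[1], ARGV[2])
end
return 0
""")

# Compare-and-complete: release the lease, drop the job from the processing
# list, and record the final status, only if this worker still owns the job.
COMPLETE_JOB = r.register_script("""
if redis.call('GET', KEYS[1]) ~= ARGV[1] then
    return 0
end
redis.call('DEL', KEYS[1])
redis.call('LREM', KEYS[2], 1, ARGV[2])
redis.call('HSET', KEYS[3], 'status', ARGV[3])
return 1
""")

def poll_job(worker_id: str) -> dict | None:
    for queue in QUEUES:
        # LMOVE (Redis >= 6.2) moves the job into the processing list in one
        # step, so a crash between dequeue and lease creation cannot lose it
        # (a bare LPOP would); a reaper can re-enqueue processing entries
        # whose lease key has disappeared.
        job_id = r.lmove(queue, PROCESSING, "LEFT", "RIGHT")
        if job_id:
            job_id = job_id.decode()
            r.set(f"lease:{job_id}", worker_id, ex=LEASE_TTL)
            return {"job_id": job_id, "queue": queue}
    return None  # all lanes empty; the caller backs off before polling again

def extend_lease(job_id: str, worker_id: str) -> bool:
    # False means the lease expired or now belongs to another worker
    return bool(EXTEND_LEASE(keys=[f"lease:{job_id}"], args=[worker_id, LEASE_TTL]))

def complete_job(job_id: str, worker_id: str, success: bool) -> bool:
    status = "SUCCESS" if success else "FAILED"
    keys = [f"lease:{job_id}", PROCESSING, f"job:{job_id}"]
    # False means someone else owns this job now; the result is discarded
    return bool(COMPLETE_JOB(keys=keys, args=[worker_id, job_id, status]))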

[Figure: Job queue internals with priority lanes and worker heartbeats]

Watch Out

Using a single Redis list as your job queue is fine up to roughly 50,000 enqueues per second per shard. Beyond that, you need partitioned queues - either by tenant or by job type. Do not wait until you hit the limit to design partitioning in, because retrofitting it under live traffic is extremely painful.

Worker Pool

Workers are stateless executors. Each worker process runs in a loop: poll the queue, acquire a lease, spin up an isolated environment (Docker container, Firecracker microVM, or a plain subprocess depending on your trust model), execute the job, stream logs to the aggregator, upload artifacts to object storage, and mark the job complete.

The lifecycle is analogous to a restaurant kitchen. A ticket comes in (job dispatch), the chef (worker) starts cooking (execution), calls out each item as it is ready (log streaming), and marks the ticket done only when the plate leaves the pass (job completion). If the chef gets sick mid-service, the ticket goes back to the wheel for another chef - the food that was already plated is discarded and the job re-runs.

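Worker auto-scaling is driven by queue depth and average job duration. To drain a backlog of Q queued jobs within a target window T when each job takes D on average, you need roughly Q × D / T job slots in addition to the slots already busy; dividing by the number of jobs each node runs gives the node count. We publish this metric to CloudWatch or Datadog, and an autoscaler adjusts the worker fleet every 60 seconds.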
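A sketch of that sizing rule, clamped to the 100 to 5,000 node range from the non-functional requirements. The function name, the target drain window, and the jobs-per-node figure are assumptions for illustration.

```python
import math


def desired_workers(
    queue_depth: int,
    running_jobs: int,
    avg_job_duration_s: float,
    target_drain_s: float,
    jobs_per_worker: int,
    min_workers: int = 100,
    max_workers: int = 5_000,
) -> int:
    """Estimate fleet size so the current backlog drains within target_drain_s."""
    # Each slot can finish about target_drain_s / avg_job_duration_s jobs in the
    # window; if the window is shorter than one job, each queued job needs a slot.
    jobs_per_slot = max(1.0, target_drain_s / avg_job_duration_s)
    backlog_slots = queue_depth / jobs_per_slot  # = Q * D / T when T >= D
    total_slots = running_jobs + backlog_slots
    workers = math.ceil(total_slots / jobs_per_worker)
    return max(min_workers, min(max_workers, workers))
```

For example, 1,200 queued jobs, 600 running, 5-minute jobs, a 10-minute drain target, and 6 jobs per node works out to 200 nodes.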

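package worker

import (
    "context"
    "io"
    "log"
    "time"
)

// Job, Result, and the interfaces below are minimal stand-ins so the sketch compiles.
type Job struct {
    ID string
}

type Result struct {
    Success bool
}

type QueueClient interface {
    Poll(ctx context.Context) (*Job, error)
    // ExtendLease reports owned=false when the lease is definitively lost.
    ExtendLease(jobID, workerID string) (owned bool, err error)
    Complete(jobID, workerID string, success bool) bool
}

type LogStreamClient interface {
    OpenStream(jobID string) io.WriteCloser
}

type JobRunner interface {
    // Run must return promptly once ctx is cancelled.
    Run(ctx context.Context, job *Job, logs io.Writer) Result
}

type Worker struct {
    ID          string
    QueueClient QueueClient
    JobRunner   JobRunner
    LogStream   LogStreamClient
}

func (w *Worker) Run(ctx context.Context) {
    for ctx.Err() == nil {
        job, err := w.QueueClient.Poll(ctx)
        if err != nil || job == nil {
            // Back off, but wake immediately on shutdown.
            select {
            case <-ctx.Done():
                return
            case <-time.After(500 * time.Millisecond):
            }
            continue
        }
        w.execute(ctx, job)
    }
}

func (w *Worker) execute(ctx context.Context, job *Job) {
    // jobCtx is cancelled when execute returns or when the lease is lost,
    // which stops both the heartbeat goroutine and the running job.
    jobCtx, cancelJob := context.WithCancel(ctx)
    defer cancelJob()

    go w.heartbeat(jobCtx, cancelJob, job.ID)

    stream := w.LogStream.OpenStream(job.ID)
    defer stream.Close()

    result := w.JobRunner.Run(jobCtx, job, stream)

    if !w.QueueClient.Complete(job.ID, w.ID, result.Success) {
        log.Printf("lease expired for job %s, result discarded", job.ID)
    }
}

func (w *Worker) heartbeat(ctx context.Context, cancelJob context.CancelFunc, jobID string) {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            owned, err := w.QueueClient.ExtendLease(jobID, w.ID)
            if err != nil {
                // Transient failure: the 300s lease leaves room to retry next tick.
                log.Printf("failed to extend lease for job %s: %v", jobID, err)
                continue
            }
            if !owned {
                // Another worker may already own the job; stop running ours.
                log.Printf("lost lease for job %s, cancelling", jobID)
                cancelJob()
                return
            }
        }
    }
}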

Real World

GitHub Actions uses a similar pull-based model where self-hosted runners long-poll an HTTPS endpoint. This means runners work behind firewalls without inbound ports, since all connections are outbound from runner to coordinator. The lease mechanism is implemented as a job token that expires if not refreshed.

Log Streaming

Log streaming must solve three tensions simultaneously: low latency for the developer watching a live build, high throughput for jobs that emit megabytes per second, and durable storage for post-mortem queries weeks later.

Think of it like a live sports broadcast with a DVR. The broadcaster sends frames over satellite in real-time (live streaming). The DVR records each frame locally (persistence). Viewers can join mid-broadcast and see what they missed from the DVR buffer (catch-up). Nobody waits for the other - the broadcast does not pause because some viewers are slow.

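Workers write log lines to a shared Kafka log topic, using the job ID as the message key so every line of a job lands in the same partition and stays in order. (A topic per job would mean roughly 300,000 new topics per hour at 5,000 new jobs per minute.) Each line is a JSON record with fields seq (monotonic sequence number), ts (Unix timestamp in milliseconds), stream (stdout or stderr), and text. The sequence number allows clients to detect gaps and request retransmission.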
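Gap detection on the client can be sketched in a few lines: the worker stamps each line with the next seq, and the client compares what arrives against the next number it expects. The function names are hypothetical; only the four field names come from the record format described here.

```python
import json
import time
from typing import Iterable, List, Tuple


def make_log_record(seq: int, stream: str, text: str) -> str:
    """Serialize one log line with the seq/ts/stream/text fields."""
    return json.dumps(
        {"seq": seq, "ts": int(time.time() * 1000), "stream": stream, "text": text}
    )


def find_gaps(seqs: Iterable[int], start: int = 0) -> List[Tuple[int, int]]:
    """Return inclusive (first, last) ranges of missing sequence numbers.

    Repeated seq values (e.g. redeliveries under at-least-once) are ignored.
    """
    gaps = []
    expected = start
    for seq in sorted(set(seqs)):
        if seq < expected:
            continue
        if seq > expected:
            gaps.append((expected, seq - 1))
        expected = seq + 1
    return gaps
```

The returned ranges are what a client would ask the aggregator to resend.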

A Log Aggregator service consumes from Kafka and simultaneously: (1) buffers the last 10,000 lines per job in Redis for live WebSocket tailing, (2) writes completed chunks to S3 as NDJSON files for long-term storage. When a client opens a WebSocket connection to tail a job, the aggregator replays buffered history and then subscribes to new lines from the Redis stream.

import redis.asyncio as redis  # redis-py's asyncio client; replaces the deprecated aioredis package

TERMINAL_STATES = {"SUCCESS", "FAILED", "CANCELLED"}

async def stream_logs(job_id: str, websocket):
    r = redis.Redis.from_url("redis://localhost")
    stream_key = f"logs:{job_id}"
    try:
        # replay buffered history
        history = await r.xrange(stream_key, "-", "+")
        for _, fields in history:
            await websocket.send(fields[b"data"].decode())

        # tail live lines after the last replayed entry; start from "0-0"
        # (not "$") when history is empty, so lines written between the
        # XRANGE and the first XREAD are not skipped
        last_id = history[-1][0] if history else "0-0"
        while True:
            entries = await r.xread({stream_key: last_id}, block=5000, count=100) or []
            for _, messages in entries:
                for msg_id, fields in messages:
                    await websocket.send(fields[b"data"].decode())
                    last_id = msg_id
            if entries:
                continue  # keep draining before checking for completion
            # nothing new for 5s: stop once the job has finished
            status = await r.hget(f"job:{job_id}", "status")
            if status and status.decode() in TERMINAL_STATES:
                break
    finally:
        await r.aclose()  # redis-py >= 5.0.1; older versions use close()

Watch Out

Do not buffer all log lines in memory on the coordinator. A single job generating 50 MB of logs, multiplied by 10,000 concurrent jobs, is 500 GB of in-memory state. Use Redis Streams with a capped length (MAXLEN ~) or a Kafka topic with a short retention period, and always flush to object storage before expiring the hot buffer.
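A minimal sketch of a capped hot buffer, assuming a redis-py client `r` and the `logs:{job_id}` key layout used by the tailing code. Passing `maxlen` with `approximate=True` issues `XADD ... MAXLEN ~ N`, which lets Redis trim in cheap batches while keeping at least N entries. In this design the aggregator writes S3 chunks from the Kafka stream independently, so trimming the Redis copy bounds memory without dropping the durable record. The cap value and function name are assumptions.

```python
HOT_BUFFER_LINES = 10_000  # assumed cap: the last ~10,000 lines per job


def buffer_log_line(r, job_id: str, payload: str) -> bytes:
    """Append one serialized log line to the job's capped Redis stream.

    `r` is a redis-py client (redis.Redis). Returns the new stream entry ID.
    """
    return r.xadd(
        f"logs:{job_id}",
        {"data": payload},
        maxlen=HOT_BUFFER_LINES,
        approximate=True,  # MAXLEN ~: trim lazily, never below the cap
    )
```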

Artifact Storage

Artifact storage leverages the principle that the coordinator should never touch the data plane. When a job needs to upload an artifact (a compiled binary, a container image layer, a test coverage report), the worker requests a pre-signed URL from the API Gateway. The API Gateway generates a short-lived S3 pre-signed PUT URL, returns it to the worker, and the worker uploads directly to S3. The coordinator only stores the resulting artifact metadata (S3 key, content hash, size, MIME type) in the state store.

This is exactly how a notary works. The notary verifies your identity and issues a notarized form. You then go to the government office yourself to file the form. The notary does not hand-carry your paperwork to every government department - they just issue the authorization.

Content-addressable storage (using SHA-256 of the file content as the key) gives us automatic deduplication: if two jobs produce the same binary, only one copy exists in S3. This is especially valuable for Docker layer caches, where base layer binaries are identical across thousands of builds.

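import boto3

s3 = boto3.client("s3")
BUCKET = "ci-artifacts"
MAX_SINGLE_PUT_BYTES = 5 * 1024**3  # S3 rejects single PUTs larger than 5 GB

def issue_upload_url(sha256: str, size_bytes: int) -> dict:
    # Content-addressable key: identical bytes always map to the same object,
    # which is what deduplicates artifacts across jobs.
    key = f"artifacts/sha256/{sha256}"
    if size_bytes > MAX_SINGLE_PUT_BYTES:
        # Larger artifacts need a multipart upload with a presigned URL per part.
        raise ValueError("artifact too large for a single presigned PUT")
    url = s3.generate_presigned_url(
        "put_object",
        Params={
            "Bucket": BUCKET,
            "Key": key,
            "ContentType": "application/octet-stream",
        },
        ExpiresIn=3600,  # seconds
    )
    return {"url": url, "key": key}

def register_artifact(job_id: str, name: str, key: str, sha256: str, size: int, db):
    # Metadata only; the bytes never pass through the coordinator.
    # ON CONFLICT DO NOTHING makes re-registration after a retry a no-op.
    db.execute(
        """
        INSERT INTO artifacts (job_id, name, s3_key, content_hash, size_bytes, created_at)
        VALUES (%s, %s, %s, %s, %s, NOW())
        ON CONFLICT DO NOTHING
        """,
        (job_id, name, key, sha256, size),
    )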

Real World

CircleCI and Buildkite both issue pre-signed URLs for artifact uploads. This pattern lets them avoid running their own high-bandwidth storage proxies, and it means artifact throughput scales automatically with S3’s capacity rather than with the number of coordinator nodes.

Data Model

-- Pipeline definitions (DAG structure)
CREATE TABLE pipelines (
  id           UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  repo_id      UUID NOT NULL,
  commit_sha   CHAR(40) NOT NULL,
  branch       TEXT NOT NULL,
  definition   JSONB NOT NULL,  -- raw DAG YAML parsed to JSON
  status       TEXT NOT NULL DEFAULT 'PENDING',
  created_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at   TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Individual jobs within a pipeline
CREATE TABLE jobs (
  id              UUID NOT NULL DEFAULT gen_random_uuid(),
  pipeline_id     UUID NOT NULL REFERENCES pipelines(id),
  stage           TEXT NOT NULL,
  name            TEXT NOT NULL,
  status          TEXT NOT NULL DEFAULT 'PENDING',
  priority        SMALLINT NOT NULL DEFAULT 5,  -- 1 (critical) to 10 (low)
  assigned_worker TEXT,
  lease_expires   TIMESTAMPTZ,
  retry_after     TIMESTAMPTZ,                  -- set while status = 'RETRYING'
  attempt         SMALLINT NOT NULL DEFAULT 0,
  max_attempts    SMALLINT NOT NULL DEFAULT 3,
  started_at      TIMESTAMPTZ,
  finished_at     TIMESTAMPTZ,
  exit_code       SMALLINT,
  created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  -- unique constraints on a partitioned table must include the partition key
  PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

-- Monthly partitions for jobs (auto-created by maintenance job)
CREATE TABLE jobs_2026_06 PARTITION OF jobs
  FOR VALUES FROM ('2026-06-01') TO ('2026-07-01');

-- Job ID columns below are plain UUIDs rather than foreign keys, because
-- jobs.id alone carries no unique constraint on the partitioned table.

-- DAG edges: a downstream job waits until its upstream job succeeds
CREATE TABLE job_dependencies (
  upstream_job_id   UUID NOT NULL,
  downstream_job_id UUID NOT NULL,
  PRIMARY KEY (downstream_job_id, upstream_job_id)
);

-- Artifacts produced by jobs
CREATE TABLE artifacts (
  id           UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  job_id       UUID NOT NULL,
  name         TEXT NOT NULL,
  s3_key       TEXT NOT NULL,
  content_hash CHAR(64) NOT NULL,  -- SHA-256; many rows may share one S3 object
  size_bytes   BIGINT NOT NULL,
  mime_type    TEXT,
  created_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  UNIQUE (job_id, name, content_hash)
);
CREATE INDEX idx_artifacts_content_hash ON artifacts (content_hash);

-- Worker registry (updated by heartbeats)
CREATE TABLE workers (
  id             TEXT PRIMARY KEY,
  pool           TEXT NOT NULL,
  status         TEXT NOT NULL DEFAULT 'IDLE',  -- IDLE, BUSY, DRAINING, OFFLINE
  current_job_id UUID,
  last_heartbeat TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  capacity       JSONB,  -- {"cpu": 4, "memory_gb": 16, "arch": "amd64"}
  registered_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Indexes for hot query paths
CREATE INDEX idx_jobs_status_priority ON jobs (status, priority DESC) WHERE status IN ('QUEUED', 'RUNNING');
CREATE INDEX idx_jobs_pipeline ON jobs (pipeline_id);
CREATE INDEX idx_jobs_lease_expires ON jobs (lease_expires) WHERE status = 'RUNNING';
CREATE INDEX idx_workers_status ON workers (status) WHERE status = 'IDLE';

Partitioning strategy: The jobs table is range-partitioned by created_at with monthly partitions. This keeps hot data in a small number of pages, allows old partitions to be archived cheaply (just DETACH PARTITION and move to cold storage), and prevents bloat from accumulating in a single table that would eventually hit vacuum performance issues.

For high-throughput writes, we shard the Redis lease store across N Redis nodes using a stable hash of job_id modulo N (job IDs are UUIDs, so they are hashed before taking the modulus). The queue partitions (Kafka topics) are partitioned by (tenant_id, priority) so that one noisy tenant cannot starve another’s jobs.
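Python's built-in `hash()` is salted per process for strings, so two coordinator replicas using it would disagree about where a job's lease lives. A stable digest avoids that. A minimal sketch, with `shard_for` as a hypothetical helper:

```python
import hashlib


def shard_for(job_id: str, num_shards: int) -> int:
    """Map a job ID (e.g. a UUID string) to a shard index in [0, num_shards).

    blake2b gives the same answer in every process, unlike the salted hash().
    """
    digest = hashlib.blake2b(job_id.encode(), digest_size=8).digest()
    return int.from_bytes(digest, "big") % num_shards
```

Plain modulo sharding remaps most keys whenever N changes; consistent hashing, or Redis Cluster's fixed hash slots, limits that reshuffle when nodes are added.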

[Figure: CI/CD job state lifecycle]

Key Algorithms and Protocols

Pipeline DAG Traversal

A pipeline is a DAG where nodes are jobs and edges represent “must complete before” dependencies. Scheduling follows Kahn’s topological sort incrementally: a job becomes ready once all its upstream dependencies have status SUCCESS, just as Kahn’s algorithm releases a node once its in-degree drops to zero. Each time a job finishes, the coordinator re-evaluates readiness and moves newly ready jobs to QUEUED.

from typing import List

def get_ready_jobs(pipeline_id: str, db) -> List[str]:
    """
    Returns job IDs that have all dependencies satisfied
    and are currently in PENDING state.
    Assumes db returns dict-like rows (e.g. psycopg's dict_row factory).
    """
    rows = db.execute(
        """
        SELECT j.id, j.status,
               array_agg(dep.upstream_job_id) FILTER (WHERE dep.upstream_job_id IS NOT NULL) AS deps
        FROM jobs j
        LEFT JOIN job_dependencies dep ON dep.downstream_job_id = j.id
        WHERE j.pipeline_id = %s
        GROUP BY j.id, j.status
        """,
        (pipeline_id,)
    ).fetchall()

    job_status = {row["id"]: row["status"] for row in rows}
    job_deps = {row["id"]: (row["deps"] or []) for row in rows}

    # A PENDING job is ready once every upstream job has succeeded
    # (Kahn's "in-degree reached zero" condition, evaluated incrementally).
    ready = []
    for job_id, status in job_status.items():
        if status != "PENDING":
            continue
        if all(job_status.get(dep) == "SUCCESS" for dep in job_deps[job_id]):
            ready.append(job_id)
    return ready

def advance_pipeline(pipeline_id: str, db, queue_client):
    for job_id in get_ready_jobs(pipeline_id, db):
        # Compare-and-swap: the status guard lets only one replica win.
        cur = db.execute(
            "UPDATE jobs SET status = 'QUEUED' WHERE id = %s AND status = 'PENDING'",
            (job_id,)
        )
        # Enqueue only if this call actually performed the transition.
        if cur.rowcount == 1:
            queue_client.enqueue(job_id)
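At submission time, the API Gateway can run the full form of Kahn's algorithm once to reject cyclic or dangling definitions before any job row is created, leaving the per-step readiness check above to the coordinator. A sketch, assuming the parsed pipeline is available as a `{job: [upstream jobs]}` mapping (that input shape is an assumption):

```python
from collections import deque
from typing import Dict, List


def topological_order(deps: Dict[str, List[str]]) -> List[str]:
    """Kahn's algorithm over {job: [upstream jobs]}.

    Returns jobs so that every job appears after all of its dependencies.
    Raises ValueError on a cycle or a reference to an undefined job.
    """
    # In-degree = number of upstream edges; downstream = reverse adjacency.
    indegree = {job: 0 for job in deps}
    downstream: Dict[str, List[str]] = {job: [] for job in deps}
    for job, ups in deps.items():
        for up in ups:
            if up not in deps:
                raise ValueError(f"{job} depends on unknown job {up}")
            indegree[job] += 1
            downstream[up].append(job)

    # Seed with jobs that have no dependencies: the first ready wave.
    ready = deque(job for job, d in indegree.items() if d == 0)
    order = []
    while ready:
        job = ready.popleft()
        order.append(job)
        for child in downstream[job]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)

    # Any job never released sits on a cycle.
    if len(order) != len(deps):
        raise ValueError("pipeline definition contains a cycle")
    return order
```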

Key Insight

The DAG advance function must be idempotent. A job should transition from PENDING to QUEUED exactly once: the WHERE status = 'PENDING' clause turns the UPDATE into a compare-and-swap, and only the call whose UPDATE actually changed the row may enqueue the job. Without this guard, a race between two coordinator replicas can enqueue the same job twice, causing duplicate execution.

At-Least-Once Delivery Protocol

The at-least-once delivery guarantee is enforced through three mechanisms working together: lease expiry, a reaper process, and idempotent job execution.

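package reaper

import (
    "context"
    "database/sql"
    "log"
    "time"
)

// QueueClient is the minimal queue interface this sketch assumes.
type QueueClient interface {
    Enqueue(jobID string) error
}

// Reaper runs every 60 seconds and reclaims jobs
// whose worker leases have expired.
type Reaper struct {
    DB          *sql.DB
    QueueClient QueueClient
}

func (r *Reaper) Run(ctx context.Context) {
    ticker := time.NewTicker(60 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            r.reclaim(ctx)
        }
    }
}

func (r *Reaper) reclaim(ctx context.Context) {
    // SET expressions see the pre-update row. Assuming attempt counts failed
    // runs starting at 0, attempt + 1 includes the run that just lost its lease,
    // so max_attempts = 3 allows exactly three runs.
    rows, err := r.DB.QueryContext(ctx, `
        UPDATE jobs
        SET status = CASE
            WHEN attempt + 1 >= max_attempts THEN 'FAILED'
            ELSE 'QUEUED'
        END,
        attempt = attempt + 1,
        assigned_worker = NULL,
        lease_expires = NULL
        WHERE status = 'RUNNING'
        AND lease_expires < NOW()
        RETURNING id, status, attempt
    `)
    if err != nil {
        log.Printf("reaper query failed: %v", err)
        return
    }
    defer rows.Close()

    for rows.Next() {
        var id, status string
        var attempt int
        if err := rows.Scan(&id, &status, &attempt); err != nil {
            log.Printf("reaper scan failed: %v", err)
            continue
        }
        if status == "QUEUED" {
            if err := r.QueueClient.Enqueue(id); err != nil {
                // The row says QUEUED but the queue never received it;
                // a periodic consistency sweep must re-enqueue such jobs.
                log.Printf("re-enqueue of job %s failed: %v", id, err)
                continue
            }
            log.Printf("reclaimed job %s (attempt %d)", id, attempt)
        } else {
            log.Printf("job %s exhausted retries, marked FAILED", id)
        }
    }
    if err := rows.Err(); err != nil {
        log.Printf("reaper row iteration failed: %v", err)
    }
}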

Key Insight

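The reaper is the safety net, not the primary mechanism. The primary mechanism is the worker’s heartbeat extending its own lease. The reaper only fires when something has gone wrong - network partition, OOM kill, VM preemption. False reclaims are avoided by making the lease TTL several heartbeat intervals long (here, 300 seconds against 30-second heartbeats), so a few delayed heartbeats on a slow network do not let the lease lapse. The reaper interval only controls how quickly an already-expired lease is noticed.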
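The timing trade-off is easy to quantify. Assuming each heartbeat also refreshes `lease_expires` in the jobs table that the reaper scans, a crashed worker's lease expires at most one TTL after its last successful heartbeat (and so at most one TTL after the crash), and the reaper notices on its next tick. A small sketch with the values used in this article; the function name is hypothetical:

```python
def lease_timing(lease_ttl_s: float, heartbeat_interval_s: float, reaper_interval_s: float) -> dict:
    """Back-of-envelope lease numbers for a heartbeat + reaper design."""
    return {
        # how many heartbeat intervals of silence the lease outlasts
        "silence_before_expiry_intervals": lease_ttl_s / heartbeat_interval_s,
        # upper bound on how long a crashed worker's job stays RUNNING
        "worst_case_reclaim_s": lease_ttl_s + reaper_interval_s,
    }
```

With a 300-second TTL and a 60-second reaper, a crashed worker's job can sit in RUNNING for up to six minutes; shortening the TTL speeds recovery but tolerates fewer delayed heartbeats.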

Retry Logic

We implement exponential backoff with jitter for job retries. A job that fails due to a transient error (network timeout, dependency service unavailable) should not immediately flood the queue with retry attempts.

import random
import math

def compute_retry_delay(attempt: int, base_delay_s: float = 30.0, max_delay_s: float = 600.0) -> float:
    """
    Full jitter exponential backoff.
    attempt=1 -> 0 to 30s
    attempt=2 -> 0 to 60s
    attempt=3 -> 0 to 120s
    Capped at max_delay_s.
    """
    cap = min(base_delay_s * math.pow(2, attempt - 1), max_delay_s)
    return random.uniform(0, cap)

def schedule_retry(job_id: str, attempt: int, db, queue_client):
    delay = compute_retry_delay(attempt)
    db.execute(
        "UPDATE jobs SET status = 'RETRYING', retry_after = NOW() + %s * INTERVAL '1 second' WHERE id = %s",
        (delay, job_id)
    )
    # A separate scheduler process reads jobs WHERE status='RETRYING' AND retry_after <= NOW()
    # and re-enqueues them. This avoids busy-waiting in the queue itself.
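The comment above leaves the retry scheduler abstract. One way to write it, assuming a psycopg-style connection in autocommit mode that returns tuple rows and the same `queue_client.enqueue` used earlier, is a loop that claims due rows with `FOR UPDATE SKIP LOCKED`, so several scheduler replicas can run without enqueuing the same job twice. Function names and the batch size are illustrative.

```python
import time

# Claim due retries atomically; SKIP LOCKED lets replicas share the work.
RELEASE_DUE_RETRIES = """
UPDATE jobs
SET status = 'QUEUED', retry_after = NULL
WHERE id IN (
    SELECT id FROM jobs
    WHERE status = 'RETRYING' AND retry_after <= NOW()
    ORDER BY retry_after
    LIMIT %s
    FOR UPDATE SKIP LOCKED
)
RETURNING id
"""


def release_due_retries(db, queue_client, batch_size: int = 500) -> int:
    """Move due RETRYING jobs back to QUEUED, enqueue them, return the count."""
    rows = db.execute(RELEASE_DUE_RETRIES, (batch_size,)).fetchall()
    for (job_id,) in rows:
        queue_client.enqueue(job_id)
    return len(rows)


def run_retry_scheduler(db, queue_client, poll_interval_s: float = 5.0) -> None:
    """Loop forever, sleeping only when nothing was due."""
    while True:
        if release_due_retries(db, queue_client) == 0:
            time.sleep(poll_interval_s)
```

As with the reaper, an enqueue that fails after the UPDATE commits leaves a QUEUED row that is not in the queue, so a periodic consistency sweep is still worth having.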

Scaling and Performance

Horizontal scaling in a CI/CD orchestrator follows three independent axes. Queue scaling: as job intake grows, we add Kafka partitions (or Redis cluster shards) to increase throughput. Worker scaling: as queue depth increases, the autoscaler provisions more worker nodes. Coordinator scaling: as API request volume grows, we add stateless API Gateway replicas behind a load balancer.

# Capacity Estimation

Target: 10,000 concurrent jobs

Assumptions:
  - Average job duration: 5 minutes (300 seconds)
  - Average CPU per job: 2 vCPU
  - Average memory per job: 4 GB
  - Log throughput per job: 100 KB/s
  - Artifact size per job: 500 MB average

Worker fleet sizing:
  - 10,000 jobs x 2 vCPU = 20,000 vCPU needed
  - c5.4xlarge (16 vCPU, 32 GB) can run ~6 jobs each (8 by raw CPU and memory, minus room for the OS and container runtime)
  - Fleet size: 10,000 / 6 = ~1,667 nodes
  - With 20% headroom: 2,000 worker nodes

Queue throughput:
  - 5,000 new jobs/min = 83 jobs/second
  - Single Kafka partition handles ~10,000 messages/sec
  - 1 partition is sufficient; add more for fault isolation

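Log aggregation:
  - 10,000 jobs x 100 KB/s = 1 GB/s peak log ingestion
  - Kafka cluster: 10 brokers x 200 MB/s = 2 GB/s of broker writes; with replication factor 3, 1 GB/s of ingest becomes 3 GB/s of writes, so plan for ~15 brokers
  - Redis Streams hot buffer: 10,000 jobs x 10 MB = 100 GB
  - S3 cold storage: 500 GB/hour at the stated peak (~140 MB/s average); 1 GB/s is a burst ceiling, since sustaining it for an hour would write ~3.6 TB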

Database (PostgreSQL):
  - 10,000 rows in 'RUNNING' state
  - Heartbeat writes: 10,000 / 30s = ~333 writes/sec (well within range)
  - Use connection pooling (PgBouncer) to handle API fanout

[Figure: CI/CD pipeline scaling architecture]
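The capacity estimate is easy to re-run when an assumption changes. A sketch that reproduces its arithmetic: defaults mirror the listed assumptions, the replication factor of 3 comes from the failure-modes table, and decimal units (1 GB = 1,000,000 KB) are assumed.

```python
import math


def capacity_estimate(
    concurrent_jobs: int = 10_000,
    vcpu_per_job: int = 2,
    jobs_per_node: int = 6,
    headroom: float = 0.20,
    intake_per_min: int = 5_000,
    log_kb_per_s_per_job: int = 100,
    kafka_replication_factor: int = 3,
    heartbeat_interval_s: int = 30,
) -> dict:
    """Re-derive the back-of-envelope capacity numbers."""
    nodes = math.ceil(concurrent_jobs / jobs_per_node)
    log_gb_per_s = concurrent_jobs * log_kb_per_s_per_job / 1_000_000
    return {
        "vcpu_needed": concurrent_jobs * vcpu_per_job,
        "nodes": nodes,
        # rounded to the nearest node, matching the ~2,000 figure
        "nodes_with_headroom": round(nodes * (1 + headroom)),
        "intake_per_s": intake_per_min / 60,
        "log_ingest_gb_per_s": log_gb_per_s,
        "kafka_broker_writes_gb_per_s": log_gb_per_s * kafka_replication_factor,
        "heartbeat_writes_per_s": concurrent_jobs / heartbeat_interval_s,
    }
```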

Real World

GitLab CI scales its runner fleet using Kubernetes autoscaling with a custom metric server that reads queue depth directly from the CI database. The key tuning parameter is not scale-up speed - it is scale-down aggressiveness. Leaving idle workers running costs money; draining them too fast causes job starvation during traffic spikes. GitLab uses a 10-minute idle timeout before draining a runner.

Failure Modes and Recovery

| Failure | Detection | Impact | Recovery |
|---|---|---|---|
| Worker crash mid-job | Lease expiry after heartbeat silence | Job stuck in RUNNING state until the reaper fires | Reaper reclaims the job after the lease TTL and re-enqueues it for retry |
| Coordinator node crash | Load balancer health check (HTTP 200 probe fails) | In-flight API requests dropped; no new job dispatches during failover | Stateless coordinators; the LB routes to a healthy replica within 10s |
| Kafka broker failure | Kafka controller election; consumer lag spike | Log delivery on affected partitions pauses during leader election; with acks=all and retries, lines are delayed rather than lost | Kafka replication factor >= 3; producer retries with acks=all |
| Database primary failure | Patroni/pgpool leader election | All writes blocked during failover (30-60 seconds) | Read-only degraded mode (status queries still work); queue-based writes buffer in Kafka |
| S3 outage | Artifact upload HTTP 503 | Workers cannot upload artifacts; jobs succeed but artifacts are missing | Job marked SUCCESS with artifact status = PENDING; upload retried when S3 recovers |
| Worker stuck in infinite loop | Job wall-clock timeout exceeded | Heartbeats keep the lease alive, so it never expires; resources consumed indefinitely | Explicit max_duration per job; coordinator forcibly revokes the lease after the hard limit |

Watch Out

The most dangerous failure mode is a job that does not crash but does not make progress either - it is “stuck running.” Lease heartbeats keep extending the timeout, but no useful work happens. Always enforce a hard wall-clock timeout per job, independent of the heartbeat mechanism. The heartbeat proves the worker is alive; the timeout bounds how long a job may run without finishing.
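A hard deadline can live entirely in the worker, independent of the heartbeat loop. A minimal sketch using the `timeout` argument of Python's `subprocess.run`; `run_with_deadline` and `max_duration_s` are hypothetical names echoing the max_duration setting in the failure table.

```python
import subprocess
from typing import Optional


def run_with_deadline(cmd: list[str], max_duration_s: float) -> Optional[int]:
    """Run a job command under a hard wall-clock limit.

    Returns the exit code, or None if the deadline passed. On timeout,
    subprocess.run kills the child regardless of whether the heartbeat
    thread is still extending the lease.
    """
    try:
        return subprocess.run(cmd, timeout=max_duration_s).returncode
    except subprocess.TimeoutExpired:
        return None
```

`subprocess.run` kills only the direct child when the timeout fires, so a real worker would tear down the whole container, microVM, or process group instead.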

Comparison of Approaches

| Dimension | Push-Based Dispatch | Pull-Based Dispatch | Hybrid (Push+Pull) |
|---|---|---|---|
| Latency | Very low - coordinator pushes immediately | Slightly higher - worker polls on interval | Low - coordinator signals workers to poll |
| Fault tolerance | Coordinator must track which worker received each job | Workers own their own lease; coordinator is stateless | Complex but avoids both failure modes |
| Worker discovery | Coordinator maintains live worker registry | Workers register themselves; coordinator need not track capacity | Registry used for routing hints only |
| Backpressure | Coordinator can overwhelm slow workers with pushes | Workers naturally throttle by polling speed | Requires explicit flow control signals |
| Best for | Real-time bidding, gaming, low-latency workloads | CI/CD, batch processing, durable job queues | High-frequency trading, mixed workload systems |

Pull-based dispatch is the right choice for CI/CD. Workers behind firewalls can connect to the coordinator without requiring inbound ports. The coordinator does not need to maintain a live connection map to thousands of workers. And critically, if the coordinator restarts, workers simply resume polling with no coordination overhead. Push-based dispatch is faster but requires the coordinator to be stateful about which jobs are “in transit” - that statefulness is exactly what makes crash recovery hard.

Key Takeaways

  • Lease-based ownership is the foundational primitive: a job belongs to a worker only for as long as the worker proves it is alive. Anything else leads to either duplicate execution or lost jobs.
  • Separate the data plane from the control plane: logs, artifacts, and source code must flow on dedicated side channels. The coordinator’s job is to orchestrate, not to move bytes.
  • The reaper is the last safety net: design your heartbeat intervals and lease TTLs so that the reaper almost never fires in steady state. If it fires frequently without matching worker crashes, your heartbeat interval is too long relative to the lease TTL.
  • Idempotency at every job boundary: job execution may happen more than once due to at-least-once delivery. Design each job step to be safe to re-run - use content-addressable artifact keys, idempotent database upserts, and deterministic test commands.
  • Pull-based dispatch wins for durability: workers pull jobs rather than having jobs pushed to them. This moves failure handling responsibility to the worker and queue, where it is easier to reason about.
  • Prioritized queues prevent starvation of critical paths: a hotfix deploy must never queue behind 5,000 routine feature branch builds. Always separate priority lanes in the queue.
  • DAG advancement must be idempotent: the function that promotes pending jobs to queued must be safe to call multiple times. Use database-level CAS (compare-and-swap) transitions to prevent race conditions between coordinator replicas.
  • Capacity estimation drives architecture: at 10,000 concurrent jobs, you need roughly 2,000 worker nodes, 1 GB/s of log ingestion capacity, and 500 GB/hour of log storage. Plan these numbers before picking technologies, not after.

Most engineers underestimate how much complexity lives in the seam between the queue and the executor. Getting a job to run is trivial - subprocess.run(...). Guaranteeing that every job runs at least once and that duplicate runs are harmless, surfacing logs in the right order, and recovering cleanly from every failure mode is where the real design work lives.

Frequently Asked Questions

Q: Why not just use a database table as a job queue? A: A single database table works at low scale but has two failure modes at high scale. First, polling frequency creates lock contention: if 5,000 workers all SELECT FOR UPDATE SKIP LOCKED on the same table every second, you will see significant lock overhead. Second, a database is not designed for high-fanout pub/sub semantics needed for log streaming. Use a purpose-built queue (Kafka, SQS, Redis Streams) for job dispatch and reserve the database for state tracking.

Q: Why not use Kubernetes Jobs directly instead of a custom orchestrator? A: Kubernetes Jobs are great for simple use cases, but they lack several features CI/CD demands at scale: fine-grained priority queuing, cross-job dependency DAGs, real-time log streaming with historical replay, and custom retry policies with per-step idempotency. Most mature CI/CD systems use Kubernetes as a compute substrate for running containers, but build a custom orchestration layer on top of it for the coordination logic.

Q: How do you prevent one noisy tenant from consuming all worker capacity? A: Tenant isolation at the queue level. Each tenant gets a separate queue partition (or a separate queue entirely in high-tier plans). Worker pools are partitioned by tenant tier: paid teams get dedicated worker pools with guaranteed minimum capacity, while free-tier jobs share a “best effort” pool. A global scheduler periodically rebalances worker allocation based on queue depth per tenant.
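
The rebalancing policy itself is not specified above. One plausible policy, shown here purely as an assumption: honor each tenant's guaranteed minimum (capped at its queue depth), then share the spare workers in proportion to each tenant's remaining backlog. The allocate_workers name and its input shape are hypothetical.

```python
from __future__ import annotations


def allocate_workers(total: int, tenants: dict[str, tuple[int, int]]) -> dict[str, int]:
    """Split `total` workers across tenants.

    `tenants` maps name -> (queue_depth, guaranteed_minimum). Assumes the
    guarantees sum to at most `total`.
    """
    # Step 1: guaranteed minimums, but never more workers than queued jobs.
    alloc = {t: min(g, d) for t, (d, g) in tenants.items()}
    backlog = {t: d - alloc[t] for t, (d, _) in tenants.items()}
    remaining = sum(backlog.values())
    spare = min(total - sum(alloc.values()), remaining)
    if spare <= 0:
        return alloc

    # Step 2: share spare workers in proportion to remaining backlog.
    shares = {t: spare * b / remaining for t, b in backlog.items()}
    for t, s in shares.items():
        alloc[t] += int(s)

    # Step 3: hand out rounding leftovers by largest fractional remainder.
    leftover = spare - sum(int(s) for s in shares.values())
    by_remainder = sorted(shares, key=lambda t: shares[t] - int(shares[t]), reverse=True)
    for t in by_remainder[:leftover]:
        alloc[t] += 1
    return alloc
```

For example, with 10 workers, a paid tenant with 100 queued jobs and a guarantee of 4, and a free tenant with 20 queued jobs, this policy yields 9 workers for the paid tenant and 1 for the free one.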

Q: What happens when a job produces non-deterministic output across retries? A: Artifact content hashes differ between retries. We store artifacts per (job_id, attempt) rather than overwriting. The final pipeline result uses the artifact from the last successful attempt. This gives you a full audit trail of what each retry produced, which is invaluable for debugging flaky tests or non-deterministic build tools.
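
A sketch of the per-attempt bookkeeping, with an in-memory dict standing in for the artifact metadata table. ArtifactLog and its methods are hypothetical names; the point is that records are keyed by (job_id, attempt), never overwritten, and the final result comes from the last successful attempt.

```python
from __future__ import annotations

import hashlib
from dataclasses import dataclass


@dataclass(frozen=True)
class ArtifactRecord:
    job_id: str
    attempt: int
    succeeded: bool
    sha256: str  # content hash of whatever this attempt produced


class ArtifactLog:
    """One immutable record per (job_id, attempt); retries never overwrite."""

    def __init__(self) -> None:
        self._records: dict[tuple[str, int], ArtifactRecord] = {}

    def record(self, job_id: str, attempt: int, succeeded: bool, content: bytes) -> ArtifactRecord:
        key = (job_id, attempt)
        if key in self._records:
            raise ValueError(f"attempt {attempt} of {job_id} already recorded")
        rec = ArtifactRecord(job_id, attempt, succeeded, hashlib.sha256(content).hexdigest())
        self._records[key] = rec
        return rec

    def history(self, job_id: str) -> list[ArtifactRecord]:
        """Full audit trail, oldest attempt first."""
        return sorted(
            (r for r in self._records.values() if r.job_id == job_id),
            key=lambda r: r.attempt,
        )

    def final_artifact(self, job_id: str) -> ArtifactRecord | None:
        """Artifact from the last successful attempt, or None if none succeeded."""
        wins = [r for r in self.history(job_id) if r.succeeded]
        return wins[-1] if wins else None
```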

Q: Why use Redis for the lease store instead of keeping everything in PostgreSQL? A: Lease updates are extremely write-heavy: 10,000 workers each writing a heartbeat every 30 seconds is about 333 writes per second sustained (10,000 / 30), with bursts at job assignment and completion. PostgreSQL handles that write rate fine, but the lease TTL check (finding expired leases) requires a range scan on lease_expires. Redis EXPIRE handles TTLs natively at O(1) without table scans, and the atomic SET NX EX command provides set-if-absent semantics for lease acquisition without application-level locking.
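
To make the lease semantics concrete without a Redis server, the sketch below is an in-memory stand-in (a hypothetical LeaseStore class with an injectable clock for testing). acquire() mirrors SET key value NX EX ttl. renew() checks the owner before extending, which SET NX alone cannot do; against real Redis that check-and-extend would typically be a small Lua script.

```python
from __future__ import annotations

import time
from typing import Callable


class LeaseStore:
    """In-memory stand-in for a Redis-backed lease store (illustrative only).

    acquire() mirrors `SET <job_id> <owner> NX EX <ttl>`: it succeeds only if
    no unexpired lease exists. renew() extends a lease only for its current
    owner; with real Redis that owner check needs a Lua script or a
    WATCH/MULTI transaction, because SET NX cannot express it.
    """

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._leases: dict[str, tuple[str, float]] = {}  # job_id -> (owner, expires_at)

    def _live(self, job_id: str) -> tuple[str, float] | None:
        entry = self._leases.get(job_id)
        if entry is not None and entry[1] > self._clock():
            return entry
        self._leases.pop(job_id, None)  # drop an expired entry on access
        return None

    def acquire(self, job_id: str, owner: str, ttl: float) -> bool:
        if self._live(job_id) is not None:
            return False
        self._leases[job_id] = (owner, self._clock() + ttl)
        return True

    def renew(self, job_id: str, owner: str, ttl: float) -> bool:
        entry = self._live(job_id)
        if entry is None or entry[0] != owner:
            return False
        self._leases[job_id] = (owner, self._clock() + ttl)
        return True
```

Once a worker stops heartbeating, its lease simply lapses and the next acquire() from any other worker succeeds, which is the reassignment behavior the FAQ describes.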

Q: How do you handle a job that requires 32 GB of RAM on a fleet of 16 GB workers? A: Resource tagging. Jobs declare their resource requirements in the pipeline YAML (cpu: 4, memory: 32Gi, arch: arm64). Workers advertise their capacity on registration. The dispatcher only assigns a job to a worker whose available capacity satisfies the job’s requirements. Workers with specialized hardware (GPUs, high-memory instances) are kept in separate pools with their own queue partitions.
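
A first-fit matcher illustrating resource tagging. The Resources type, the pick_worker name, and the memory parser (which handles only the Ki/Mi/Gi/Ti suffixes of Kubernetes-style quantities) are assumptions for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass

# Binary suffixes from Kubernetes-style quantities (a subset, for illustration).
_UNITS = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40}


def parse_memory(quantity: str) -> int:
    """Convert '32Gi' -> bytes; a bare number is taken as bytes."""
    for suffix, factor in _UNITS.items():
        if quantity.endswith(suffix):
            return int(quantity[: -len(suffix)]) * factor
    return int(quantity)


@dataclass(frozen=True)
class Resources:
    cpu: int
    memory: str
    arch: str


def fits(job: Resources, free: Resources) -> bool:
    """True if a worker's free capacity covers every declared requirement."""
    return (
        job.arch == free.arch
        and job.cpu <= free.cpu
        and parse_memory(job.memory) <= parse_memory(free.memory)
    )


def pick_worker(job: Resources, workers: dict[str, Resources]) -> str | None:
    """First-fit: return the first worker that satisfies the job, else None."""
    for name, free in workers.items():
        if fits(job, free):
            return name
    return None
```

First-fit is the simplest placement rule; a production dispatcher might prefer best-fit so that large-memory workers stay free for the jobs that actually need them.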

Interview Questions

Q: Walk me through what happens from git push to a test result appearing in the UI. Expected depth: The candidate should cover the webhook call, pipeline YAML parsing, DAG construction and initial job enqueuing, worker polling and lease acquisition, job execution with log streaming to Kafka, WebSocket fanout to the UI, artifact upload via pre-signed URL, job completion with state transition, and DAG advancement triggering the next stage’s jobs.

Q: A worker crashes exactly as it is calling complete_job. The job was actually successful. What happens? Expected depth: The candidate should recognize this as the classic at-least-once delivery edge case. The lease will expire, the reaper will re-enqueue the job, and the job will run again. This is why job steps must be idempotent - a duplicate run should produce the same artifact with the same content hash, and the second run’s artifact upload will hit the ON CONFLICT DO NOTHING clause. The UI may briefly show the job as running a second time before the duplicate completes.
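
A sketch of the idempotent upload this answer relies on, with sqlite3 standing in for PostgreSQL (SQLite 3.24+ accepts the same ON CONFLICT ... DO NOTHING clause). The artifacts table and the upload_artifact helper are hypothetical; artifacts are keyed by their SHA-256 content hash, so an identical duplicate run inserts nothing.

```python
import hashlib
import sqlite3

# Hypothetical table: one row per distinct artifact content.
SCHEMA = "CREATE TABLE artifacts (sha256 TEXT PRIMARY KEY, job_id TEXT, size INTEGER)"


def upload_artifact(conn: sqlite3.Connection, job_id: str, content: bytes) -> bool:
    """Record an artifact under its content hash.

    Returns True for the first write and False when an identical artifact
    already exists, i.e. when a duplicate run hit ON CONFLICT DO NOTHING.
    """
    digest = hashlib.sha256(content).hexdigest()
    cur = conn.execute(
        "INSERT INTO artifacts (sha256, job_id, size) VALUES (?, ?, ?) "
        "ON CONFLICT (sha256) DO NOTHING",
        (digest, job_id, len(content)),
    )
    conn.commit()
    return cur.rowcount == 1
```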

Q: How would you add support for 1 million concurrent jobs without redesigning the system? Expected depth: Horizontal scaling of each layer independently. Queue: add Kafka partitions sharded by tenant. Workers: auto-scaling groups in multiple regions. Coordinator: stateless replicas behind a global load balancer. Database: partition the jobs table by both time and tenant, with a read replica per region. The key bottleneck to address first is the reaper query: finding expired leases among 1 million rows needs an index on (status, lease_expires) to avoid a full table scan, and the query should run per-partition to avoid locking the entire table.
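
A reaper sketch on the indexed schema described above, using sqlite3 as a stand-in; in a partitioned deployment the same query would run once per partition. The table and function names are hypothetical, and the batch LIMIT is an added assumption that keeps each pass short. The re-check in the UPDATE guards against a worker that renewed its lease between the SELECT and the UPDATE.

```python
from __future__ import annotations

import sqlite3

# Hypothetical jobs table with the composite index the answer calls for.
SCHEMA = """
CREATE TABLE jobs (id TEXT PRIMARY KEY, tenant TEXT, status TEXT, lease_expires REAL);
CREATE INDEX idx_jobs_status_lease ON jobs (status, lease_expires);
"""


def reap_expired(conn: sqlite3.Connection, now: float, batch: int = 500) -> list[str]:
    """Re-queue up to `batch` RUNNING jobs whose lease expired before `now`."""
    # Equality on status plus a range on lease_expires is served by the index.
    expired = [
        row[0]
        for row in conn.execute(
            "SELECT id FROM jobs WHERE status = 'RUNNING' AND lease_expires < ? "
            "ORDER BY lease_expires LIMIT ?",
            (now, batch),
        )
    ]
    requeued = []
    for job_id in expired:
        cur = conn.execute(
            "UPDATE jobs SET status = 'QUEUED', lease_expires = NULL "
            "WHERE id = ? AND status = 'RUNNING' AND lease_expires < ?",
            (job_id, now),
        )
        if cur.rowcount == 1:  # skip jobs whose worker renewed in the meantime
            requeued.append(job_id)
    conn.commit()
    return requeued
```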

Q: How would you implement pipeline cancellation that propagates to all in-flight jobs? Expected depth: A cancellation signal must reach workers that are actively executing. The coordinator marks the pipeline and all of its RUNNING or QUEUED jobs as CANCELLED in the database. Queued jobs are simply removed from the queue (or ignored at dequeue time when the status check reveals CANCELLED). For running jobs, the coordinator publishes a cancellation message to a cancellations Kafka topic keyed by job_id. Each worker subscribes to this topic and checks for its job's cancellation during its execution loop. The worker's job runner then sends SIGTERM to the container process and waits up to 30 seconds for graceful shutdown before sending SIGKILL.
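
The SIGTERM-then-SIGKILL step can be sketched with Python's subprocess module, using a local child process as a stand-in for the container (for containers, docker stop follows the same pattern with its own timeout). cancel_process is a hypothetical helper; on POSIX, Popen.terminate() sends SIGTERM and Popen.kill() sends SIGKILL.

```python
import subprocess


def cancel_process(proc: subprocess.Popen, grace_seconds: float = 30.0) -> int:
    """Stop a job process: SIGTERM, wait up to `grace_seconds`, then SIGKILL.

    Returns the exit code; on POSIX a process killed by a signal reports
    the negative signal number (e.g. -15 for SIGTERM, -9 for SIGKILL).
    """
    if proc.poll() is not None:  # already exited on its own
        return proc.returncode
    proc.terminate()  # SIGTERM on POSIX: ask for a graceful shutdown
    try:
        return proc.wait(timeout=grace_seconds)
    except subprocess.TimeoutExpired:
        proc.kill()  # SIGKILL on POSIX: cannot be caught or ignored
        return proc.wait()
```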

Q: Design the schema and query for the “build history” page - showing the last 50 pipeline runs for a repository, with status and duration. Expected depth: The candidate should note that this query joins pipelines (filtered by repo_id, ordered by created_at DESC LIMIT 50) with an aggregate over jobs to compute overall pipeline status and duration. The index needed is (repo_id, created_at DESC) on pipelines. They should also note that computing pipeline status from job statuses is expensive if done on read - it is better to maintain a denormalized pipeline.status field updated on each job state transition, making the build history query a simple indexed scan with no joins.
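
A sketch of the denormalized design, with sqlite3 as a stand-in. The started_at and finished_at columns are assumptions beyond what the answer names; storing them on the pipeline row alongside status lets duration come straight from the row, so the page is a single indexed scan with no join against jobs.

```python
from __future__ import annotations

import sqlite3

# Hypothetical schema: status is denormalized onto the pipeline row and kept
# current by the coordinator on every job state transition.
SCHEMA = """
CREATE TABLE pipelines (
    id          INTEGER PRIMARY KEY,
    repo_id     INTEGER NOT NULL,
    created_at  TEXT NOT NULL,   -- ISO-8601 UTC strings sort chronologically
    status      TEXT NOT NULL,
    started_at  REAL,            -- epoch seconds; NULL until the first job starts
    finished_at REAL             -- NULL while the pipeline is still running
);
CREATE INDEX idx_pipelines_repo_created ON pipelines (repo_id, created_at DESC);
"""

BUILD_HISTORY = """
SELECT id, status, finished_at - started_at AS duration_s
FROM pipelines
WHERE repo_id = ?
ORDER BY created_at DESC
LIMIT 50
"""


def build_history(conn: sqlite3.Connection, repo_id: int) -> list[tuple]:
    """Last 50 runs for a repo, newest first."""
    return conn.execute(BUILD_HISTORY, (repo_id,)).fetchall()
```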
