Build a CI/CD Pipeline Orchestrator
System Design Deep Dive
CI/CD Pipeline Orchestrator
How do you coordinate 10,000 concurrent build jobs, stream logs in real-time, and guarantee zero job loss when workers crash?
Every time a developer pushes a commit, a chain of events kicks off: clone the repository, install dependencies, run tests across dozens of shards, build container images, push artifacts, and finally deploy. That chain is a CI/CD pipeline - and for a company with hundreds of engineers, thousands of those chains are starting simultaneously. At 10,000 concurrent jobs, the coordination problem dwarfs the execution problem. The hard part is not running a shell script on a machine. The hard part is deciding which machine runs it, what happens when that machine dies mid-run, and how the engineer watching their terminal sees log output appear line by line in near-real-time.
Think of a CI/CD orchestrator like a construction dispatch center. Thousands of job orders arrive every minute. Each job has a priority, a set of dependencies on prior jobs, and a requirement for a specific type of equipment. Dispatchers assign each order to a crew. If a crew’s van breaks down mid-job, the dispatcher must detect the failure, reassign the partially-done work, and ensure nothing is billed twice. The dispatcher does not swing a hammer - it keeps everything organized so the crews can work without stepping on each other.
The naive approach - a single process that polls a database table, fires off shell commands, and tails log files - collapses quickly. At 100 concurrent jobs, a single coordinator becomes a bottleneck. At 1,000, the log-tailing I/O alone saturates the network interface. At 10,000, a single coordinator crash drops all in-flight state. Distributed systems theory gives us a vocabulary for these failure modes: the coordinator needs to be stateless, job ownership needs to be enforced with leases, and log delivery needs a separate fanout path from the job execution path.
Three architectural decisions define every serious CI/CD orchestrator. First, the job queue must provide at-least-once delivery with visibility timeouts, so a job is not considered done until a worker explicitly marks it complete. Second, the worker pool must report heartbeats and have its lease revoked on silence, ensuring jobs return to the queue when a worker dies. Third, log streaming must be decoupled from job execution - workers write to a side channel, and clients subscribe to that channel independently, so neither slow log consumers nor network hiccups can block job progress.
Requirements and Constraints
Functional Requirements
- Accept pipeline definitions as YAML or JSON, describing a DAG (directed acyclic graph) of stages and jobs
- Queue and dispatch up to 10,000 concurrent build and test jobs across a heterogeneous worker pool
- Stream job logs to clients in real-time with per-line timestamps and ordering guarantees
- Store build artifacts (binaries, container images, test reports) with content-addressable references
- Support job retries with configurable backoff, max attempts, and per-step idempotency keys
- Expose REST and WebSocket APIs for job submission, status queries, log tailing, and artifact download
- Cancel in-flight jobs and propagate cancellation signals to running worker processes
Non-Functional Requirements
- Throughput: 10,000 concurrent jobs; peak intake of 5,000 new jobs per minute
- Latency: job dispatch within 500ms of submission for standard-priority jobs; log lines visible within 200ms of emission
- Durability: zero job loss on coordinator or worker crash; at-least-once execution per job
- Availability: 99.9% uptime for the dispatch and queuing layer; graceful degradation if artifact storage is temporarily unavailable
- Scalability: worker pool auto-scales from 100 to 5,000 nodes within 90 seconds based on queue depth
- Observability: per-job execution traces, queue depth metrics, worker utilization dashboards
Constraints
- Workers are untrusted - they run arbitrary user code in isolated containers or VMs
- Build artifacts can be large (up to 10 GB per job); they must not flow through the coordinator
- Log volume can reach 50 MB per job and 500 GB per hour at peak; logs must be queryable after job completion
- The system must support both ephemeral cloud workers and persistent on-premise runners
High-Level Architecture
The system decomposes into six layers, each with a clear ownership boundary.
The API Gateway accepts webhook events from Git providers and REST calls from developers. It validates pipeline YAML, resolves the DAG structure, and enqueues individual jobs. It is stateless and horizontally scalable behind a load balancer.
The Job Queue is the durable backbone of the system. Each job becomes a message with a visibility timeout. Workers acquire jobs by dequeuing messages. If the visibility timeout expires before the worker sends a heartbeat extension, the message becomes visible again and another worker picks it up.
The Worker Pool consists of ephemeral nodes that poll the job queue, execute the assigned job in an isolated environment, and publish results. Workers never share state with each other - all coordination flows through the queue and the state store.
The State Store (a PostgreSQL cluster with a Redis cache) tracks every job’s lifecycle: PENDING, QUEUED, ASSIGNED, RUNNING, SUCCESS, FAILED, RETRYING, and CANCELLED. The state store is the single source of truth for pipeline progress.
The Log Aggregator receives structured log lines from workers over a side channel (typically a lightweight gRPC stream or a Kafka topic per job). It persists logs to object storage and provides a streaming read API so clients can tail in real-time without the coordinator being in the hot path.
The Artifact Store is a content-addressable object storage layer (S3-compatible). Workers upload artifacts directly using pre-signed URLs issued by the API Gateway, so large binaries never flow through any coordinator service.
Key Insight
The coordinator is a traffic director, not a data conduit. Logs, artifacts, and source code must all flow on side channels. If any large payload passes through the coordinator, it becomes the bottleneck as concurrency scales.
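The job lifecycle tracked by the State Store can be expressed as an explicit transition table. The sketch below is one plausible reading of the states listed above; the exact set of allowed edges (for example, RUNNING back to QUEUED when a lease is reclaimed) is an assumption, not something the design spells out.

```python
# Allowed job state transitions. The edges are an assumed reading of the
# lifecycle (PENDING ... CANCELLED) described in this article.
ALLOWED_TRANSITIONS = {
    "PENDING": {"QUEUED", "CANCELLED"},
    "QUEUED": {"ASSIGNED", "CANCELLED"},
    "ASSIGNED": {"RUNNING", "QUEUED", "CANCELLED"},
    # RUNNING -> QUEUED models a reclaimed lease; RUNNING -> RETRYING a failed attempt.
    "RUNNING": {"SUCCESS", "FAILED", "RETRYING", "QUEUED", "CANCELLED"},
    "RETRYING": {"QUEUED", "CANCELLED"},
    "SUCCESS": set(),
    "FAILED": set(),
    "CANCELLED": set(),
}

# States with no outgoing edges are terminal.
TERMINAL_STATES = {state for state, nxt in ALLOWED_TRANSITIONS.items() if not nxt}


def can_transition(current: str, new: str) -> bool:
    """Return True if a job may move from `current` to `new`."""
    return new in ALLOWED_TRANSITIONS.get(current, set())
```

Checking every status update against a table like this keeps an out-of-order message (say, a late heartbeat for a cancelled job) from resurrecting a finished job.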
Component Deep Dives
Job Queue
The job queue is the most critical component for durability. We model it on the visibility timeout pattern made famous by Amazon SQS. When a worker polls the queue, the message is not deleted - it becomes invisible for a configurable period (say, 5 minutes). The worker must extend that timeout by sending heartbeats every 30 seconds. If heartbeats stop, the message reappears and another worker can claim it.
This is like a library’s checkout system. Borrowing a book does not destroy the library’s record - it just marks the book as checked out. If you never return it, the library eventually calls you, and if you still do not respond, the record resets so someone else can borrow it.
We implement priority lanes using three separate queues: CRITICAL (for manual deploys and hotfix branches), HIGH (for main branch merges), and NORMAL (for feature branch builds). Workers check queues in priority order with a short exponential backoff when all queues are empty.
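from typing import Optional

import redis

r = redis.Redis()
QUEUES = ["queue:critical", "queue:high", "queue:normal"]  # polled in priority order
LEASE_TTL = 300  # seconds
HEARTBEAT_INTERVAL = 30  # seconds

# Lua scripts run atomically inside Redis, so the ownership check and the
# write that follows cannot interleave with another worker's claim.
_EXTEND = r.register_script("""
if redis.call('GET', KEYS[1]) == ARGV[1] then
  return redis.call('EXPIRE', KEYS[1], ARGV[2])
end
return 0
""")
_COMPLETE = r.register_script("""
if redis.call('GET', KEYS[1]) ~= ARGV[1] then
  return 0
end
redis.call('DEL', KEYS[1])
redis.call('HSET', KEYS[2], 'status', ARGV[2])
return 1
""")

def poll_job(worker_id: str) -> Optional[dict]:
    for queue in QUEUES:
        job_id = r.lpop(queue)
        if job_id:
            # NOTE: a crash between LPOP and SET loses the job. A production
            # queue would move it atomically into a processing list (LMOVE).
            lease_key = f"lease:{job_id.decode()}"
            r.set(lease_key, worker_id, ex=LEASE_TTL)
            return {"job_id": job_id.decode(), "queue": queue}
    return None

def extend_lease(job_id: str, worker_id: str) -> bool:
    # True only if this worker still owns the lease; False if stolen or expired.
    return bool(_EXTEND(keys=[f"lease:{job_id}"], args=[worker_id, LEASE_TTL]))

def complete_job(job_id: str, worker_id: str, success: bool) -> bool:
    status = "SUCCESS" if success else "FAILED"
    # False means someone else owns this job now, so the result is discarded.
    return bool(_COMPLETE(keys=[f"lease:{job_id}", f"job:{job_id}"], args=[worker_id, status]))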
Watch Out
Using a single Redis list as your job queue is fine up to roughly 50,000 enqueues per second per shard. Beyond that, you need partitioned queues - either by tenant or by job type. Do not wait until you hit the limit to design partitioning in, because retrofitting it under live traffic is extremely painful.
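Partitioning by tenant can be designed in from day one with a stable hash. The following is a minimal sketch; the `queue:{priority}:{shard}` key layout and the `num_shards` parameter are assumptions for illustration, not part of the design above.

```python
import zlib

PRIORITIES = ("critical", "high", "normal")


def queue_key(tenant_id: str, priority: str, num_shards: int) -> str:
    """Map a tenant to one shard of a priority lane.

    CRC32 is used because it is stable across processes and restarts,
    unlike Python's built-in hash(), which is randomized per process.
    """
    if priority not in PRIORITIES:
        raise ValueError(f"unknown priority: {priority!r}")
    if num_shards < 1:
        raise ValueError("num_shards must be at least 1")
    shard = zlib.crc32(tenant_id.encode("utf-8")) % num_shards
    return f"queue:{priority}:{shard}"
```

Because every job from one tenant lands on the same shard, a noisy tenant saturates only its own shard. The trade-off is that changing `num_shards` remaps tenants, so resizing needs a drain or a consistent-hashing scheme.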
Worker Pool
Workers are stateless executors. Each worker process runs in a loop: poll the queue, acquire a lease, spin up an isolated environment (Docker container, Firecracker microVM, or a plain subprocess depending on your trust model), execute the job, stream logs to the aggregator, upload artifacts to object storage, and mark the job complete.
The lifecycle is analogous to a restaurant kitchen. A ticket comes in (job dispatch), the chef (worker) starts cooking (execution), calls out each item as it is ready (log streaming), and marks the ticket done only when the plate leaves the pass (job completion). If the chef gets sick mid-service, the ticket goes back to the wheel for another chef - the food that was already plated is discarded and the job re-runs.
Worker auto-scaling uses a simple metric derived from Little's law: the number of busy worker slots needed in steady state is the job arrival rate multiplied by the average job duration, plus enough extra slots to drain the current queue depth within a target time. We publish this metric to CloudWatch or Datadog, and an autoscaler adjusts the worker fleet every 60 seconds.
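One way to turn queue depth and job duration into a worker count is Little's law (concurrent jobs = arrival rate x average duration) plus a term for draining the backlog. This is a minimal sketch; `target_drain_s`, `jobs_per_worker`, and the clamping bounds are assumed parameters rather than values from the design.

```python
import math


def desired_workers(
    arrival_rate_per_s: float,
    avg_job_duration_s: float,
    queue_depth: int,
    target_drain_s: float,
    jobs_per_worker: int = 1,
    min_workers: int = 0,
    max_workers: int = 5000,
) -> int:
    """Estimate the fleet size needed to keep up and to drain the backlog."""
    # Little's law: jobs in flight at steady state.
    steady_state_slots = arrival_rate_per_s * avg_job_duration_s
    # Extra slots needed to work off `queue_depth` jobs within `target_drain_s`.
    backlog_slots = queue_depth * avg_job_duration_s / target_drain_s
    workers = math.ceil((steady_state_slots + backlog_slots) / jobs_per_worker)
    return max(min_workers, min(max_workers, workers))
```

For example, 10 jobs per second at 300 seconds each needs 3,000 slots before any backlog is considered; with 6 jobs per node that is 500 nodes.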
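package worker

import (
	"context"
	"log"
	"time"
)

// Job, Result, and the interfaces below are the minimal contracts this loop
// relies on (assumed shapes); concrete implementations live elsewhere.
type Job struct{ ID string }

type Result struct{ Success bool }

type LogStream interface{ Close() error }

type LogStreamClient interface {
	OpenStream(jobID string) LogStream
}

type QueueClient interface {
	Poll(ctx context.Context) (*Job, error)
	ExtendLease(jobID, workerID string) error
	Complete(jobID, workerID string, success bool) bool
}

type JobRunner interface {
	Run(ctx context.Context, job *Job, stream LogStream) Result
}

type Worker struct {
	ID          string
	QueueClient QueueClient
	JobRunner   JobRunner
	LogStream   LogStreamClient
}

// Run polls for jobs until ctx is cancelled.
func (w *Worker) Run(ctx context.Context) {
	for {
		if ctx.Err() != nil {
			return
		}
		job, err := w.QueueClient.Poll(ctx)
		if err != nil || job == nil {
			// Nothing to do (or a transient error): back off, but wake early on shutdown.
			select {
			case <-ctx.Done():
				return
			case <-time.After(500 * time.Millisecond):
			}
			continue
		}
		w.execute(ctx, job)
	}
}

func (w *Worker) execute(ctx context.Context, job *Job) {
	// jobCtx is cancelled on shutdown or when the lease is lost, so the
	// runner can stop work that no longer belongs to this worker.
	jobCtx, cancelJob := context.WithCancel(ctx)
	defer cancelJob() // also stops the heartbeat goroutine
	go w.heartbeat(jobCtx, cancelJob, job.ID)

	stream := w.LogStream.OpenStream(job.ID)
	defer stream.Close()

	result := w.JobRunner.Run(jobCtx, job, stream)
	if !w.QueueClient.Complete(job.ID, w.ID, result.Success) {
		log.Printf("lease expired for job %s, result discarded", job.ID)
	}
}

// heartbeat extends the lease every 30 seconds and aborts the job if the
// lease can no longer be extended.
func (w *Worker) heartbeat(ctx context.Context, onLeaseLost context.CancelFunc, jobID string) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := w.QueueClient.ExtendLease(jobID, w.ID); err != nil {
				log.Printf("failed to extend lease for job %s: %v", jobID, err)
				onLeaseLost()
				return
			}
		}
	}
}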
Real World
GitHub Actions uses a similar pull-based model where self-hosted runners long-poll an HTTPS endpoint. This means runners work behind firewalls without inbound ports, since all connections are outbound from runner to coordinator. The lease mechanism is implemented as a job token that expires if not refreshed.
Log Streaming
Log streaming must solve three tensions simultaneously: low latency for the developer watching a live build, high throughput for jobs that emit megabytes per second, and durable storage for post-mortem queries weeks later.
Think of it like a live sports broadcast with a DVR. The broadcaster sends frames over satellite in real-time (live streaming). The DVR records each frame locally (persistence). Viewers can join mid-broadcast and see what they missed from the DVR buffer (catch-up). Nobody waits for the other - the broadcast does not pause because some viewers are slow.
Workers write log lines to a Kafka topic named logs.{job_id}. Each line is a JSON record with fields seq (monotonic sequence number), ts (Unix timestamp in milliseconds), stream (stdout or stderr), and text. The sequence number allows clients to detect gaps and request retransmission.
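Gap detection on the client side can be sketched as follows. This is a minimal illustration that assumes `seq` starts at 0 for each job and that records use the field names described above; the `LogTail` class is hypothetical.

```python
import json
from typing import List


class LogTail:
    """Tracks the last sequence number seen and reports skipped ones."""

    def __init__(self) -> None:
        self.last_seq = -1  # assumes the first line of a job has seq == 0
        self.lines: List[str] = []

    def accept(self, raw: str) -> List[int]:
        """Consume one JSON log record; return the sequence numbers that were skipped."""
        record = json.loads(raw)
        seq = record["seq"]
        if seq <= self.last_seq:
            return []  # duplicate or replayed line: ignore it
        missing = list(range(self.last_seq + 1, seq))
        self.last_seq = seq
        self.lines.append(record["text"])
        return missing
```

A client that gets a non-empty list back would request retransmission of exactly those sequence numbers before rendering later lines.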
A Log Aggregator service consumes from Kafka and simultaneously: (1) buffers the last 10,000 lines per job in Redis for live WebSocket tailing, (2) writes completed chunks to S3 as NDJSON files for long-term storage. When a client opens a WebSocket connection to tail a job, the aggregator replays buffered history and then subscribes to new lines from the Redis stream.
# redis.asyncio ships with redis-py and replaces the deprecated aioredis package.
import redis.asyncio as redis

TERMINAL_STATES = ("SUCCESS", "FAILED", "CANCELLED")

async def stream_logs(job_id: str, websocket):
    r = redis.from_url("redis://localhost")
    stream_key = f"logs:{job_id}"
    try:
        # replay historical lines
        history = await r.xrange(stream_key, "-", "+")
        for _, fields in history:
            await websocket.send(fields[b"data"].decode())
        # tail live lines; with no history, start from 0-0 rather than "$"
        # so lines written between the two calls are not skipped
        last_id = history[-1][0] if history else "0-0"
        while True:
            entries = await r.xread({stream_key: last_id}, block=5000, count=100)
            for _, messages in entries:
                for msg_id, fields in messages:
                    await websocket.send(fields[b"data"].decode())
                    last_id = msg_id
            if entries:
                continue  # keep draining before checking for completion
            # stop only once the stream is drained and the job has finished
            status = await r.hget(f"job:{job_id}", "status")
            if status and status.decode() in TERMINAL_STATES:
                break
    finally:
        await r.aclose()  # redis-py 5.0.1+; older versions use close()
Watch Out
Do not buffer all log lines in memory on the coordinator. A single job generating 50 MB of logs, multiplied by 10,000 concurrent jobs, is 500 GB of in-memory state. Use Redis Streams with a capped length (MAXLEN ~) or a Kafka topic with a short retention period, and always flush to object storage before expiring the hot buffer.
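Capping the hot buffer can be done at write time. The sketch below assumes a redis-py style client (its `xadd` accepts `maxlen` and `approximate`) and stores each record under a single `data` field; the function name and the 10,000-line cap are illustrative choices.

```python
import json
import time

HOT_BUFFER_MAX_LINES = 10_000  # assumed cap per job


def append_log_line(client, job_id: str, seq: int, stream: str, text: str):
    """Append one log record to the job's Redis Stream with an approximate cap.

    `client` is expected to behave like redis.Redis; approximate trimming
    (MAXLEN ~) lets Redis trim in whole internal nodes, which is cheaper
    than exact trimming.
    """
    record = {
        "seq": seq,
        "ts": int(time.time() * 1000),  # Unix time in milliseconds
        "stream": stream,
        "text": text,
    }
    return client.xadd(
        f"logs:{job_id}",
        {"data": json.dumps(record)},
        maxlen=HOT_BUFFER_MAX_LINES,
        approximate=True,
    )
```

Anything trimmed from the stream is gone from Redis, so the flush to object storage has to keep pace with the cap.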
Artifact Storage
Artifact storage leverages the principle that the coordinator should never touch the data plane. When a job needs to upload an artifact (a compiled binary, a container image layer, a test coverage report), the worker requests a pre-signed URL from the API Gateway. The API Gateway generates a short-lived S3 pre-signed PUT URL, returns it to the worker, and the worker uploads directly to S3. The coordinator only stores the resulting artifact metadata (S3 key, content hash, size, MIME type) in the state store.
This is exactly how a notary works. The notary verifies your identity and issues a notarized form. You then go to the government office yourself to file the form. The notary does not hand-carry your paperwork to every government department - they just issue the authorization.
Content-addressable storage (using SHA-256 of the file content as the key) gives us automatic deduplication: if two jobs produce the same binary, only one copy exists in S3. This is especially valuable for Docker layer caches, where base layer binaries are identical across thousands of builds.
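import boto3

s3 = boto3.client("s3")
BUCKET = "ci-artifacts"
MAX_SINGLE_PUT_BYTES = 5 * 1024**3  # S3 caps a single PUT at 5 GB

def issue_upload_url(job_id: str, artifact_name: str, size_bytes: int) -> dict:
    if size_bytes > MAX_SINGLE_PUT_BYTES:
        # Larger artifacts need a multipart upload, which is not shown here.
        raise ValueError("artifact too large for a single pre-signed PUT")
    key = f"artifacts/{job_id}/{artifact_name}"
    url = s3.generate_presigned_url(
        "put_object",
        Params={
            "Bucket": BUCKET,
            "Key": key,
            "ContentType": "application/octet-stream",
        },
        ExpiresIn=3600,  # seconds
    )
    # The uploader must send the same Content-Type header that was signed.
    return {"url": url, "key": key}

def register_artifact(job_id: str, name: str, key: str, sha256: str, size: int, db):
    # name is NOT NULL in the artifacts table, so it must be supplied here.
    # ON CONFLICT makes re-registering identical content a no-op.
    db.execute(
        """
        INSERT INTO artifacts (job_id, name, s3_key, content_hash, size_bytes, created_at)
        VALUES (%s, %s, %s, %s, %s, NOW())
        ON CONFLICT (content_hash) DO NOTHING
        """,
        (job_id, name, key, sha256, size),
    )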
Real World
CircleCI and Buildkite both issue pre-signed URLs for artifact uploads. This pattern lets them avoid running their own high-bandwidth storage proxies, and it means artifact throughput scales automatically with S3’s capacity rather than with the number of coordinator nodes.
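Content-addressable keys require the worker to hash the artifact before asking for an upload URL. A minimal sketch follows; the `artifacts/sha256/<prefix>/<hash>` key layout is an assumption for illustration, not the layout used elsewhere in this article.

```python
import hashlib
from typing import BinaryIO


def sha256_stream(fileobj: BinaryIO, chunk_size: int = 1024 * 1024) -> str:
    """Hash a file in fixed-size chunks so large artifacts never sit fully in memory."""
    digest = hashlib.sha256()
    for chunk in iter(lambda: fileobj.read(chunk_size), b""):
        digest.update(chunk)
    return digest.hexdigest()


def content_key(sha256_hex: str) -> str:
    """Derive an object key from the content hash (hypothetical layout)."""
    return f"artifacts/sha256/{sha256_hex[:2]}/{sha256_hex}"
```

Two jobs that produce byte-identical output derive the same key, so the second upload can be skipped once the object is known to exist.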
Data Model
-- Pipeline definitions (DAG structure)
CREATE TABLE pipelines (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    repo_id     UUID NOT NULL,
    commit_sha  CHAR(40) NOT NULL,
    branch      TEXT NOT NULL,
    definition  JSONB NOT NULL,  -- raw DAG YAML parsed to JSON
    status      TEXT NOT NULL DEFAULT 'PENDING',
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Individual jobs within a pipeline
CREATE TABLE jobs (
    id               UUID NOT NULL DEFAULT gen_random_uuid(),
    pipeline_id      UUID NOT NULL REFERENCES pipelines(id),
    stage            TEXT NOT NULL,
    name             TEXT NOT NULL,
    status           TEXT NOT NULL DEFAULT 'PENDING',
    priority         SMALLINT NOT NULL DEFAULT 5,  -- 1 (critical) to 10 (low)
    assigned_worker  TEXT,
    lease_expires    TIMESTAMPTZ,
    attempt          SMALLINT NOT NULL DEFAULT 0,
    max_attempts     SMALLINT NOT NULL DEFAULT 3,
    retry_after      TIMESTAMPTZ,  -- set while status = 'RETRYING'
    started_at       TIMESTAMPTZ,
    finished_at      TIMESTAMPTZ,
    exit_code        SMALLINT,
    created_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- PostgreSQL requires the partition key in every unique constraint
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

-- Monthly partitions for jobs (auto-created by maintenance job)
CREATE TABLE jobs_2026_06 PARTITION OF jobs
    FOR VALUES FROM ('2026-06-01') TO ('2026-07-01');

-- Dependency edges between jobs (read by the DAG traversal query)
CREATE TABLE job_dependencies (
    downstream_job_id  UUID NOT NULL,
    upstream_job_id    UUID NOT NULL,
    PRIMARY KEY (downstream_job_id, upstream_job_id)
);

-- Artifacts produced by jobs
CREATE TABLE artifacts (
    id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    job_id        UUID NOT NULL,  -- no FK: jobs(id) alone is not unique once partitioned
    name          TEXT NOT NULL,
    s3_key        TEXT NOT NULL,
    content_hash  CHAR(64) NOT NULL,  -- SHA-256
    size_bytes    BIGINT NOT NULL,
    mime_type     TEXT,
    created_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE (content_hash)
);

-- Worker registry (updated by heartbeats)
CREATE TABLE workers (
    id              TEXT PRIMARY KEY,
    pool            TEXT NOT NULL,
    status          TEXT NOT NULL DEFAULT 'IDLE',  -- IDLE, BUSY, DRAINING, OFFLINE
    current_job_id  UUID,  -- no FK, for the same reason as artifacts.job_id
    last_heartbeat  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    capacity        JSONB,  -- {"cpu": 4, "memory_gb": 16, "arch": "amd64"}
    registered_at   TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Indexes for hot query paths
CREATE INDEX idx_jobs_status_priority ON jobs (status, priority) WHERE status IN ('QUEUED', 'RUNNING');
CREATE INDEX idx_jobs_pipeline ON jobs (pipeline_id);
CREATE INDEX idx_jobs_lease_expires ON jobs (lease_expires) WHERE status = 'RUNNING';
CREATE INDEX idx_workers_status ON workers (status) WHERE status = 'IDLE';
Partitioning strategy: The jobs table is range-partitioned by created_at with monthly partitions. This keeps hot data in a small number of pages, allows old partitions to be archived cheaply (just DETACH PARTITION and move to cold storage), and prevents bloat from accumulating in a single table that would eventually hit vacuum performance issues.
For high-throughput writes, we shard the Redis lease store by hash(job_id) % N across N Redis nodes (job IDs are UUIDs, so they are hashed before taking the modulus). The queue partitions (Kafka topics) are partitioned by (tenant_id, priority) so that one noisy tenant cannot starve another’s jobs.
Key Algorithms and Protocols
Pipeline DAG Traversal
A pipeline is a DAG where nodes are jobs and edges represent “must complete before” dependencies. We use topological sort (Kahn’s algorithm) to determine which jobs are ready to execute at each step. A job becomes QUEUED when all its upstream dependencies have status SUCCESS.
from typing import List

def get_ready_jobs(pipeline_id: str, db) -> List[str]:
    """
    Returns job IDs that have all dependencies satisfied
    and are currently in PENDING state.
    """
    rows = db.execute(
        """
        SELECT j.id, j.status,
               array_agg(dep.upstream_job_id)
                   FILTER (WHERE dep.upstream_job_id IS NOT NULL) AS deps
        FROM jobs j
        LEFT JOIN job_dependencies dep ON dep.downstream_job_id = j.id
        WHERE j.pipeline_id = %s
        GROUP BY j.id, j.status
        """,
        (pipeline_id,)
    ).fetchall()
    job_status = {row["id"]: row["status"] for row in rows}
    job_deps = {row["id"]: (row["deps"] or []) for row in rows}
    ready = []
    for job_id, status in job_status.items():
        if status != "PENDING":
            continue
        deps = job_deps[job_id]
        # A job with no dependencies is ready immediately (all() of nothing is True).
        if all(job_status.get(dep) == "SUCCESS" for dep in deps):
            ready.append(job_id)
    return ready

def advance_pipeline(pipeline_id: str, db, queue_client):
    ready_jobs = get_ready_jobs(pipeline_id, db)
    for job_id in ready_jobs:
        cur = db.execute(
            "UPDATE jobs SET status = 'QUEUED' WHERE id = %s AND status = 'PENDING'",
            (job_id,)
        )
        # Enqueue only if this caller won the PENDING -> QUEUED transition;
        # rowcount is 0 when another coordinator replica got there first.
        if cur.rowcount == 1:
            queue_client.enqueue(job_id)
Key Insight
The DAG advance function must be idempotent. A job should only transition from PENDING to QUEUED once, protected by the WHERE status = 'PENDING' clause, and the caller should enqueue the job only when that UPDATE actually changed a row. Without this guard, a race between two coordinator replicas can enqueue the same job twice, causing duplicate execution.
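Readiness checks only terminate if the pipeline really is acyclic, so the definition should be validated when it is submitted. The sketch below applies Kahn's algorithm to a plain mapping of job name to upstream job names; that input shape is an assumption for illustration, not the article's storage format.

```python
from collections import deque
from typing import Dict, List


def topological_order(deps: Dict[str, List[str]]) -> List[str]:
    """Return jobs in an order that respects dependencies, or raise on a cycle.

    `deps` maps each job name to the list of jobs it depends on.
    """
    indegree = {job: 0 for job in deps}
    downstream: Dict[str, List[str]] = {job: [] for job in deps}
    for job, upstreams in deps.items():
        for up in upstreams:
            if up not in deps:
                raise ValueError(f"{job!r} depends on unknown job {up!r}")
            indegree[job] += 1
            downstream[up].append(job)

    # Start with jobs that have no unmet dependencies.
    ready = deque(job for job, count in indegree.items() if count == 0)
    order: List[str] = []
    while ready:
        job = ready.popleft()
        order.append(job)
        for nxt in downstream[job]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    # Any job left unvisited is part of, or blocked by, a cycle.
    if len(order) != len(deps):
        raise ValueError("pipeline definition contains a dependency cycle")
    return order
```

Rejecting a cyclic definition at submission time is cheaper than discovering at runtime that a set of jobs can never leave PENDING.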
At-Least-Once Delivery Protocol
The at-least-once delivery guarantee is enforced through three mechanisms working together: lease expiry, a reaper process, and idempotent job execution.
package reaper

import (
	"context"
	"database/sql"
	"log"
	"time"
)

// QueueClient is the minimal contract the reaper needs (assumed shape).
type QueueClient interface {
	Enqueue(jobID string) error
}

// Reaper runs every 60 seconds and reclaims jobs
// whose worker leases have expired.
type Reaper struct {
	DB          *sql.DB
	QueueClient QueueClient
}

func (r *Reaper) Run(ctx context.Context) {
	ticker := time.NewTicker(60 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			r.reclaim(ctx)
		}
	}
}

func (r *Reaper) reclaim(ctx context.Context) {
	// attempt counts finished executions, so the run being reclaimed is
	// attempt + 1. Comparing the old value would allow one extra retry.
	rows, err := r.DB.QueryContext(ctx, `
		UPDATE jobs
		SET status = CASE
				WHEN attempt + 1 >= max_attempts THEN 'FAILED'
				ELSE 'QUEUED'
			END,
			attempt = attempt + 1,
			assigned_worker = NULL,
			lease_expires = NULL
		WHERE status = 'RUNNING'
		  AND lease_expires < NOW()
		RETURNING id, status, attempt
	`)
	if err != nil {
		log.Printf("reaper query failed: %v", err)
		return
	}
	defer rows.Close()
	for rows.Next() {
		var id, status string
		var attempt int
		if err := rows.Scan(&id, &status, &attempt); err != nil {
			log.Printf("reaper scan failed: %v", err)
			continue
		}
		if status == "QUEUED" {
			if err := r.QueueClient.Enqueue(id); err != nil {
				log.Printf("failed to re-enqueue job %s: %v", id, err)
				continue
			}
			log.Printf("reclaimed job %s (attempt %d)", id, attempt)
		} else {
			log.Printf("job %s exhausted retries, marked FAILED", id)
		}
	}
	if err := rows.Err(); err != nil {
		log.Printf("reaper row iteration failed: %v", err)
	}
}
Key Insight
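The reaper is the safety net, not the primary mechanism. The primary mechanism is the worker’s heartbeat extending its own lease. The reaper only fires when something has gone wrong - network partition, OOM kill, VM preemption. To avoid false reclaims on slow networks, set the lease TTL to several multiples of the heartbeat interval (here 300 seconds against 30 seconds), so a few delayed heartbeats do not expire a healthy lease. The reaper interval only controls how quickly an expired lease is noticed.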
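The third mechanism, idempotent execution, is not shown above. One common approach is to derive a key per step and skip work already recorded under that key. The sketch below is illustrative only: the key format is an assumption, and the `completed` dict stands in for a durable store such as a database table.

```python
import hashlib
import json
from typing import Any, Callable, Dict


def idempotency_key(job_id: str, step_name: str, inputs: dict) -> str:
    """Derive a stable key for one step of one job.

    The attempt number is deliberately excluded so that a retry of the
    same step maps to the same key.
    """
    payload = json.dumps(
        {"job": job_id, "step": step_name, "inputs": inputs}, sort_keys=True
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def run_step_once(completed: Dict[str, Any], key: str, action: Callable[[], Any]) -> Any:
    """Run `action` unless a result is already recorded under `key`."""
    if key in completed:
        return completed[key]  # a previous attempt already did this work
    result = action()
    completed[key] = result
    return result
```

With a real store, the record must be written atomically with the side effect (or the side effect itself must tolerate repeats), otherwise a crash between the two still causes a duplicate.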
Retry Logic
We implement exponential backoff with jitter for job retries. A job that fails due to a transient error (network timeout, dependency service unavailable) should not immediately flood the queue with retry attempts.
import random
import math

def compute_retry_delay(attempt: int, base_delay_s: float = 30.0, max_delay_s: float = 600.0) -> float:
    """
    Full jitter exponential backoff.
    attempt=1 -> 0 to 30s
    attempt=2 -> 0 to 60s
    attempt=3 -> 0 to 120s
    Capped at max_delay_s.
    """
    cap = min(base_delay_s * math.pow(2, attempt - 1), max_delay_s)
    return random.uniform(0, cap)

def schedule_retry(job_id: str, attempt: int, db, queue_client):
    delay = compute_retry_delay(attempt)
    db.execute(
        "UPDATE jobs SET status = 'RETRYING', retry_after = NOW() + %s * INTERVAL '1 second' WHERE id = %s",
        (delay, job_id)
    )
    # A separate scheduler process reads jobs WHERE status='RETRYING' AND retry_after <= NOW()
    # and re-enqueues them. This avoids busy-waiting in the queue itself.
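The separate scheduler process mentioned in the comment above could look roughly like this. To keep the sketch self-contained it uses an in-memory style SQLite connection as a stand-in for PostgreSQL and stores `retry_after` as a Unix timestamp; both are assumptions made for illustration.

```python
import sqlite3
from typing import List


def promote_due_retries(conn: sqlite3.Connection, now: float) -> List[str]:
    """Move RETRYING jobs whose delay has elapsed back to QUEUED.

    Returns the IDs this call promoted; the caller enqueues exactly those.
    """
    due = [
        row[0]
        for row in conn.execute(
            "SELECT id FROM jobs WHERE status = 'RETRYING' AND retry_after <= ? "
            "ORDER BY retry_after",
            (now,),
        )
    ]
    promoted = []
    for job_id in due:
        # Guarded update: only one scheduler instance can win each job.
        cur = conn.execute(
            "UPDATE jobs SET status = 'QUEUED', retry_after = NULL "
            "WHERE id = ? AND status = 'RETRYING'",
            (job_id,),
        )
        if cur.rowcount == 1:
            promoted.append(job_id)
    conn.commit()
    return promoted
```

The guarded UPDATE mirrors the PENDING to QUEUED transition: running two scheduler replicas is safe because each job is promoted by at most one of them.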
Scaling and Performance
Horizontal scaling in a CI/CD orchestrator follows three independent axes. Queue scaling: as job intake grows, we add Kafka partitions (or Redis cluster shards) to increase throughput. Worker scaling: as queue depth increases, the autoscaler provisions more worker nodes. Coordinator scaling: as API request volume grows, we add stateless API Gateway replicas behind a load balancer.
# Capacity Estimation
Target: 10,000 concurrent jobs
Assumptions:
- Average job duration: 5 minutes (300 seconds)
- Average CPU per job: 2 vCPU
- Average memory per job: 4 GB
- Log throughput per job: 100 KB/s
- Artifact size per job: 500 MB average
Worker fleet sizing:
- 10,000 jobs x 2 vCPU = 20,000 vCPU needed
- c5.4xlarge (16 vCPU, 32 GB) can run ~6 jobs each
- Fleet size: 10,000 / 6 = ~1,667 nodes
- With 20% headroom: 2,000 worker nodes
Queue throughput:
- 5,000 new jobs/min = 83 jobs/second
- Single Kafka partition handles ~10,000 messages/sec
- 1 partition is sufficient; add more for fault isolation
Log aggregation:
- 10,000 jobs x 100 KB/s = 1 GB/s peak log ingestion
- Kafka cluster: 10 brokers x 200 MB/s = 2 GB/s capacity
- Redis Streams hot buffer: 10,000 jobs x 10 MB = 100 GB
- S3 cold storage: 10,000 jobs x 50 MB = 500 GB/hour
Database (PostgreSQL):
- 10,000 rows in 'running' state
- Heartbeat writes: 10,000 / 30s = ~333 writes/sec (well within range)
- Use connection pooling (PgBouncer) to handle API fanout
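The arithmetic above is easy to rerun under different assumptions. A small sketch follows, with defaults taken from the estimate; the function name and the returned keys are made up for illustration.

```python
import math


def estimate_capacity(
    concurrent_jobs: int = 10_000,
    vcpu_per_job: int = 2,
    jobs_per_node: int = 6,
    headroom: float = 0.20,
    intake_per_min: int = 5_000,
    log_kb_per_s_per_job: int = 100,
    heartbeat_interval_s: int = 30,
) -> dict:
    """Recompute the fleet, queue, log, and heartbeat figures from the inputs."""
    nodes = math.ceil(concurrent_jobs / jobs_per_node)
    return {
        "total_vcpu": concurrent_jobs * vcpu_per_job,
        "nodes": nodes,
        "nodes_with_headroom": round(nodes * (1 + headroom)),
        "intake_per_s": intake_per_min / 60,
        # KB/s summed over all jobs, converted to GB/s (decimal units)
        "log_ingest_gb_per_s": concurrent_jobs * log_kb_per_s_per_job / 1_000_000,
        "heartbeat_writes_per_s": concurrent_jobs / heartbeat_interval_s,
    }
```

Changing `jobs_per_node` from 6 to 8, for instance, shows how much of the fleet size depends on the per-node packing assumption.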
Real World
GitLab CI scales its runner fleet using Kubernetes autoscaling with a custom metric server that reads queue depth directly from the CI database. The key tuning parameter is not scale-up speed - it is scale-down aggressiveness. Leaving idle workers running costs money; draining them too fast causes job starvation during traffic spikes. GitLab uses a 10-minute idle timeout before draining a runner.
Failure Modes and Recovery
| Failure | Detection | Impact | Recovery |
|---|---|---|---|
| Worker crash mid-job | Lease expiry after heartbeat silence | Job stuck in RUNNING state until reaper fires | Reaper reclaims job after TTL, re-enqueues for retry |
| Coordinator node crash | Load balancer health check (HTTP 200 probe fails) | In-flight API requests dropped; no new job dispatches during failover | Stateless coordinators; LB routes to healthy replica within 10s |
| Kafka broker failure | Kafka controller election; consumer lag spike | Log lines lost for affected partitions during leader election | Kafka replication factor >= 3; producer retries with acks=all |
| Database primary failure | Patroni/pgpool leader election | All writes blocked during failover (30-60 seconds) | Read-only degraded mode (status queries still work); queue-based writes buffer in Kafka |
| S3 outage | Artifact upload HTTP 503 | Workers cannot upload artifacts; jobs succeed but artifacts are missing | Job marked SUCCESS but artifact status = PENDING; retry upload when S3 recovers |
| Worker stuck in infinite loop | Job wall-clock timeout exceeded | Lease never expires naturally; resources consumed indefinitely | Explicit max_duration per job; coordinator forcibly revokes lease after hard limit |
Watch Out
The most dangerous failure mode is a job that does not crash but does not make progress either - it is “stuck running.” Lease heartbeats keep extending the timeout, but no useful work happens. Always enforce a hard wall-clock timeout per job, independent of the heartbeat mechanism. The heartbeat proves the worker is alive; the timeout bounds how long a job can run without finishing, whether or not it is making progress.
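On the worker side, a hard wall-clock limit can be enforced around the job process itself. This is a minimal sketch for a plain subprocess; `max_duration_s` and `grace_s` are assumed parameter names, and containers or microVMs would need the equivalent stop and kill calls of their runtime.

```python
import subprocess
import sys
from typing import List, Optional, Tuple


def run_with_deadline(
    cmd: List[str], max_duration_s: float, grace_s: float = 30.0
) -> Tuple[Optional[int], bool]:
    """Run `cmd`; return (exit_code, timed_out).

    On timeout the process gets a polite termination request first
    (SIGTERM on POSIX) and is killed only if it ignores it for `grace_s`.
    """
    proc = subprocess.Popen(cmd)
    try:
        return proc.wait(timeout=max_duration_s), False
    except subprocess.TimeoutExpired:
        proc.terminate()
        try:
            proc.wait(timeout=grace_s)
        except subprocess.TimeoutExpired:
            proc.kill()  # SIGKILL on POSIX
            proc.wait()
        return proc.returncode, True
```

Usage: `run_with_deadline([sys.executable, "-c", "pass"], max_duration_s=10)` returns `(0, False)`, while a command that sleeps past the deadline returns with `timed_out` set to `True`.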
Comparison of Approaches
| Dimension | Push-Based Dispatch | Pull-Based Dispatch | Hybrid (Push+Pull) |
|---|---|---|---|
| Latency | Very low - coordinator pushes immediately | Slightly higher - worker polls on interval | Low - coordinator signals workers to poll |
| Fault tolerance | Coordinator must track which worker received each job | Workers own their own lease; coordinator is stateless | Complex but avoids both failure modes |
| Worker discovery | Coordinator maintains live worker registry | Workers register themselves; coordinator need not track capacity | Registry used for routing hints only |
| Backpressure | Coordinator can overwhelm slow workers with pushes | Workers naturally throttle by polling speed | Requires explicit flow control signals |
| Best for | Real-time bidding, gaming, low-latency workloads | CI/CD, batch processing, durable job queues | High-frequency trading, mixed workload systems |
Pull-based dispatch is the right choice for CI/CD. Workers behind firewalls can connect to the coordinator without requiring inbound ports. The coordinator does not need to maintain a live connection map to thousands of workers. And critically, if the coordinator restarts, workers simply resume polling with no coordination overhead. Push-based dispatch is faster but requires the coordinator to be stateful about which jobs are “in transit” - that statefulness is exactly what makes crash recovery hard.
Key Takeaways
- Lease-based ownership is the foundational primitive: a job belongs to a worker only for as long as the worker proves it is alive. Anything else leads to either duplicate execution or lost jobs.
- Separate the data plane from the control plane: logs, artifacts, and source code must flow on dedicated side channels. The coordinator’s job is to orchestrate, not to move bytes.
- The reaper is the last safety net: design your heartbeat intervals and lease TTLs so that the reaper almost never fires in steady state. If it fires frequently, either the lease TTL is too short relative to the heartbeat interval or workers are dying more often than they should.
- Idempotency at every job boundary: job execution may happen more than once due to at-least-once delivery. Design each job step to be safe to re-run - use content-addressable artifact keys, idempotent database upserts, and deterministic test commands.
- Pull-based dispatch wins for durability: workers pull jobs rather than having jobs pushed to them. This moves failure handling responsibility to the worker and queue, where it is easier to reason about.
- Prioritized queues prevent starvation of critical paths: a hotfix deploy must never queue behind 5,000 routine feature branch builds. Always separate priority lanes in the queue.
- DAG advancement must be idempotent: the function that promotes pending jobs to queued must be safe to call multiple times. Use database-level CAS (compare-and-swap) transitions to prevent race conditions between coordinator replicas.
- Capacity estimation drives architecture: at 10,000 concurrent jobs, you need roughly 2,000 worker nodes, 1 GB/s of log ingestion capacity, and 500 GB/hour of log storage. Plan these numbers before picking technologies, not after.
Most engineers underestimate how much complexity lives in the seam between the queue and the executor. Getting a job to run is trivial - subprocess.run(...). Guaranteeing exactly the right number of executions, surfacing logs in the right order, and recovering cleanly from every failure mode is where the real design work lives.
Frequently Asked Questions
Q: Why not just use a database table as a job queue?
A: A single database table works at low scale but has two failure modes at high scale. First, polling frequency creates lock contention: if 5,000 workers all SELECT FOR UPDATE SKIP LOCKED on the same table every second, you will see significant lock overhead. Second, a database is not designed for high-fanout pub/sub semantics needed for log streaming. Use a purpose-built queue (Kafka, SQS, Redis Streams) for job dispatch and reserve the database for state tracking.
Q: Why not use Kubernetes Jobs directly instead of a custom orchestrator?
A: Kubernetes Jobs are great for simple use cases, but they lack several features CI/CD demands at scale: fine-grained priority queuing, cross-job dependency DAGs, real-time log streaming with historical replay, and custom retry policies with per-step idempotency. Most mature CI/CD systems use Kubernetes as a compute substrate for running containers, but build a custom orchestration layer on top of it for the coordination logic.
Q: How do you prevent one noisy tenant from consuming all worker capacity?
A: Tenant isolation at the queue level. Each tenant gets a separate queue partition (or a separate queue entirely in high-tier plans). Worker pools are partitioned by tenant tier: paid teams get dedicated worker pools with guaranteed minimum capacity, while free-tier jobs share a “best effort” pool. A global scheduler periodically rebalances worker allocation based on queue depth per tenant.
Q: What happens when a job produces non-deterministic output across retries?
A: Artifact content hashes differ between retries. We store artifacts per (job_id, attempt) rather than overwriting. The final pipeline result uses the artifact from the last successful attempt. This gives you a full audit trail of what each retry produced, which is invaluable for debugging flaky tests or non-deterministic build tools.
Q: Why use Redis for the lease store instead of keeping everything in PostgreSQL?
A: Lease updates are write-heavy: 10,000 workers each writing a heartbeat every 30 seconds is about 333 writes per second sustained, with bursts at job assignment and completion. PostgreSQL handles this fine, but the lease TTL check (finding expired leases) requires a range scan on lease_expires. Redis expires keys natively, so no scan is needed to find dead leases, and the atomic SET command with the NX and EX options provides set-if-absent semantics for lease acquisition without application-level locking.
Q: How do you handle a job that requires 32 GB of RAM on a fleet of 16 GB workers?
A: Resource tagging. Jobs declare their resource requirements in the pipeline YAML (cpu: 4, memory: 32Gi, arch: arm64). Workers advertise their capacity on registration. The dispatcher only assigns a job to a worker whose available capacity satisfies the job’s requirements. Workers with specialized hardware (GPUs, high-memory instances) are kept in separate pools with their own queue partitions.
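The matching rule in the last answer can be sketched as a simple predicate. It assumes the capacity shape shown in the workers table comment (`cpu`, `memory_gb`, `arch`) and that job requirements use the same keys; both function names are hypothetical.

```python
from typing import Dict, List


def satisfies(worker_capacity: dict, job_requirements: dict) -> bool:
    """Return True if the worker has at least what the job asks for."""
    required_arch = job_requirements.get("arch")
    if required_arch is not None and required_arch != worker_capacity.get("arch"):
        return False
    return (
        worker_capacity.get("cpu", 0) >= job_requirements.get("cpu", 0)
        and worker_capacity.get("memory_gb", 0) >= job_requirements.get("memory_gb", 0)
    )


def eligible_workers(workers: Dict[str, dict], job_requirements: dict) -> List[str]:
    """List the IDs of workers that can run the job, in sorted order."""
    return sorted(
        worker_id
        for worker_id, capacity in workers.items()
        if satisfies(capacity, job_requirements)
    )
```

An empty result is the signal to route the job to a specialized pool, or to fail fast with a clear error instead of leaving it queued forever.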
Interview Questions
Q: Walk me through what happens from git push to a test result appearing in the UI.
Expected depth: The candidate should cover the webhook call, pipeline YAML parsing, DAG construction and initial job enqueuing, worker polling and lease acquisition, job execution with log streaming to Kafka, WebSocket fanout to the UI, artifact upload via pre-signed URL, job completion with state transition, and DAG advancement triggering the next stage’s jobs.
Q: A worker crashes exactly as it is calling complete_job. The job was actually successful. What happens?
Expected depth: The candidate should recognize this as the classic at-least-once delivery edge case. The lease will expire, the reaper will re-enqueue the job, and the job will run again. This is why job steps must be idempotent - a duplicate run should produce the same artifact with the same content hash, and the second run’s artifact upload will hit the ON CONFLICT DO NOTHING clause. The UI may briefly show the job as running a second time before the duplicate completes.
Q: How would you add support for 1 million concurrent jobs without redesigning the system?
Expected depth: Horizontal scaling of each layer independently. Queue: add Kafka partitions sharded by tenant. Workers: auto-scaling groups in multiple regions. Coordinator: stateless replicas behind a global load balancer. Database: partition the jobs table by both time and tenant, with a read replica per region. The key bottleneck to address first is the reaper query - a full table scan for expired leases on 1 million rows needs an index on (status, lease_expires) and should run per-partition to avoid locking the entire table.
Q: How would you implement pipeline cancellation that propagates to all in-flight jobs?
Expected depth: A cancellation signal must reach workers that are actively executing. The coordinator marks the pipeline and all its PENDING, QUEUED, or RUNNING jobs as CANCELLED in the database, so that no downstream job is promoted later. Queued jobs are simply removed from the queue (or ignored on dequeue if the status check reveals CANCELLED). For running jobs, the coordinator publishes a cancellation message to a cancellations Kafka topic keyed by job_id. Each worker subscribes to this topic and checks it during its execution loop. The worker’s job runner sends a SIGTERM to the container process and waits up to 30 seconds for graceful shutdown before sending SIGKILL.
Q: Design the schema and query for the “build history” page - showing the last 50 pipeline runs for a repository, with status and duration.
Expected depth: The candidate should note that this query joins pipelines (filtered by repo_id, ordered by created_at DESC LIMIT 50) with an aggregate over jobs to compute overall pipeline status and duration. The index needed is (repo_id, created_at DESC) on pipelines. They should also note that computing pipeline status from job statuses is expensive if done on read - it is better to maintain a denormalized pipeline.status field updated on each job state transition, making the build history query a simple indexed scan with no joins.