Build a Real-Time Log Aggregation Pipeline
observability data-engineering distributed-systems
System Design Deep Dive
Real-Time Log Aggregation Pipeline
How do you ingest 1 million log events per second, make them searchable in under a second, and keep 90 days of history without a budget explosion?
Imagine every service in your infrastructure as a water main. At 3 AM on a Tuesday, one of those pipes silently starts leaking. By the time an alert fires, the damage is done - unless you already have every pressure reading from every sensor stored somewhere you can query in seconds. That is what a log aggregation pipeline is: the nervous system of an infrastructure, collecting every signal every service emits and making it queryable before the on-call engineer has finished reading the alert notification.
At 1 million events per second across a fleet of thousands of services, the challenge is not collecting the logs - it is doing everything else simultaneously. You must accept and acknowledge each event within milliseconds so the producer does not block. You must parse, tokenize, and index each event so a full-text search can return in under a second. You must replicate the data across failure domains so no hardware failure loses a log that a compliance audit will need six months from now. And you must tier the storage so 90-day retention costs tens of thousands of dollars per month rather than hundreds of thousands.
The engineering tension is threefold. Write throughput and query latency pull in opposite directions: the data structures that make bulk ingestion fast (append-only log segments) are exactly the wrong structures for random-access search, which needs inverted indexes. Retention cost and query coverage also conflict: compressing old logs to cold storage is cheap, but querying across 90 days of compressed columnar data is orders of magnitude slower than querying a hot index. Finally, durability and latency fight each other at the ingest layer - acknowledging a write before it hits durable storage is fast but loses data on crash, while fsyncing every event before acknowledging is safe but limits throughput to a tiny fraction of what the hardware can do.
Four architectural decisions determine whether you solve all three tensions or merely trade one for another. The write-ahead buffer absorbs burst traffic and decouples producers from the slow indexing process. Log sharding distributes the index across machines so no single node becomes a bottleneck. Columnar storage on the cold path compresses repeated field patterns - like log level or service name - by 10x to 50x compared to raw JSON. And query fanout with result merging lets a single user query span thousands of shards without the user experiencing the latency of sequential shard scanning.
Requirements and Constraints
Functional Requirements
- Accept structured and semi-structured log events (JSON, syslog, CEF) from application services, infrastructure agents, and batch shippers
- Ingest at least 1 million events per second at sustained load, with burst headroom to 3 million events per second
- Index each event within 5 seconds of receipt so it is discoverable via full-text search
- Support full-text search queries across all indexed fields with sub-second response time for queries spanning the last 24 hours
- Support time-range queries and field-value filters (service=payments AND level=ERROR AND message:"timeout") with result pagination
- Retain all logs for 90 days with tiered storage: hot for days 0-7, warm for days 7-30, cold for days 30-90
- Expose a query API compatible with common log query languages (LogQL subset, Lucene syntax)
- Stream live tail of incoming logs filtered by service or pod label in real-time
Non-Functional Requirements
- Throughput: 1 million events per second sustained, 3 million burst; events average 512 bytes, peak at 8 KB
- Ingest latency: end-to-end acknowledgment within 50ms; indexing within 5 seconds of receipt
- Query latency: P99 under 800ms for 24-hour window full-text searches; P99 under 4 seconds for 7-day window queries
- Durability: zero data loss after acknowledgment; at-least-once delivery from producer to index
- Availability: 99.9% uptime for ingest; 99.5% for query API (brief degradation during compaction is acceptable)
- Retention cost: under $40,000 per month for 90-day retention at 1 million events/second with 512-byte average event size
Constraints
- Log producers use lightweight shipping agents (Fluent Bit, Vector, Logstash) - they retry on failure but cannot buffer more than 60 seconds locally
- Some log fields (user IDs, payment amounts) are PII-sensitive and must be masked before indexing
- Compliance requires that no log older than 90 days is retained on any durable storage tier
- The query API must return results within 1 second for 80% of production queries from the on-call dashboard
- Budget cap is $50,000 per month total infrastructure including ingest, storage, and query serving
High-Level Architecture
The system decomposes into six tiers, each isolated so that a failure or slowdown in one tier cannot cascade into others.
The Ingest Gateway is a stateless HTTP and gRPC endpoint fleet that accepts log batches, validates schema, applies PII masking, and writes to the write-ahead buffer. It never touches durable storage directly.
The Write-Ahead Buffer is a partitioned Kafka cluster. Each log event is assigned to a partition by its shard key (the hash of service name and date). Kafka provides durable buffering, backpressure absorption, and the ability for index workers to replay from any offset if they fall behind or crash.
The Index Workers are stateful pods that consume from Kafka partitions, parse log fields, tokenize text, build in-memory MemTables, and flush those tables to immutable SSTables on hot storage. Each worker owns a fixed set of shard IDs - this is log sharding by partition assignment.
The Hot Storage tier stores the last 7 days of SSTables with full inverted indexes. It is optimized for random-access reads and serves the majority of interactive queries.
The Cold Storage tier holds day-level columnar Parquet files compressed with Zstandard. It is optimized for sequential scan over large time ranges. Warm storage (days 7-30) sits between: partially indexed columnar files on SSD-backed volumes, which are slower than the hot tier's NVMe but much faster than object storage.
The Query API receives search requests, resolves the affected shard set, fans out sub-queries to the shards in parallel, merges and ranks results, and returns a paginated response to the client.
Key Insight
Every tier boundary in this architecture is also a durability checkpoint. The ingest gateway acknowledges only after Kafka accepts the write. Index workers commit Kafka offsets only after flushing to durable storage. This means you can crash any single tier and replay from the boundary without losing a single event.
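The checkpoint rule for index workers can be sketched as a loop that commits an offset only after the corresponding flush succeeds. This is a minimal in-memory illustration, not a Kafka client: `FakePartition`, `run_worker`, and `flaky_flush` are hypothetical names introduced here.

```python
from typing import Callable, List


class FakePartition:
    """Hypothetical stand-in for one Kafka partition with a committed offset."""

    def __init__(self, events: List[str]):
        self.events = events
        self.committed = 0  # offset of the next event to process after a restart

    def poll(self, max_events: int) -> List[str]:
        return self.events[self.committed:self.committed + max_events]

    def commit(self, count: int) -> None:
        self.committed += count


def run_worker(partition: FakePartition, flush: Callable[[List[str]], None],
               batch_size: int = 3) -> None:
    """Flush each batch to durable storage first, then commit its offset."""
    while True:
        batch = partition.poll(batch_size)
        if not batch:
            return
        flush(batch)                  # may raise; the offset then stays put
        partition.commit(len(batch))  # only reached after a successful flush


# Simulate a crash during the second flush, then a restart.
stored: List[str] = []
calls = {"n": 0}


def flaky_flush(batch: List[str]) -> None:
    calls["n"] += 1
    if calls["n"] == 2:
        raise RuntimeError("simulated crash mid-flush")
    stored.extend(batch)


p = FakePartition([f"event-{i}" for i in range(7)])
try:
    run_worker(p, flaky_flush)
except RuntimeError:
    pass                      # worker dies; the committed offset is still 3
run_worker(p, flaky_flush)    # restart replays from the committed offset
```

After the restart every event is stored and none is skipped. A crash after the flush but before the commit would replay that batch instead, which is why the guarantee is at-least-once rather than exactly-once.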
The Ingest Layer and Write-Ahead Buffer
The ingest layer has one job: accept log batches as fast as possible, validate them minimally, and hand them off to Kafka with a durability guarantee. Every millisecond spent parsing or enriching at ingest time is a millisecond of added producer latency.
A log producer typically sends batches of 100 to 1,000 events in a single HTTP POST or gRPC stream. The ingest gateway deserializes the batch, strips or hashes PII fields using a static field mask configuration, assigns an ingest timestamp to any event that lacks one, and calls the Kafka producer. The producer is configured with acks=all and the topic with min.insync.replicas=2, so the broker acknowledges a write only once at least two in-sync replicas have appended it to their logs. This is the durability guarantee the ingest gateway propagates back to the producer.
The write-ahead buffer absorbs the difference between the instantaneous ingest rate and the rate at which index workers are currently draining. At 1 million events per second with an average event size of 512 bytes, the raw ingest rate is 512 MB/s. If index workers temporarily drop to 400 MB/s, for example during a flush stall or a rebalance, the buffer absorbs the 112 MB/s shortfall for the duration and lets the workers drain the backlog afterward. Draining only works if steady-state indexing capacity exceeds the sustained ingest rate, so workers must be provisioned above 512 MB/s. This is why Kafka retention is set to 48 hours rather than the 5-second indexing window - it gives workers ample time to catch up after a crash and replay.
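The buffering arithmetic can be made concrete with a small calculation. This sketch uses the 512 MB/s and 400 MB/s figures from the text; the 640 MB/s full-capacity figure and the 10-minute duration are assumptions chosen purely for illustration.

```python
def backlog_after_stall(ingest_mb_s: float, drain_mb_s: float, stall_s: float) -> float:
    """MB accumulated in the buffer while ingest outpaces draining."""
    return max(0.0, ingest_mb_s - drain_mb_s) * stall_s


def catch_up_seconds(backlog_mb: float, ingest_mb_s: float, capacity_mb_s: float) -> float:
    """Seconds to drain the backlog once workers run at full capacity."""
    headroom = capacity_mb_s - ingest_mb_s
    if headroom <= 0:
        return float("inf")  # workers never catch up without spare capacity
    return backlog_mb / headroom


INGEST = 512.0      # MB/s, from 1M events/s * 512 bytes
DEGRADED = 400.0    # MB/s, the reduced indexing rate used in the text
CAPACITY = 640.0    # MB/s, hypothetical full indexing capacity (assumption)

backlog = backlog_after_stall(INGEST, DEGRADED, stall_s=600)   # 10-minute slowdown
recovery = catch_up_seconds(backlog, INGEST, CAPACITY)
print(f"backlog: {backlog / 1000:.1f} GB, catch-up: {recovery / 60:.1f} min")
```

The second function shows why headroom matters: with no capacity above the ingest rate, the backlog never drains, however long Kafka retains it.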
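// Ingest gateway: batch validation, PII masking, and Kafka write
package ingest

import (
	"context"
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"time"

	"github.com/IBM/sarama"
)

// LogEvent is the wire format accepted from log producers.
type LogEvent struct {
	Timestamp time.Time         `json:"ts"`
	Service   string            `json:"service"`
	Level     string            `json:"level"`
	Message   string            `json:"message"`
	Fields    map[string]string `json:"fields"`
}

// piiFields lists the field names that are hashed before indexing.
var piiFields = map[string]bool{
	"user_id": true, "email": true, "card_number": true,
}

// maskPII replaces each PII value with a truncated SHA-256 digest.
// An unsalted hash of a low-entropy value can be brute-forced, so a keyed
// hash is the safer choice outside of a sketch.
func maskPII(event *LogEvent) {
	for k := range event.Fields {
		if piiFields[k] {
			raw := event.Fields[k]
			h := sha256.Sum256([]byte(raw))
			event.Fields[k] = fmt.Sprintf("sha256:%x", h[:8])
		}
	}
}

// shardKey maps a service-date pair to one of 128 partitions. The first
// digest byte selects one of 256 buckets, and integer division folds the
// buckets into shards, matching the mapping the query side uses.
func shardKey(service, date string) int32 {
	h := sha256.Sum256([]byte(service + "|" + date))
	bucket := int32(h[0])
	return bucket * 128 / 256
}

// WriteBatch masks, serializes, and publishes a batch. It returns only after
// the broker has acknowledged every message or an error occurs. The explicit
// Partition is honored only if the producer uses sarama.NewManualPartitioner.
func WriteBatch(ctx context.Context, producer sarama.SyncProducer, events []LogEvent) error {
	if err := ctx.Err(); err != nil {
		return err // caller already gave up; do not publish
	}
	msgs := make([]*sarama.ProducerMessage, 0, len(events))
	for i := range events {
		maskPII(&events[i])
		if events[i].Timestamp.IsZero() {
			events[i].Timestamp = time.Now().UTC()
		}
		payload, err := json.Marshal(events[i])
		if err != nil {
			return fmt.Errorf("marshal event: %w", err)
		}
		date := events[i].Timestamp.Format("2006-01-02")
		partition := shardKey(events[i].Service, date)
		msgs = append(msgs, &sarama.ProducerMessage{
			Topic:     "logs.raw",
			Partition: partition,
			Value:     sarama.ByteEncoder(payload),
		})
	}
	return producer.SendMessages(msgs)
}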
Watch Out
Never set acks=1 on producers writing to the log ingest topic. A leader-only acknowledgment means that if the leader broker crashes between accepting your write and replicating to followers, the event is silently lost. With 1 million events per second, even a 0.001% loss rate is 10 missing events every second, or about 864,000 per day - invisible until an audit or incident investigation turns up gaps.
The Indexing Engine
The indexing engine is where the raw event stream becomes searchable. Each index worker owns a fixed set of Kafka partitions - this is the implementation of log sharding. Shard ownership is stable across normal operation and rebalances only when workers are added or removed from the pool.
When a worker consumes a batch of events, it passes them through a two-stage pipeline. The parse stage deserializes JSON, extracts structured fields (timestamp, service, level, trace ID), and validates the event schema. The tokenize stage breaks the message field into terms: lowercasing, stripping punctuation, splitting on whitespace, and optionally stemming common English words. Both stages produce output into a shared MemTable - an in-memory sorted map from term to posting list.
An inverted index maps each unique term to the list of document IDs that contain that term, along with the byte offsets within the document where the term appears. When a query arrives for message:"connection refused", the query engine looks up the posting lists for “connection” and “refused”, intersects them to find documents that contain both terms, and scores the intersection by term frequency. This is the same data structure used by Lucene and Elasticsearch’s underlying engine.
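The lookup-and-intersect step described above can be sketched in a few lines. This is an illustrative toy, not the engine's implementation: it splits on whitespace only, keeps document IDs without offsets, and the function names `build_index` and `intersect` are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List


def build_index(docs: List[str]) -> Dict[str, List[int]]:
    """Map each lowercase term to a sorted list of document IDs containing it."""
    index: Dict[str, List[int]] = defaultdict(list)
    for doc_id, text in enumerate(docs):
        for term in sorted(set(text.lower().split())):
            index[term].append(doc_id)  # doc IDs arrive in increasing order
    return index


def intersect(a: List[int], b: List[int]) -> List[int]:
    """Two-pointer intersection of two sorted posting lists."""
    i = j = 0
    out: List[int] = []
    while i < len(a) and j < len(b):
        if a[i] == b[j]:
            out.append(a[i])
            i += 1
            j += 1
        elif a[i] < b[j]:
            i += 1
        else:
            j += 1
    return out


docs = [
    "connection refused by upstream",
    "connection established",
    "request refused quota exceeded",
    "retrying after connection refused",
]
index = build_index(docs)
matches = intersect(index["connection"], index["refused"])
```

Here `matches` holds documents 0 and 3, the only ones containing both terms. A true phrase query would additionally check the stored offsets for adjacency, which this sketch omits.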
The MemTable accumulates entries until it reaches 64 MB in size or 30 seconds in age, whichever comes first. At that point the worker atomically swaps to a new empty MemTable and flushes the old one to an immutable SSTable file on hot storage. The SSTable format stores the posting list in binary sorted order with delta-encoded document IDs (each ID stored as the difference from the previous, reducing the byte cost of long posting lists). A Bloom filter over the vocabulary allows O(1) rejection of terms that do not appear in a given SSTable without scanning the file.
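Delta encoding of a posting list is easiest to see as a round trip. The sketch below is a minimal illustration with hypothetical helper names; it shows the transformation only, not the on-disk byte layout.

```python
from itertools import accumulate
from typing import List


def delta_encode(doc_ids: List[int]) -> List[int]:
    """Store the first ID as-is and every later ID as a gap from its predecessor."""
    if not doc_ids:
        return []
    return [doc_ids[0]] + [b - a for a, b in zip(doc_ids, doc_ids[1:])]


def delta_decode(deltas: List[int]) -> List[int]:
    """Rebuild the original sorted IDs with a running sum."""
    return list(accumulate(deltas))


ids = [1000, 1003, 1004, 1010, 1050]
deltas = delta_encode(ids)          # [1000, 3, 1, 6, 40]
restored = delta_decode(deltas)
```

The gaps are small numbers, so the saving materializes once they are written with a variable-length integer encoding. Written as fixed 4-byte integers, deltas occupy the same space as the raw IDs.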
# Index worker: MemTable accumulation and SSTable flush
import json
import math
import os
import struct
import time
from collections import defaultdict
from pathlib import Path

import mmh3  # third-party MurmurHash3 bindings


class PostingEntry:
    __slots__ = ["doc_id", "offset"]

    def __init__(self, doc_id: int, offset: int):
        self.doc_id = doc_id
        self.offset = offset


class MemTable:
    def __init__(self, shard_id: int, max_bytes: int = 64 * 1024 * 1024):
        self.shard_id = shard_id
        self.max_bytes = max_bytes
        self.index: dict[str, list[PostingEntry]] = defaultdict(list)
        self.docs: list[bytes] = []
        self.byte_count = 0
        self.created_at = time.monotonic()

    def add(self, doc: dict) -> bool:
        """Index one document; returns True once the size limit is reached."""
        raw = json.dumps(doc, separators=(",", ":")).encode()
        # Terms are lowercased, so search a lowercased copy of the bytes.
        # bytes.lower() only changes ASCII letters, so offsets match raw.
        raw_lower = raw.lower()
        doc_id = len(self.docs)
        self.docs.append(raw)
        terms = set(tokenize(doc.get("message", "")))
        for term in terms:
            # First occurrence in the serialized document; falls back to 0
            # when not found (e.g. non-ASCII terms that json.dumps escaped).
            offset = raw_lower.find(term.encode())
            self.index[term].append(PostingEntry(doc_id, max(offset, 0)))
        self.byte_count += len(raw) + sum(len(t) + 8 for t in terms)
        return self.byte_count >= self.max_bytes

    def full(self) -> bool:
        """True when the table hit its size limit or is older than 30 seconds."""
        return (self.byte_count >= self.max_bytes or
                time.monotonic() - self.created_at > 30)


def tokenize(text: str) -> list[str]:
    """Lowercase, split on non-alphanumeric characters, drop 1-char tokens."""
    text = text.lower()
    tokens = []
    buf = []
    for ch in text:
        if ch.isalnum() or ch == '_':
            buf.append(ch)
        elif buf:
            tokens.append("".join(buf))
            buf = []
    if buf:
        tokens.append("".join(buf))
    return [t for t in tokens if len(t) >= 2]


def flush_to_sstable(table: MemTable, output_dir: Path) -> Path:
    ts = int(time.time() * 1000)
    path = output_dir / f"shard-{table.shard_id}-{ts}.sst"
    # Guard against an empty table, which would otherwise size the filter at 0.
    bloom = BloomFilter(capacity=max(1, len(table.index)), error_rate=0.01)
    entries = []
    for term in sorted(table.index.keys()):
        bloom.add(term)
        ids = sorted(e.doc_id for e in table.index[term])
        # Delta-encode: first ID as-is, then gaps between consecutive IDs.
        deltas = [ids[0]] + [ids[i] - ids[i-1] for i in range(1, len(ids))]
        entries.append((term, deltas))
    # Layout: term count, posting lists, Bloom filter, raw documents.
    # This simplified format persists doc IDs only; offsets are dropped.
    with open(path, "wb") as f:
        header = struct.pack(">I", len(entries))
        f.write(header)
        for term, deltas in entries:
            term_b = term.encode()
            f.write(struct.pack(">H", len(term_b)))
            f.write(term_b)
            f.write(struct.pack(">I", len(deltas)))
            for d in deltas:
                f.write(struct.pack(">I", d))
        f.write(bloom.serialize())
        for doc in table.docs:
            f.write(struct.pack(">I", len(doc)))
            f.write(doc)
        # Force the bytes to disk before the Kafka offset is committed.
        f.flush()
        os.fsync(f.fileno())
    return path


class BloomFilter:
    def __init__(self, capacity: int, error_rate: float):
        capacity = max(1, capacity)
        # Standard sizing: m = -n ln(p) / (ln 2)^2 bits, k = (m / n) ln 2 hashes.
        m = max(8, int(-capacity * math.log(error_rate) / (math.log(2) ** 2)))
        self.bits = bytearray((m + 7) // 8)
        self.size = m
        self.k = max(1, int((m / capacity) * math.log(2)))

    def add(self, item: str):
        for seed in range(self.k):
            h = mmh3.hash(item, seed) % self.size
            self.bits[h // 8] |= 1 << (h % 8)

    def maybe_contains(self, item: str) -> bool:
        for seed in range(self.k):
            h = mmh3.hash(item, seed) % self.size
            if not (self.bits[h // 8] & (1 << (h % 8))):
                return False
        return True

    def serialize(self) -> bytes:
        return struct.pack(">I", len(self.bits)) + bytes(self.bits)
Key Insight
The Bloom filter stored with each SSTable is the performance multiplier that makes querying thousands of SSTables practical. Before scanning any SSTable, the query engine checks its Bloom filter. If the filter reports the term as absent, the term is definitely not in the file, and the entire SSTable is skipped with zero disk I/O, provided the filter itself is already in memory. With a 1% false-positive rate, about 99% of lookups against SSTables that do not contain the term are eliminated before reading a single byte.
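The cost of that 1% false-positive rate can be estimated with the standard Bloom filter sizing formulas. The sketch below is a worked calculation only; the one-million-term vocabulary is an assumed figure, and the function names are hypothetical.

```python
import math


def bloom_parameters(capacity: int, error_rate: float) -> tuple[int, int]:
    """Return (bits, hash_count) from the standard Bloom filter sizing formulas."""
    bits = math.ceil(-capacity * math.log(error_rate) / (math.log(2) ** 2))
    hashes = max(1, round(bits / capacity * math.log(2)))
    return bits, hashes


def expected_false_positive_rate(bits: int, hashes: int, items: int) -> float:
    """Approximate false-positive probability after inserting `items` keys."""
    return (1 - math.exp(-hashes * items / bits)) ** hashes


# Assumed vocabulary of one million distinct terms in a single SSTable.
bits, hashes = bloom_parameters(capacity=1_000_000, error_rate=0.01)
fpr = expected_false_positive_rate(bits, hashes, items=1_000_000)
print(f"{bits / 8 / 1024 / 1024:.2f} MiB, k={hashes}, fpr={fpr:.4f}")
```

At roughly 9.6 bits per term, the filter stays a little over a megabyte even for a large vocabulary, which is what makes keeping one per SSTable in memory plausible.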
Storage: Hot Path and Cold Path
The storage tier is where the bulk of the engineering budget decisions live. At 1 million events per second with a 512-byte average size, the raw data rate is 512 MB/s - or roughly 43 TB per day. Storing 90 days at that rate in raw form is roughly 3.9 PB, which would cost roughly $89,000 per month on cloud object storage at $0.023/GB, before any indexing or query infrastructure. The cold path exists to make 90-day retention economically viable.
Hot storage (0-7 days) uses SSTables on NVMe-backed volumes. Individual files are small (64 MB each), but a 7-day shard is large: the capacity estimate later in this article, roughly 750 TB across 128 shards, works out to about 5.9 TB per shard including the inverted index overhead. Hot storage nodes run with replication factor 2 for durability. Query latency is low because the Bloom filter eliminates most SSTable reads and the remainder hit local NVMe at microsecond access times.
Warm storage (7-30 days) uses columnar storage in Apache Parquet format, stored on slower SSD-backed volumes. Each day’s logs for a shard are compacted from hundreds of SSTables into a single Parquet file organized by column: timestamp, service, level, message, and each field key as its own column. Columnar layout means a query filtering only on level=ERROR reads only the level column, skipping the message body entirely. With dictionary encoding plus Zstandard compression, the level column (values like “ERROR”, “WARN”, “INFO”) compresses by 40x to 80x because each repeated value is stored as a small reference into the dictionary.
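The effect of a columnar layout on compression can be demonstrated without Parquet. The sketch below is an approximation under stated assumptions: it uses the standard library's zlib as a stand-in for Zstandard, plain newline-joined values instead of Parquet pages, and a synthetic event mix (95% INFO) that is invented for the example.

```python
import json
import random
import zlib

rng = random.Random(0)  # fixed seed so the run is repeatable
LEVELS = ["INFO", "WARN", "ERROR"]

# Synthetic events; the field values and level mix are assumptions.
rows = [
    {
        "ts": 1_700_000_000 + i,
        "service": f"svc-{rng.randrange(20)}",
        "level": rng.choices(LEVELS, weights=[95, 4, 1])[0],
        "message": f"request {rng.getrandbits(32):08x} handled",
    }
    for i in range(50_000)
]

# Row-oriented: one JSON object per line, all fields interleaved.
row_bytes = "\n".join(json.dumps(r) for r in rows).encode()

# Column-oriented: every value of one field stored contiguously.
level_column = "\n".join(r["level"] for r in rows).encode()


def ratio(raw: bytes) -> float:
    """Uncompressed size divided by zlib-compressed size."""
    return len(raw) / len(zlib.compress(raw, 6))


row_ratio = ratio(row_bytes)
level_ratio = ratio(level_column)
print(f"row-oriented: {row_ratio:.1f}x, level column alone: {level_ratio:.1f}x")
```

The level column compresses far better than the interleaved rows because the compressor sees long runs of a handful of values instead of those values scattered between high-entropy message text. Exact ratios depend on the codec and the data, so the numbers printed here should not be read as Parquet benchmarks.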
Cold storage (30-90 days) uses the same Parquet format but stored in object storage (S3 or GCS). Queries against cold storage fan out over a Spark or DuckDB query engine that reads column chunks in parallel. Cold queries are slower (2-15 seconds) but serve only historical and compliance use cases, not interactive on-call searches.
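Because each tier covers a fixed age window, the query planner can decide which tiers a request touches from its time range alone. The following is a minimal sketch of that routing decision; `tiers_for_range` is a hypothetical helper, and the boundaries are the 7, 30, and 90 day limits from the text.

```python
from datetime import datetime, timedelta, timezone
from typing import List

# Tier boundaries in days of age, taken from the retention policy in the text.
TIERS = [("hot", 0, 7), ("warm", 7, 30), ("cold", 30, 90)]


def tiers_for_range(start: datetime, end: datetime, now: datetime) -> List[str]:
    """Return the tiers whose age window overlaps the query's [start, end] range."""
    newest_age = (now - end) / timedelta(days=1)     # age of the most recent event
    oldest_age = (now - start) / timedelta(days=1)   # age of the oldest event
    return [
        name
        for name, min_age, max_age in TIERS
        if oldest_age > min_age and newest_age < max_age
    ]


now = datetime(2026, 6, 30, tzinfo=timezone.utc)
last_24h = tiers_for_range(now - timedelta(days=1), now, now)
last_14d = tiers_for_range(now - timedelta(days=14), now, now)
audit = tiers_for_range(now - timedelta(days=60), now - timedelta(days=45), now)
```

An on-call query over the last 24 hours resolves to the hot tier only, a 14-day query spans hot and warm, and an audit of a window 45 to 60 days back goes to cold storage alone, where the slower scan engine is acceptable.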
TTL-based retention is enforced by a background sweep job that runs nightly. It queries the metadata catalog for SSTables and Parquet files older than their tier limit, issues deletion requests, and removes the catalog entries. The sweep uses a soft-delete flag first, waits one hour for any in-flight queries against those files to complete, then issues the hard delete.
Log sampling is applied at the ingest gateway for high-volume, low-signal events such as DEBUG-level logs. Health check endpoints, heartbeat logs, and other high-frequency low-signal events are sampled at 1-in-100 before being written to Kafka. The sampling decision is deterministic - the same request ID always maps to the same sampling outcome - so counts of sampled events can be extrapolated back to true counts by multiplying by the inverse of the sampling rate stored alongside the event (100 for 1-in-100 sampling).
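Deterministic sampling can be sketched by hashing the request ID and keeping one residue class. This is an illustration under assumptions: the text does not say which hash the gateway uses, so SHA-256 is a stand-in, and `keep_event` and `estimate_true_count` are hypothetical names.

```python
import hashlib

SAMPLE_ONE_IN = 100  # keep 1 of every 100 low-signal events, as in the text


def keep_event(request_id: str, one_in: int = SAMPLE_ONE_IN) -> bool:
    """Deterministic decision: hash the request ID and keep one residue class."""
    digest = hashlib.sha256(request_id.encode()).digest()
    return int.from_bytes(digest[:8], "big") % one_in == 0


def estimate_true_count(kept: int, one_in: int = SAMPLE_ONE_IN) -> int:
    """Scale a sampled count back up by the inverse of the sampling rate."""
    return kept * one_in


request_ids = [f"req-{i}" for i in range(200_000)]
kept = sum(keep_event(r) for r in request_ids)
estimate = estimate_true_count(kept)
```

Because the decision depends only on the request ID, every log line belonging to a sampled request is kept together, and any gateway pod reaches the same verdict without coordination. The scaled-up estimate lands near 200,000 but carries sampling error, so it suits dashboards better than billing.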
# Nightly TTL sweep: soft-delete, drain window, hard-delete
import time
from datetime import datetime, timedelta, timezone

import boto3

HOT_TTL_DAYS = 7
WARM_TTL_DAYS = 30
COLD_TTL_DAYS = 90
DRAIN_WINDOW_SECONDS = 3600

s3 = boto3.client("s3")


def run_ttl_sweep(db_conn, bucket: str):
    """db_conn is expected to be an open psycopg2 connection to the catalog."""
    now = datetime.now(timezone.utc)
    cur = db_conn.cursor()

    # Step 1: soft-delete files past their TTL.
    # Assumes created_at reflects the age of the data in the file; if it is
    # reset when a file is compacted into a new tier, compare max_ts instead.
    cur.execute("""
        UPDATE storage_files
        SET deleted_at = %s
        WHERE deleted_at IS NULL
          AND (
            (tier = 'hot' AND created_at < %s) OR
            (tier = 'warm' AND created_at < %s) OR
            (tier = 'cold' AND created_at < %s)
          )
        RETURNING id, s3_key, tier
    """, (
        now,
        now - timedelta(days=HOT_TTL_DAYS),
        now - timedelta(days=WARM_TTL_DAYS),
        now - timedelta(days=COLD_TTL_DAYS),
    ))
    soft_deleted = cur.fetchall()
    db_conn.commit()
    print(f"Soft-deleted {len(soft_deleted)} files")

    # Step 2: wait for drain window before hard-delete
    time.sleep(DRAIN_WINDOW_SECONDS)

    # Step 3: hard-delete files soft-deleted at least one drain window ago.
    # The cutoff is read from the clock after the sleep; reusing the pre-sleep
    # "now" would exclude the files marked in step 1 until the next nightly run.
    cutoff = datetime.now(timezone.utc) - timedelta(seconds=DRAIN_WINDOW_SECONDS)
    cur.execute("""
        SELECT id, s3_key FROM storage_files
        WHERE deleted_at IS NOT NULL AND deleted_at <= %s
    """, (cutoff,))
    to_delete = cur.fetchall()
    for file_id, s3_key in to_delete:
        if s3_key:
            s3.delete_object(Bucket=bucket, Key=s3_key)
        # Files with only a local_path must also be removed on their storage
        # node; that step is outside this sketch.
        cur.execute("DELETE FROM storage_files WHERE id = %s", (file_id,))
        db_conn.commit()
    print(f"Hard-deleted {len(to_delete)} files")
Real World
Grafana Loki uses a similar tiered storage pattern: chunks stored in object storage (S3/GCS), indexes stored in a fast KV store (Cassandra or BoltDB), with a configurable retention rule per label selector. ClickHouse uses columnar storage with TTL expressions on tables that automatically move or delete partitions as they age - a single ALTER TABLE logs MODIFY TTL toDateTime(ts) + INTERVAL 90 DAY DELETE rule handles the entire cold expiry.
The Query API and Fan-Out
The query API is the most latency-sensitive component because it sits in the interactive path of on-call engineers. A query arrives as a structured search request: a time range, an optional set of field filters, and a full-text expression. The API resolves which shards hold data for that time range, fans out sub-queries to those shards in parallel, collects and merges results, and returns the top N events sorted by timestamp.
Query fanout means sending the same query to all relevant shards simultaneously rather than sequentially. For a 24-hour query against a 128-shard setup, all 128 shards execute the sub-query in parallel. Each shard returns its top K results (typically K=100). The fanout coordinator receives up to 128 lists of 100 results and performs a K-way merge, taking the globally top N results by timestamp or relevance score.
The fanout coordinator tracks in-flight sub-queries with a context timeout. If a shard takes more than 800ms to respond, the coordinator marks it as slow, includes partial results from that shard if available, and returns the response to the user with a warning that results may be incomplete. This is a deliberate tradeoff: a slow shard should not block the entire query response. In practice, P99 shard latency is under 200ms when all shards are healthy, so the 800ms timeout provides a comfortable 4x margin.
// Query fanout: parallel shard dispatch with timeout and partial results
package query

import (
	"context"
	"sort"
	"sync"
	"time"
)

type LogEntry struct {
	Timestamp time.Time
	Service   string
	Level     string
	Message   string
	ShardID   int
}

type ShardResult struct {
	ShardID int
	Entries []LogEntry
	Err     error
	Partial bool
}

type ShardClient interface {
	Search(ctx context.Context, shardID int, query string, from, to time.Time, limit int) ([]LogEntry, error)
}

// FanOutQuery sends the query to every shard in parallel and returns the
// newest totalLimit entries plus the IDs of shards whose results may be
// incomplete (timed out or failed).
func FanOutQuery(
	ctx context.Context,
	client ShardClient,
	shards []int,
	query string,
	from, to time.Time,
	totalLimit int,
) ([]LogEntry, []int, error) {
	// Each shard must return at least totalLimit entries for the global
	// top-N to be correct; 100 is the floor.
	perShardLimit := totalLimit
	if perShardLimit < 100 {
		perShardLimit = 100
	}

	// Buffered so no goroutine blocks on send if the reader is slow.
	results := make(chan ShardResult, len(shards))
	var wg sync.WaitGroup
	for _, shardID := range shards {
		wg.Add(1)
		go func(sid int) {
			defer wg.Done()
			shardCtx, cancel := context.WithTimeout(ctx, 800*time.Millisecond)
			defer cancel()
			entries, err := client.Search(shardCtx, sid, query, from, to, perShardLimit)
			partial := false
			if err != nil && shardCtx.Err() == context.DeadlineExceeded {
				// Keep whatever the shard returned before the deadline.
				partial = true
				err = nil
			}
			results <- ShardResult{ShardID: sid, Entries: entries, Err: err, Partial: partial}
		}(shardID)
	}
	go func() {
		wg.Wait()
		close(results)
	}()

	var all []LogEntry
	var slowShards []int
	for r := range results {
		// Report failed shards as well as slow ones; otherwise a shard
		// error would silently shrink the result set.
		if r.Partial || r.Err != nil {
			slowShards = append(slowShards, r.ShardID)
		}
		all = append(all, r.Entries...)
	}

	// A full sort keeps the sketch short; a heap-based K-way merge avoids
	// re-sorting shard results that already arrive ordered.
	sort.Slice(all, func(i, j int) bool {
		return all[i].Timestamp.After(all[j].Timestamp)
	})
	if len(all) > totalLimit {
		all = all[:totalLimit]
	}
	return all, slowShards, nil
}
Key Insight
The fan-out coordinator must impose a per-shard result limit, not a global one. If you ask each shard for “all matching results” and 128 shards each return 10,000 entries, the coordinator receives 1.28 million entries into memory before it can begin merging. With a per-shard limit of 100 and 128 shards, the merge set is at most 12,800 entries - a 100x reduction in coordinator memory pressure.
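The merge step itself can be sketched with a heap-based K-way merge, which consumes per-shard lists lazily and stops as soon as the top N entries are known. This is a minimal illustration that assumes each shard returns its entries already sorted newest-first; the tuple layout and `merge_top_n` are hypothetical.

```python
import heapq
from itertools import islice
from typing import Iterable, List, Tuple

# Each entry is (timestamp, shard_id, message); timestamps are epoch seconds.
Entry = Tuple[int, int, str]


def merge_top_n(shard_results: Iterable[List[Entry]], n: int) -> List[Entry]:
    """K-way merge of per-shard lists that are already sorted newest-first."""
    merged = heapq.merge(*shard_results, key=lambda e: e[0], reverse=True)
    return list(islice(merged, n))  # stop after n entries; the tail is never materialized


shard_a = [(105, 0, "timeout calling ledger"), (101, 0, "timeout calling auth")]
shard_b = [(104, 1, "timeout on retry"), (100, 1, "timeout at startup")]
shard_c = [(103, 2, "timeout in pool")]

top3 = merge_top_n([shard_a, shard_b, shard_c], n=3)
```

The result is only guaranteed to be the true global top N when the per-shard limit is at least N, since a single shard may hold all of the newest entries.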
Data Model
The metadata catalog tracks every file across all tiers. The event schema captures the fields required for partitioned querying. The shard registry maps shard IDs to index workers.
-- Log event schema for hot storage metadata catalog
-- Demonstrates partitioned table with shard assignment and tier tracking
CREATE TABLE log_shards (
    shard_id         SMALLINT PRIMARY KEY,
    owner_worker_id  TEXT NOT NULL,
    partition_key    TEXT NOT NULL,
    created_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at       TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE storage_files (
    id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    shard_id      SMALLINT NOT NULL REFERENCES log_shards(shard_id),
    tier          TEXT NOT NULL CHECK (tier IN ('hot','warm','cold')),
    s3_key        TEXT,
    local_path    TEXT,
    size_bytes    BIGINT NOT NULL,
    event_count   BIGINT NOT NULL,
    min_ts        TIMESTAMPTZ NOT NULL,
    max_ts        TIMESTAMPTZ NOT NULL,
    bloom_filter  BYTEA,
    created_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    deleted_at    TIMESTAMPTZ
);

CREATE INDEX idx_storage_files_shard_ts
    ON storage_files (shard_id, min_ts, max_ts)
    WHERE deleted_at IS NULL;

CREATE INDEX idx_storage_files_tier_created
    ON storage_files (tier, created_at)
    WHERE deleted_at IS NULL;

-- Columnar Parquet metadata for warm and cold files
CREATE TABLE parquet_column_stats (
    file_id          UUID NOT NULL REFERENCES storage_files(id) ON DELETE CASCADE,
    column_name      TEXT NOT NULL,
    null_count       BIGINT NOT NULL DEFAULT 0,
    distinct_count   BIGINT,
    min_value        TEXT,
    max_value        TEXT,
    compressed_size  BIGINT NOT NULL,
    PRIMARY KEY (file_id, column_name)
);

-- Query audit log: tracks fan-out latency and shard coverage
CREATE TABLE query_log (
    id               UUID NOT NULL DEFAULT gen_random_uuid(),
    query_text       TEXT NOT NULL,
    time_range_from  TIMESTAMPTZ NOT NULL,
    time_range_to    TIMESTAMPTZ NOT NULL,
    shards_queried   SMALLINT NOT NULL,
    shards_slow      SMALLINT NOT NULL DEFAULT 0,
    result_count     INT NOT NULL,
    latency_ms       INT NOT NULL,
    created_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- A primary key on a partitioned table must include the partition key
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

CREATE TABLE query_log_2026_06 PARTITION OF query_log
    FOR VALUES FROM ('2026-06-01') TO ('2026-07-01');
The shard key is derived from the first byte of sha256(service_name + "|" + date), which distributes service-date keys roughly uniformly across 256 buckets. Each bucket maps to one of the 128 logical shard IDs by integer division (shard_id = bucket * 128 / 256). This two-level mapping means that doubling the shard count splits each shard's pair of buckets, so only half the buckets move rather than every event being rehashed.
Key Algorithms and Protocols
Log Sharding Hash
The shard assignment algorithm must be consistent: the same service-date pair must always route to the same shard so that the query engine knows which shard to query for a given service over a given time range.
# Consistent shard assignment using SHA-256 with two-level mapping
import hashlib

NUM_BUCKETS = 256
NUM_SHARDS = 128


def assign_shard(service: str, date: str) -> int:
    """
    Returns shard_id in [0, NUM_SHARDS) for the given service and date.
    Deterministic: same inputs always produce the same shard_id.
    Time complexity: O(1).
    """
    key = f"{service}|{date}".encode()
    digest = hashlib.sha256(key).digest()
    bucket = digest[0]
    shard_id = bucket * NUM_SHARDS // NUM_BUCKETS
    return shard_id


def shards_for_query(service: str, dates: list[str]) -> set[int]:
    """
    Returns the set of shard IDs that must be queried for the given
    service over the given list of dates (one per day in range).
    """
    return {assign_shard(service, d) for d in dates}
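A query usually arrives with a start and end date rather than a list of days, so the planner first has to expand the range. The sketch below shows one way to do that; `dates_in_range` and `shards_for_range` are hypothetical helpers, and `assign_shard` is repeated so the block runs on its own.

```python
import hashlib
from datetime import date, timedelta
from typing import List, Set

NUM_BUCKETS = 256
NUM_SHARDS = 128


def assign_shard(service: str, day: str) -> int:
    """Same mapping as above: first digest byte, scaled from 256 buckets to 128 shards."""
    bucket = hashlib.sha256(f"{service}|{day}".encode()).digest()[0]
    return bucket * NUM_SHARDS // NUM_BUCKETS


def dates_in_range(start: date, end: date) -> List[str]:
    """Inclusive list of YYYY-MM-DD strings between start and end."""
    days = (end - start).days
    return [(start + timedelta(days=i)).isoformat() for i in range(days + 1)]


def shards_for_range(service: str, start: date, end: date) -> Set[int]:
    """Shard IDs to fan out to for one service over a date range."""
    return {assign_shard(service, d) for d in dates_in_range(start, end)}


week = dates_in_range(date(2026, 6, 1), date(2026, 6, 7))
shards = shards_for_range("payments", date(2026, 6, 1), date(2026, 6, 7))
```

A one-week query for a single service touches at most seven shards instead of all 128, which is the payoff of including the service name in the shard key. The date strings must be derived the same way at ingest and at query time (for example, both in UTC), or lookups will miss.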
Inverted Index Construction Complexity
Building the inverted index from a batch of N documents with average M terms each has time complexity O(N * M * log(N * M)) when the term-document pairs are ordered with a comparison sort to produce the final sorted posting lists. At 64 MB MemTable with 512-byte events, N is approximately 131,072 documents. With an average of 15 terms per message, the total sort is over 1.96 million term-document pairs. In practice, a radix sort on the term strings avoids the logarithmic factor, and the sort completes in under 500ms on a single core.
TTL Sweep Algorithm
The TTL sweep runs nightly with a soft-delete window to avoid deleting files that are still being scanned by in-flight queries.
# TTL tier-aware sweep with reference counting to avoid deleting active files
import threading
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class FileRef:
    file_id: str
    s3_key: str
    created_at: datetime
    tier: str
    deleted_at: Optional[datetime] = None
    ref_count: int = field(default=0, repr=False)


TIER_TTL = {"hot": 7, "warm": 30, "cold": 90}


class FileRegistry:
    def __init__(self):
        self._files: dict[str, FileRef] = {}
        self._lock = threading.Lock()

    def acquire(self, file_id: str) -> bool:
        with self._lock:
            f = self._files.get(file_id)
            if f is None or f.deleted_at is not None:
                return False
            f.ref_count += 1
            return True

    def release(self, file_id: str):
        with self._lock:
            f = self._files.get(file_id)
            if f:
                f.ref_count = max(0, f.ref_count - 1)

    def sweep(self, now: datetime) -> list[str]:
        """
        Mark files past TTL for soft-delete if ref_count == 0.
        Returns list of file_ids marked for deletion.
        Time complexity: O(F) where F = total number of tracked files.
        """
        marked = []
        with self._lock:
            for file_id, f in self._files.items():
                if f.deleted_at is not None:
                    continue
                ttl_days = TIER_TTL[f.tier]
                expiry = f.created_at + timedelta(days=ttl_days)
                if now >= expiry and f.ref_count == 0:
                    f.deleted_at = now
                    marked.append(file_id)
        return marked
Scaling and Performance
Horizontal scaling in this system follows a clear hierarchy. The ingest gateway is trivially horizontal: add more pods behind the load balancer. Each gateway pod is stateless, so autoscaling based on CPU or request rate works without coordination.
The Kafka cluster scales by adding partitions. At 1 million events per second with 128 partitions, each partition handles roughly 7,800 events per second or 4 MB/s. Kafka brokers can sustain 100-500 MB/s each depending on disk configuration, so 128 partitions fit comfortably on 3-5 brokers with room to grow.
Index workers scale by adding pods and rebalancing Kafka partition assignments. A Kafka consumer group with 64 workers means each worker owns 2 partitions, processing about 8 MB/s of raw events; with the 32 workers budgeted in the capacity estimate, each owns 4 partitions at about 16 MB/s. Adding workers to the consumer group triggers a rebalance where partitions are redistributed. During rebalance (typically 30-60 seconds), indexing pauses on the affected partitions. Kafka’s offset tracking ensures no events are skipped, but events consumed after the last committed offset are replayed, so index writes must tolerate duplicates (at-least-once delivery).
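The per-partition and per-worker figures follow directly from the event rate and partition count. This short calculation reproduces them; it assumes events spread evenly across partitions, which hashing by service and date does not guarantee for a skewed service mix.

```python
EVENTS_PER_SECOND = 1_000_000
EVENT_BYTES = 512
PARTITIONS = 128


def per_partition_load(events_per_s: int, event_bytes: int, partitions: int) -> tuple[float, float]:
    """Return (events/s, MB/s) handled by each partition under an even spread."""
    events = events_per_s / partitions
    return events, events * event_bytes / 1_000_000


def per_worker_load(workers: int, partitions: int, mb_per_partition: float) -> tuple[float, float]:
    """Return (partitions, MB/s) owned by each worker in the consumer group."""
    owned = partitions / workers
    return owned, owned * mb_per_partition


events, mb = per_partition_load(EVENTS_PER_SECOND, EVENT_BYTES, PARTITIONS)
for workers in (32, 64, 128):
    owned, load = per_worker_load(workers, PARTITIONS, mb)
    print(f"{workers} workers: {owned:.0f} partitions each, {load:.0f} MB/s each")
```

The partition count is also the ceiling on parallelism: a consumer group with more than 128 workers leaves the extra workers idle until partitions are added.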
Capacity estimation:
Event rate: 1,000,000 events/second
Average event: 512 bytes
Raw ingest: 512 MB/s = 43 TB/day
Kafka retention: 48 hours = 86 TB (at 2x replication = 172 TB raw disk)
Hot storage: 7 days = 301 TB (with inverted index overhead ~2.5x = ~750 TB)
Warm storage: 30 days Parquet @ 20x compression = 64 TB
Cold storage: 90 days Parquet @ 40x compression = 97 TB
Hot NVMe cost: 750 TB * $0.08/GB/month = $60,000/month [requires tiering]
Warm SSD cost: 64 TB * $0.05/GB/month = $3,200/month
Cold S3 cost: 97 TB * $0.023/GB/month = $2,230/month
Ingest gateway: 20 pods * $0.10/hr = $1,440/month
Kafka brokers: 5 * r6i.4xlarge = $4,800/month
Index workers: 32 * c6i.2xlarge = $7,700/month
Query API: 10 * c6i.xlarge = $1,900/month
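Total estimate:  ~$81,000/month untuned; ~$45,000-$50,000/month with the tuning below (within $50k budget)
Hot storage is the budget-critical tier: at $60,000 per month it alone exceeds the $50,000 cap. The key lever is the compaction ratio: aggressively merging SSTables reduces the index overhead from 2.5x to 1.8x, saving roughly $16,000 per month. The second lever is sampling: applying 1-in-100 sampling to DEBUG-level events reduces raw ingest by 30-40%, since DEBUG typically dominates volume. With both levers applied and compute held constant, the total lands at roughly $45,000 to $50,000 per month.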
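The estimate can be re-derived from its own line items, which makes it easy to see how each lever moves the total. This sketch uses the per-GB rates and instance costs listed above; holding compute cost constant under sampling and taking 35% as the midpoint of the 30-40% range are assumptions of the sketch.

```python
TB_PER_DAY = 43           # rounded raw volume used in the estimate
USD_PER_TB = {"hot": 80.0, "warm": 50.0, "cold": 23.0}   # $0.08, $0.05, $0.023 per GB
COMPUTE_USD = {"gateway": 1_440, "kafka": 4_800, "index": 7_700, "query": 1_900}


def monthly_cost(index_overhead: float, ingest_factor: float = 1.0) -> float:
    """Monthly total in USD for a hot-tier index overhead and an ingest scaling factor."""
    raw_per_day = TB_PER_DAY * ingest_factor
    hot_tb = raw_per_day * 7 * index_overhead
    warm_tb = raw_per_day * 30 / 20     # 20x Parquet compression
    cold_tb = raw_per_day * 90 / 40     # 40x Parquet compression
    storage = (hot_tb * USD_PER_TB["hot"]
               + warm_tb * USD_PER_TB["warm"]
               + cold_tb * USD_PER_TB["cold"])
    return storage + sum(COMPUTE_USD.values())


baseline = monthly_cost(index_overhead=2.5)
compacted = monthly_cost(index_overhead=1.8)
tuned = monthly_cost(index_overhead=1.8, ingest_factor=0.65)  # assumes 35% less ingest
print(f"baseline ${baseline:,.0f}, compacted ${compacted:,.0f}, tuned ${tuned:,.0f}")
```

Run as written, the untuned sum comes out well above the $50k cap, and both levers are needed to get under it. The hot tier dominates every scenario, so any error in the index overhead ratio moves the total more than all the compute line items combined.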
Real World
Elasticsearch at scale typically runs hot data on NVMe with a 2-3x index overhead ratio - similar to this estimate. Datadog achieved cost reductions of over 60% by introducing a Preaggregated Metrics tier for high-cardinality time-series data and applying adaptive sampling to trace spans. Grafana Loki’s label-indexed-only approach (no full-text index on hot path) trades query power for dramatically lower index overhead - often 1.05x rather than 2.5x, at the cost of requiring label filtering before full-text search.
Failure Modes and Recovery
| Failure | Detection | Impact | Recovery |
|---|---|---|---|
| Ingest gateway pod crash | Load balancer health check, 5s timeout | Producers retry next pod; up to 5s of elevated producer latency | Kubernetes restarts pod; no data loss because producers retry |
| Kafka broker failure | Broker heartbeat timeout (10s) | Partition leaders on failed broker become unavailable until election | Controller elects new leader from ISR within 15-30s; no data loss if ISR has at least 1 replica |
| Index worker crash mid-flush | Worker heartbeat timeout (30s) | Kafka partition unowned for 30s; events queue up in Kafka buffer | Consumer group rebalance reassigns partition; worker replays from last committed offset |
| Hot storage node failure | Storage health check, 10s | SSTables on failed node unreadable; queries to affected shards return partial results | Replica promoted to primary; if no replica, shard marked unavailable with query warning |
| Query API timeout on shard | Per-shard 800ms deadline | Partial results returned with slow-shard warning in response metadata | Client retries with longer timeout or narrows time range; shard auto-recovers or is replaced |
| Cold storage object corruption | Parquet read checksum mismatch | Affected day-range returns error for that shard | Restore from S3 versioning or re-run compaction from SSTable archives if within warm retention |
Watch Out
The most dangerous failure mode is silent data loss at the ingest gateway. If the gateway acknowledges a write to the producer but the Kafka write then fails (due to a full producer buffer or a transient broker error), the event is gone with no signal to the producer. Always configure Kafka producers with acks=all, retries=Integer.MAX_VALUE, delivery.timeout.ms=120000, and a bounded in-flight request count (max.in.flight.requests.per.connection) to prevent reordering on retry. Never acknowledge to the upstream producer before you have a Kafka acknowledgment in hand.
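The rule in this callout can be sketched as a gateway handler that returns success only after the broker acknowledgment arrives. This is a minimal illustration, not the pipeline's actual gateway: `handle_ingest`, the `send` callable, and the status codes are hypothetical, and `PRODUCER_CONFIG` simply collects the settings named in the text. The `enable.idempotence` entry and the in-flight value of 5 are assumptions (5 is the upper bound Kafka allows for an idempotent producer).

```python
from concurrent.futures import Future
from typing import Callable

# Producer settings from the text, using standard Kafka producer config names.
# enable.idempotence and the in-flight value of 5 are assumptions for this sketch.
PRODUCER_CONFIG = {
    "acks": "all",
    "retries": 2**31 - 1,  # Integer.MAX_VALUE
    "delivery.timeout.ms": 120_000,
    "enable.idempotence": True,
    "max.in.flight.requests.per.connection": 5,
}


def handle_ingest(
    event: bytes,
    send: Callable[[bytes], Future],
    timeout_s: float = 120.0,
) -> int:
    """Return 200 only once the broker has acknowledged the write.

    `send` is a hypothetical wrapper around a Kafka producer that returns a
    Future resolved on broker ack and failed on delivery error.
    """
    try:
        future = send(event)              # may raise if the local buffer is full
        future.result(timeout=timeout_s)  # blocks until ack, error, or timeout
    except Exception:
        return 503  # tell the upstream producer to retry; nothing was promised
    return 200
```

Because the handler never returns 200 on the failure paths, a full producer buffer or a broker error surfaces to the upstream producer as a retryable error instead of a silent loss.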
Comparison of Approaches
| Approach | Query Latency (24h) | Index Overhead | Failure Mode | Best Fit |
|---|---|---|---|---|
| Full inverted index (Elasticsearch style) | P99 under 500ms | 2-3x raw size | Index node failure loses shard availability | Interactive search, on-call dashboards |
| Label-indexed only (Grafana Loki style) | P99 500ms-2s (requires label filter) | 1.05-1.2x raw size | Similar to full index but cheaper at scale | Kubernetes log exploration with good labeling |
| Columnar scan (ClickHouse style) | P99 200ms for selective queries; 5-20s for full scans | 1.1-1.5x compressed | Column corruption affects wide range of queries | Analytics, aggregation queries over large time ranges |
| Streaming only (no persistent index) | Not applicable - no historical search | None | Buffer gap = permanent loss | Live tail only, very short retention (under 1 hour) |
| Hybrid (this design) | P99 800ms hot; 4s warm; 15s cold | 2.5x hot, 1.2x warm, 1.05x cold | Tier-isolated failures; hot loss degrades to warm | Production observability with tiered cost and SLA |
Key Takeaways
- Write-ahead buffer decouples producers from indexers. A 48-hour Kafka buffer means index workers can crash and replay without losing a single event. The buffer absorbs burst traffic that would otherwise overwhelm the indexing layer.
- Log sharding distributes index load linearly. Assigning each Kafka partition to a dedicated index worker means adding workers scales throughput linearly, up to the partition count. There is no coordination between workers during normal indexing.
- Inverted indexes enable sub-second full-text search. The inverted index is the data structure that makes “find all logs containing ‘connection refused’ in the last 24 hours” a sub-second operation instead of a full-table scan.
- Bloom filters eliminate unnecessary SSTable reads. At 1% false-positive rate, 99% of lookups against SSTables that do not contain a query term return instantly. Without Bloom filters, every SSTable on every shard would require a full binary search on every query.
- Columnar storage compresses repeated fields by 30-80x. The warm and cold storage tiers are economically viable only because Parquet with dictionary encoding and Zstandard compression makes repeated low-cardinality fields (like log level or service name) nearly free to store.
- TTL-based retention must be soft-delete with a drain window. Hard-deleting files that in-flight queries are reading produces non-deterministic errors. A soft-delete flag with a one-hour drain window allows graceful cleanup without corrupting active query results.
- Query fanout must be bounded at the per-shard level. Asking each of 128 shards for all results before merging is a recipe for coordinator memory exhaustion. A per-shard limit of 100-500 results keeps the merge set manageable regardless of how popular a query term is.
- Log sampling reduces volume without losing signal. Sampling DEBUG-level events at 1-in-100 with a deterministic hash preserves the ability to extrapolate true counts. Combined with recorded sampling rates, sampled data remains statistically useful for capacity and performance analysis.
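The bounded-fanout takeaway can be illustrated with a small merge sketch. It assumes each hit is a `(timestamp_ms, line)` tuple and that "best" means newest first; the names `shard_top_k` and `merge_shards` are hypothetical, not part of the design described above.

```python
import heapq
import itertools
from typing import Iterable

Hit = tuple[int, str]  # (timestamp_ms, log line); an assumed result shape


def shard_top_k(hits: Iterable[Hit], limit: int) -> list[Hit]:
    """Runs on each shard: keep only the newest `limit` hits, newest first."""
    return heapq.nlargest(limit, hits)


def merge_shards(per_shard: list[list[Hit]], limit: int) -> list[Hit]:
    """Runs on the coordinator: lazily merge pre-sorted shard results.

    The coordinator holds at most len(per_shard) * per-shard limit hits,
    no matter how many documents matched on each shard.
    """
    merged = heapq.merge(*per_shard, reverse=True)
    return list(itertools.islice(merged, limit))
```

With 128 shards and a per-shard limit of 500, the coordinator merges at most 64,000 hits even when a term matches millions of documents.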
A log aggregation pipeline at scale is ultimately an exercise in deciding which copies of data to keep at what fidelity. The hot index is an expensive, high-fidelity copy optimized for interactive search. The cold Parquet files are cheap, compressed copies optimized for compliance and analytics. The Kafka buffer is a transient, ordered copy optimized for replay. Each copy serves a different consumer with different latency and cost requirements. Designing the boundaries between them - and the protocols that move data from one tier to the next - is the core of the engineering challenge.
Frequently Asked Questions
Why use Kafka as a write-ahead buffer instead of writing directly to the index?
Writing directly to the index forces the ingest gateway to wait for index write completion before acknowledging the producer. Index writes involve SSTable flushing, Bloom filter construction, and metadata updates - operations that take tens to hundreds of milliseconds. With 1 million events per second, blocking ingest on index latency would reduce acknowledgment throughput by 10-50x. Kafka provides a middle ground: durable acknowledgment in under 10ms at broker write speed, with indexing happening asynchronously in the background.
Why not use Elasticsearch directly instead of building a custom pipeline?
Elasticsearch is an excellent choice for log workloads up to roughly 100,000 events per second per cluster. Above that, its JVM heap and segment merge overhead become tuning-intensive, and its cost at scale is significant. More importantly, a default Elasticsearch deployment handles ingest and query on the same data nodes, which means ingest throughput and query latency affect each other. The custom pipeline described here decouples them via Kafka, allowing the two to scale independently. That said, OpenSearch (the community fork) and Elasticsearch with hot-warm architecture handle many production workloads effectively - you only need a custom pipeline at very high scale or under very tight cost constraints.
How does query fanout handle a query against all services over 90 days?
A 90-day all-service query must fan out to all 128 shards and, for each shard, query the hot, warm, and cold tiers in parallel: 384 shard-tier combinations. This is the most expensive possible query. The query API applies a maximum fanout limit (say, 256 shard-tier combinations) and rejects queries that would exceed it, asking the user to narrow the time range or add a service filter. For legitimate broad historical queries (compliance audits), the query is instead submitted as an asynchronous job that runs on Spark against cold storage and delivers results to an S3 output bucket.
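A rough sketch of that admission check follows. It assumes a query touches a tier whenever its time range reaches past the previous tier's age boundary (7, 30, and 90 days, as used elsewhere in this article); the function names and return labels are hypothetical.

```python
MAX_FANOUT = 256   # the example limit from the text
NUM_SHARDS = 128

# (tier name, maximum age in days held by that tier); boundaries are assumed
TIER_MAX_AGE_DAYS = (("hot", 7), ("warm", 30), ("cold", 90))


def tiers_for_range(days_back: int) -> list[str]:
    """Tiers a query must touch if it looks `days_back` days into the past."""
    tiers = []
    lower_bound = 0
    for name, max_age in TIER_MAX_AGE_DAYS:
        if days_back > lower_bound:
            tiers.append(name)
        lower_bound = max_age
    return tiers


def plan_query(days_back: int, shards_matched: int = NUM_SHARDS) -> str:
    """Admit the query for interactive execution or flag it as too broad."""
    fanout = shards_matched * len(tiers_for_range(days_back))
    return "interactive" if fanout <= MAX_FANOUT else "too_broad"
```

Under these assumptions, `plan_query(90)` computes 128 x 3 = 384 combinations and is flagged as too broad, while the same 90-day range with a service filter that matches 4 shards needs only 12 and runs interactively.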
Why not use a single columnar store (like ClickHouse) for all tiers?
ClickHouse and similar columnar systems excel at aggregation queries over large datasets but have higher per-row scan overhead for selective full-text searches compared to an inverted index. A search for a specific error message across 7 days in ClickHouse requires scanning all message column chunks for those 7 days. An inverted index on hot storage answers that query by reading only the posting list for the query term, which is orders of magnitude smaller. The tiered approach uses the right data structure for each access pattern: inverted index for recent interactive search, columnar for historical analytics.
Why store the sampling rate alongside each event rather than normalizing at ingest?
Normalizing sampled events at ingest (for example, writing each sampled event with a weight of 100 to compensate for 1-in-100 sampling) breaks exact-match queries. If you are searching for a specific request ID, a weight-normalized event is either present or absent - weighting does not help. Storing the original event plus the sampling rate as a field allows the query layer to present exact events for forensic search while allowing the analytics layer to apply the rate for aggregate statistics. You get the best of both behaviors from a single storage representation.
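As a minimal sketch of this idea, the functions below sample DEBUG events with a deterministic hash and record the rate on each kept event. The field name `sample_rate`, the use of an event ID as the hash input, and the function names are assumptions made for illustration.

```python
import hashlib


def keep_event(event_id: str, level: str, debug_rate: int = 100) -> tuple[bool, int]:
    """Return (keep, sample_rate) for one event.

    Non-DEBUG events are always kept at rate 1. DEBUG events are kept when
    a stable hash of the ID falls in bucket 0 of `debug_rate` buckets, so
    the same ID always gets the same decision on every gateway pod.
    """
    if level != "DEBUG":
        return True, 1
    digest = hashlib.sha256(event_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % debug_rate
    return bucket == 0, debug_rate


def estimated_count(stored_events: list[dict]) -> int:
    """Analytics view: each stored event stands for `sample_rate` originals."""
    return sum(event["sample_rate"] for event in stored_events)
```

Forensic search reads the stored events as-is, while aggregate queries sum `sample_rate` to extrapolate the true volume.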
What happens to log indexing during a Kafka partition rebalance?
During a consumer group rebalance, the affected partitions are briefly unowned. New events continue arriving in Kafka and are durably stored, but no index worker is consuming them. The rebalance typically takes 30-60 seconds. After rebalance, the new owner picks up from the last committed offset for that partition and begins processing the queued events. The result is a temporary indexing lag of 30-90 seconds for logs on the affected partitions. Hot log search may show a gap during this window; the gap closes as the worker catches up. The ingest gateway is completely unaffected because it writes to Kafka independently of partition ownership.
Interview Questions
Design the ingest layer to handle a 10x traffic spike without dropping events.
Describe how the Kafka write-ahead buffer absorbs burst traffic by decoupling producer acknowledgment from indexing throughput. Explain that ingest gateway pods are stateless and autoscale horizontally behind a load balancer, so a 10x spike adds pods within 60-90 seconds via Kubernetes HPA on CPU or requests-per-second metrics. The key is that Kafka acts as the shock absorber: index workers fall behind during the spike but catch up after it subsides, as long as Kafka retention is long enough (48 hours covers even a sustained multi-hour spike).
Expected depth: Candidate should mention Kafka partition count as the ceiling on ingest parallelism, the acks=all producer config for durability, and the concept of consumer lag as the observable signal of index workers falling behind.
How would you implement full-text search with sub-second latency at 1 million events per second?
Walk through the inverted index data structure: terms mapped to delta-encoded posting lists. Explain that the Bloom filter at each SSTable allows O(1) rejection of missing terms, eliminating most disk reads. Describe the MemTable flush cycle (64 MB or 30 seconds) that keeps the index up to date within a 5-second window. Explain that query fanout across 128 shards in parallel means each shard handles a small fraction of the total data, making sub-second response achievable.
Expected depth: Candidate should describe the inverted index structure, explain why Bloom filters are necessary at scale, and note that per-shard result limits prevent coordinator memory exhaustion.
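The structures named in this answer can be sketched in a few dozen lines. This is a toy in-memory model under simplifying assumptions (whitespace tokenization, a fixed-size Bloom filter built on SHA-256, integer document IDs); the class and function names are hypothetical and it is not the on-disk format of the design above.

```python
import hashlib


def delta_encode(doc_ids: list[int]) -> list[int]:
    """Store the first ID, then the gap to each following sorted ID."""
    gaps, prev = [], 0
    for doc_id in doc_ids:
        gaps.append(doc_id - prev)
        prev = doc_id
    return gaps


def delta_decode(gaps: list[int]) -> list[int]:
    doc_ids, total = [], 0
    for gap in gaps:
        total += gap
        doc_ids.append(total)
    return doc_ids


class BloomFilter:
    def __init__(self, num_bits: int = 1 << 16, num_hashes: int = 7):
        self.num_bits, self.num_hashes = num_bits, num_hashes
        self.bits = bytearray(num_bits // 8)

    def _positions(self, term: str):
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{term}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, term: str) -> None:
        for pos in self._positions(term):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, term: str) -> bool:
        return all(self.bits[p // 8] & (1 << (p % 8)) for p in self._positions(term))


class SSTable:
    """Immutable segment: term -> delta-encoded posting list, plus a Bloom filter."""

    def __init__(self, docs: dict[int, str]):
        postings: dict[str, list[int]] = {}
        for doc_id in sorted(docs):
            for term in set(docs[doc_id].lower().split()):
                postings.setdefault(term, []).append(doc_id)
        self.postings = {t: delta_encode(ids) for t, ids in postings.items()}
        self.bloom = BloomFilter()
        for term in self.postings:
            self.bloom.add(term)

    def search(self, term: str) -> list[int]:
        term = term.lower()
        if not self.bloom.might_contain(term):
            return []  # definite miss: skip this SSTable without reading postings
        return delta_decode(self.postings.get(term, []))
```

A query for a term absent from the segment usually stops at the Bloom filter check; a false positive only costs one posting-list lookup that comes back empty.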
How do you enforce 90-day retention without losing compliance-critical logs or breaking budget?
Describe the three-tier storage model: hot SSTables on NVMe for 7 days, warm Parquet on SSD for 30 days, cold Parquet on object storage for 90 days. Explain that nightly TTL sweeps use soft-delete with a drain window to avoid deleting files in active query use. Describe the compaction job that converts SSTables to Parquet as data ages out of hot tier, achieving 20-40x compression and making cold storage economically viable. Mention that compliance requirements drive the hard delete of cold files at exactly day 90.
Expected depth: Candidate should quantify the cost difference between tiers, explain why soft-delete is needed, and describe how the columnar format enables cost-effective long retention.
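The soft-delete step might look like the sketch below: a two-phase sweep in which an expired file is first hidden from new queries and only removed once the drain window has passed. The `SegmentFile` record, its field names, and the use of epoch seconds are assumptions for illustration.

```python
from dataclasses import dataclass
from typing import Optional

DRAIN_WINDOW_S = 3600  # one-hour drain window


@dataclass
class SegmentFile:
    path: str
    expires_at: float                        # epoch seconds when TTL elapses
    soft_deleted_at: Optional[float] = None  # set by the first sweep after expiry


def visible_to_new_queries(segment: SegmentFile) -> bool:
    """Query planners skip soft-deleted files; in-flight readers keep theirs."""
    return segment.soft_deleted_at is None


def ttl_sweep(files: list[SegmentFile], now: float) -> list[str]:
    """Mark newly expired files, and return paths that are safe to hard-delete."""
    safe_to_delete = []
    for segment in files:
        if segment.soft_deleted_at is None:
            if now >= segment.expires_at:
                segment.soft_deleted_at = now  # phase 1: hide from new queries
        elif now - segment.soft_deleted_at >= DRAIN_WINDOW_S:
            safe_to_delete.append(segment.path)  # phase 2: drain window elapsed
    return safe_to_delete
```

The caller physically deletes only the returned paths, so a query that opened a file just before it expired has a full hour to finish.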
A query for ‘error’ across all services over 7 days takes 30 seconds. How do you fix it?
Diagnose the possible causes: shards are being queried sequentially rather than in parallel, the posting list for the term “error” is enormous (it appears in nearly every log), there is no Bloom filter to skip irrelevant SSTables, or the query is hitting warm/cold storage instead of hot. Solutions: enforce per-shard result limits and true parallel fanout to eliminate the sequential scan. For “error” specifically - a high-frequency stop-word - implement a term frequency threshold and require at least one more selective filter term in the query before accepting it. Add query-time logging to identify which shards are slow.
Expected depth: Candidate should identify that “error” is essentially a stop-word in logs and discuss the tradeoff between indexing all terms versus filtering common terms from the index.
How would you add multi-tenant isolation so tenant A’s queries cannot read tenant B’s logs?
Add tenant ID as a mandatory field at ingest time, validated by the gateway against the client’s auth token. Shard by sha256(tenant_id + "|" + service + "|" + date) so the tenant is part of every shard key and the shards holding a tenant’s data can be computed from the tenant ID. Hashing alone does not give a tenant exclusive shards, so the query API resolves the requesting tenant from the auth context and restricts fanout to the shards that hold that tenant’s data. For large tenants, assign dedicated shard ranges. For small tenants, co-locate multiple tenants in shared shards but enforce row-level filtering by tenant ID before returning results. Encrypt cold Parquet files with per-tenant keys so a storage breach does not expose cross-tenant data.
Expected depth: Candidate should address both the routing layer (shard assignment) and the query layer (filtering), and mention the encryption requirement for cold storage multi-tenancy.
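The routing and filtering halves of this answer can be sketched together. The shard count of 128 matches the figure used elsewhere in this article, while the function names, the `tenant_id` row field, and the choice to take the first 8 bytes of the digest are assumptions.

```python
import hashlib

NUM_SHARDS = 128


def shard_for(tenant_id: str, service: str, date: str,
              num_shards: int = NUM_SHARDS) -> int:
    """Routing layer: map the composite key to a shard number."""
    key = f"{tenant_id}|{service}|{date}".encode("utf-8")
    digest = hashlib.sha256(key).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


def filter_rows(rows: list[dict], auth_tenant: str) -> list[dict]:
    """Query layer: drop any row not owned by the authenticated tenant.

    Needed for shared shards, where hashing may place several tenants
    side by side.
    """
    return [row for row in rows if row["tenant_id"] == auth_tenant]
```

The tenant passed to `filter_rows` should come from the auth context, never from the query body, so a client cannot widen its own view by editing a request parameter.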