Build Twitter's Trending Topics Pipeline



System Design Deep Dive

Twitter’s Trending Topics Pipeline

Detecting viral signals at 500k tweets/sec when counting everything is too slow and approximating everything loses precision


Think of the trending topics sidebar as a live newspaper editor standing in a room with 500,000 people all talking at once. The editor’s job is not to transcribe everything - it’s to notice which words suddenly appear in 10,000 conversations simultaneously when five minutes ago nobody was saying them. That pattern-detection problem, at machine speed and global scale, is what Twitter’s trending pipeline solves.

The naive approach has two faces, and both fail. The first face: store every tweet’s tokens, run GROUP BY token ORDER BY COUNT DESC every 60 seconds. At 500,000 tweets per second with an average of 8 tokens per tweet, that’s 4 million token events per second flowing into a database. After one hour you have 14.4 billion rows to scan. The query takes minutes, not seconds. The second face: discard old data and only track the last 5 minutes exactly. This eliminates the scan cost but requires updating a counter for every token in every tweet - that’s still 4 million counter increments per second against a shared key-value store. Redis can handle roughly 1 million operations per second on a single node. The math doesn’t work.
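The arithmetic behind both failure cases can be checked in a few lines. This is a minimal sketch using only the figures quoted above; the variable names are illustrative, and the Redis figure is the rough single-node number from the text, not a benchmark.

```python
# Back-of-envelope check of the two naive designs (figures from the text above)
TWEETS_PER_SEC = 500_000
TOKENS_PER_TWEET = 8
REDIS_OPS_PER_SEC = 1_000_000  # rough single-node figure quoted in the text

# Face one: every token becomes a row that the GROUP BY must scan
token_events_per_sec = TWEETS_PER_SEC * TOKENS_PER_TWEET
rows_after_one_hour = token_events_per_sec * 3600

# Face two: every token becomes a counter increment against a shared store
redis_nodes_needed = token_events_per_sec / REDIS_OPS_PER_SEC

print(token_events_per_sec)  # 4000000
print(rows_after_one_hour)   # 14400000000
print(redis_nodes_needed)    # 4.0
```

Even the four-node figure assumes increments shard perfectly evenly. A viral hashtag concentrates writes on a single key, which is exactly the case the pipeline has to handle.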

What makes this problem genuinely hard is the combination of three forces pulling in opposite directions. You need velocity (a topic exploding in the last 5 minutes) rather than volume (a topic that has been consistently popular for months - “good morning” trends every day, and nobody wants to see that). You need recency without full-history replay. And you need geographic accuracy - what’s trending in Brazil is different from what’s trending in Japan, and showing one region’s trends to another degrades the product severely.

We need to solve for approximate counting with bounded error, sliding window aggregation without replaying history, geographic partitioning without losing cross-region signals, and bot suppression before counts pollute the trend signal.

Requirements and Constraints

Functional Requirements

  • Detect the top-30 trending topics per geographic region and worldwide
  • Refresh trends every 60 seconds
  • Support at least 20 distinct geographic regions (US-East, US-West, EU-West, APAC, LATAM, etc.)
  • A “trend” is a topic whose mention velocity (rate of change) exceeds a threshold, not just raw volume
  • Filter out known bot accounts and coordinated inauthentic behavior before computing trends
  • Suppress recurring daily topics that trend every day at the same time (not genuinely emergent)

Non-Functional Requirements

  • Ingest throughput: 500,000 tweets per second peak (300,000 average)
  • Trend refresh latency: less than 60 seconds from tweet to visible trend
  • Serving latency: trends API returns in under 50ms (p99)
  • Availability: 99.9% uptime (about 8.8 hours of downtime per year)
  • Eventual consistency acceptable: a topic that goes viral can appear in trends within 2 refresh cycles (120 seconds max)
  • Memory budget per processing node: under 512MB for all in-memory sketch state
  • Zero reprocessing of historical data per 60-second cycle

Constraints and Assumptions

  • We do not need to retroactively recalculate trends (no historical replay)
  • Trend lists are read-heavy (millions of readers) versus write-heavy (one writer per region per 60s)
  • Geographic assignment uses IP geolocation and account home region - we assume this is pre-computed upstream
  • Tweet language detection is handled upstream; trends are per-region but language-specific filtering is a separate concern
  • We are not building the tweet storage or fanout system - only the trending computation layer

High-Level Architecture

The pipeline has four layers stacked vertically: ingest, stream processing, aggregation, and serving. Data flows downward; control signals flow upward.

Twitter trending topics pipeline high-level architecture showing ingest, stream processing, aggregation, and serving layers

Tweet Stream feeds raw tweet events into a Kafka cluster partitioned by region_id. Kafka acts as the durable buffer between unpredictable ingest spikes and the downstream processing system. Each partition is owned by a dedicated Flink worker that maintains a sliding window and a Count-Min Sketch in memory. The Flink workers emit their local top-K lists every 60 seconds. A Regional Aggregator merges the outputs from all workers in a region into a single ranked list. That list is written to a Redis sorted set keyed by region. The Trends API reads from Redis and serves it through a CDN edge cache with a 30-second TTL.

Bot Filter sits inline on the Kafka consumer side. Each Flink worker applies velocity-based bot detection before a tweet’s tokens are counted. This keeps bot-inflated counts from ever entering the sketch.

Velocity Scorer applies a decay function and computes velocity (rate of change relative to baseline) as each window emits. This is what distinguishes a genuine emerging trend from a chronically popular topic.

Key Insight

The Count-Min Sketch does not store individual tweet records - it stores only counters in a fixed-size matrix. No matter how many tweets arrive, each sketch stays at 4 * 50,000 * 4 bytes = 800KB, and a worker’s 30-bucket sliding window stays at 30 * 800KB = 24MB. The pipeline’s memory footprint is independent of tweet volume.

The Ingest and Partitioning Layer

The ingest layer’s job is to route each tweet to the right Flink worker without creating hot partitions. This sounds like a solved problem until you consider regional imbalances: the APAC partition sees a massive spike every morning when India’s commute hour begins, while the LATAM partition sees relatively low volume during the same UTC window.

Kafka partitions are assigned in two levels. Each region owns a fixed block of partitions, selected by region_id. Within that block, tweets are spread across the region’s partitions using hash(user_id) mod partitions_per_region. This two-level partitioning serves two goals: region isolation (so APAC traffic cannot crowd out EU traffic) and within-region parallelism (so one region uses multiple Flink workers, each seeing a consistent slice of that region’s users).
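The two-level routing can be sketched as a small pure function. This is an illustration under stated assumptions, not the producer's actual partitioner: `stable_hash`, `partition_for`, `region_index`, and `partitions_per_region` are hypothetical names, and CRC32 stands in for whatever hash the real producer uses.

```python
import zlib

def stable_hash(value: str) -> int:
    # crc32 is deterministic across processes, unlike Python's built-in hash()
    return zlib.crc32(value.encode("utf-8"))

def partition_for(region_index: int, user_id: str, partitions_per_region: int) -> int:
    # Level 1: each region owns a contiguous block of partitions.
    # Level 2: the user id picks a slot inside that block.
    slot = stable_hash(user_id) % partitions_per_region
    return region_index * partitions_per_region + slot

# Illustrative call: region 2 with 6 partitions per region owns partitions 12..17
p = partition_for(region_index=2, user_id="user-123", partitions_per_region=6)
```

Because the slot depends only on `user_id`, all of a user's tweets land on the same partition, which is what lets a per-user rate check run locally on one worker.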

# Kafka topic configuration for tweet ingest
# topic: tweet-tokens-v2
bootstrap.servers: kafka-cluster:9092
num.partitions: 120
replication.factor: 3
retention.ms: 3600000  # 1 hour retention - enough for reprocessing after a worker crash
min.insync.replicas: 2
compression.type: lz4
# Partition assignment: 6 partitions per region (20 regions = 120 total)
# Producer key: "{region_id}:{hash(user_id) mod 6}"

The Kafka producer does lightweight token extraction before publishing: hashtags, cashtags, and significant noun phrases extracted with a simple regex. Full NLP runs asynchronously and is not in the critical path for trending. This means each Kafka message carries the original tweet ID, a region tag, a timestamp, and a list of 3-12 pre-extracted tokens.
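A minimal sketch of the cheap extraction step, assuming simple patterns for hashtags and cashtags. The regexes, the `extract_tokens` name, and the normalization choices (lowercased hashtags, uppercased cashtags) are illustrative assumptions; noun-phrase extraction is left out because the text does not describe how it is done.

```python
import re

# Illustrative patterns only; production rules would be stricter
HASHTAG_RE = re.compile(r"#(\w+)")
CASHTAG_RE = re.compile(r"\$([A-Za-z]{1,6})\b")

def extract_tokens(text: str, max_tokens: int = 12) -> list:
    tokens = []
    for match in HASHTAG_RE.finditer(text):
        tokens.append("#" + match.group(1).lower())
    for match in CASHTAG_RE.finditer(text):
        tokens.append("$" + match.group(1).upper())
    # De-duplicate while preserving first-seen order, then cap the list length
    return list(dict.fromkeys(tokens))[:max_tokens]

sample = extract_tokens("Big news #WorldCup #worldcup $tsla now")
print(sample)  # ['#worldcup', '$TSLA']
```

De-duplicating within a tweet means one tweet contributes at most one mention per token, so repeating a hashtag inside a single tweet does not inflate its count.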

Real World

Twitter’s actual Heron (successor to Storm) and later Kafka Streams setup used a similar two-level partitioning strategy. The key innovation in Twitter’s 2015 “Trending Topics” engineering blog post was separating the token extraction (cheap, synchronous) from the NLP pipeline (expensive, async) to keep the counting path under 5ms per tweet.

The Count-Min Sketch and Sliding Window

This component is the mathematical heart of the pipeline. Its job is to answer “how many times has token T appeared in the last 5 minutes?” in O(1) time, using O(1) space regardless of how many distinct tokens exist.

A Count-Min Sketch works like a fingerprinting system. Think of it as a grid of counters with d rows and w columns. Each row has its own hash function. To increment token T, you compute h_i(T) mod w for each row i and increment that cell. To query token T, you compute the same positions and return the minimum value across all rows. The minimum is an upper bound on the true count - it may overcount due to hash collisions, but it never undercounts.

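No matter how many distinct tokens arrive - millions of unique phrases - the sketch stays at d * w * 4 bytes. With d=4 rows and w=50,000 columns, that’s 800KB. The error guarantee is: with probability at least 1 - e^(-d), the estimate exceeds the true count by no more than e * N / w, where N is the total number of increments the sketch has absorbed. With d=4 that probability is about 98%. For a sketch holding N = 4 million token events (one second of global peak traffic, or about two minutes of a single partition’s share), the overcount is bounded at roughly 4M * 2.718 / 50000 = 217 counts. For a trending topic appearing 50,000 times, that’s less than 0.5% error. Because the bound scales with N, the error budget depends on how much traffic each sketch covers; partitioning the stream and keeping buckets short is what keeps N small.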
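The bound is easy to evaluate for other sketch sizes. The helper below is a small sketch of the standard Count-Min guarantee (overcount at most e * N / w with probability at least 1 - e^(-d)); `cms_error_bound` is a hypothetical name, and N must be the total number of increments absorbed by the sketch being queried.

```python
import math

def cms_error_bound(total_events: int, width: int, depth: int):
    # Maximum overcount (in counts) that holds with the returned probability
    max_overcount = math.e * total_events / width
    confidence = 1.0 - math.exp(-depth)
    return max_overcount, confidence

overcount, confidence = cms_error_bound(total_events=4_000_000, width=50_000, depth=4)
relative_error = overcount / 50_000  # against a topic with 50,000 true mentions

print(round(overcount))          # 217
print(round(confidence, 3))      # 0.982
print(round(relative_error, 4))  # 0.0043
```

Doubling `width` halves the overcount; adding a row tightens the confidence but leaves the overcount bound unchanged.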

# Count-Min Sketch implementation for trending token counting

class CountMinSketch:
    def __init__(self, depth: int = 4, width: int = 50_000):
        # depth=4, width=50000: ~800KB with 4-byte counters, error bound < 0.5% for high-frequency tokens
        # (nested Python lists use more memory per cell; a packed uint32 array would match the 800KB figure)
        self.depth = depth
        self.width = width
        self.table = [[0] * width for _ in range(depth)]
        # One distinct 32-bit seed per row, derived from a multiplicative hashing constant
        self.seeds = [i * 2654435761 & 0xFFFFFFFF for i in range(1, depth + 1)]

    def _hash(self, token: str, seed: int) -> int:
        # FNV-style hash with seed mixing for independence between rows
        h = seed
        for ch in token.encode("utf-8"):
            h = ((h ^ ch) * 16777619) & 0xFFFFFFFF
        return h % self.width

    def increment(self, token: str, count: int = 1) -> None:
        for row, seed in enumerate(self.seeds):
            col = self._hash(token, seed)
            self.table[row][col] += count

    def query(self, token: str) -> int:
        return min(
            self.table[row][self._hash(token, seed)]
            for row, seed in enumerate(self.seeds)
        )

    def merge(self, other: "CountMinSketch") -> None:
        # Merge another sketch into this one (used for worker aggregation)
        assert self.depth == other.depth and self.width == other.width
        for row in range(self.depth):
            for col in range(self.width):
                self.table[row][col] += other.table[row][col]

The sliding window layered on top of the sketch uses a ring buffer of micro-buckets. Instead of maintaining one global sketch that covers all history, we maintain N buckets of duration T/N each. Every T/N seconds, the oldest bucket is evicted and a new empty bucket is opened. The sliding window query sums the counts across all current buckets.

# Sliding window aggregator using bucketed Count-Min Sketches
import time
from collections import deque
from typing import Optional

class SlidingWindowSketch:
    def __init__(
        self,
        window_seconds: int = 300,   # 5-minute sliding window
        bucket_count: int = 30,       # 30 buckets of 10 seconds each
        sketch_depth: int = 4,
        sketch_width: int = 50_000,
    ):
        self.window_seconds = window_seconds
        self.bucket_duration = window_seconds / bucket_count
        self.bucket_count = bucket_count
        self.sketch_depth = sketch_depth
        self.sketch_width = sketch_width
        # Ring buffer of (bucket_start_time, sketch) pairs
        self.buckets: deque = deque()
        self._open_new_bucket()

    def _open_new_bucket(self) -> None:
        self.buckets.append({
            "start": time.time(),
            "sketch": CountMinSketch(self.sketch_depth, self.sketch_width),
        })

    def _evict_expired(self) -> None:
        cutoff = time.time() - self.window_seconds
        while self.buckets and self.buckets[0]["start"] < cutoff:
            self.buckets.popleft()

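    def record(self, token: str) -> None:
        now = time.time()
        # Open a fresh bucket if none exist (estimate() may have evicted them all
        # during an idle period) or if the newest bucket has aged past its duration
        if not self.buckets or now - self.buckets[-1]["start"] >= self.bucket_duration:
            self._open_new_bucket()
        self._evict_expired()
        self.buckets[-1]["sketch"].increment(token)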

    def estimate(self, token: str) -> int:
        self._evict_expired()
        # Sum across all active buckets - O(bucket_count) = O(30) = fast
        return sum(b["sketch"].query(token) for b in self.buckets)
Key Insight

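The sliding window never subtracts - it only drops entire buckets. This means a token cannot be double-decremented by clock skew or out-of-order arrival. The eviction of a 10-second bucket is atomic: either the entire bucket is counted or it is not. The cost is granularity: the effective window varies between roughly 290 and 300 seconds depending on how recently the oldest bucket was dropped. A tweet that arrives slightly late due to Kafka consumer lag is simply counted in whichever bucket is open when it is processed, so it is never lost and never counted twice.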

Bot Filtering

Bot filtering’s job is to prevent coordinated inauthentic behavior from manufacturing fake trends by flooding the pipeline with synthetic tweets mentioning a target topic.

Without bot filtering, a coordinated campaign can rent 10,000 bots, each tweeting the same hashtag 50 times per second. That generates 500,000 synthetic mentions per second - matching the entire organic tweet volume. The Count-Min Sketch would faithfully count every one of these, and the target hashtag would appear to be the most trending topic globally.

The bot filter runs inline in each Flink worker before any token is counted. It applies four checks in order from cheapest to most expensive:

Velocity check: if a user_id emits more than 10 tweets per minute (sliding window per user), all of that user’s tweets are dropped. Legitimate users almost never tweet faster than this. To avoid storing exact state for every user, this check can use a compact approximate per-user counter (for example a small Count-Min Sketch keyed by user_id) rather than a full timestamp list.

Account age check: accounts created fewer than 7 days ago are weighted at 0.1 instead of 1.0. New accounts can still contribute to trends, but their contribution is dampened until they establish a track record.

Content hash deduplication: within each 60-second window, if the same tweet text (after normalization) appears from more than 5 distinct accounts, subsequent identical tweets are dropped. Coordinated campaigns tend to use identical or near-identical text.

Subnet rate limiting: if a single /24 IP subnet (256 addresses) emits more than 200 tweets per minute, all traffic from that subnet is throttled by 80%.

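# Bot filter applied per-event before token counting
import time
from collections import defaultdict
from typing import Optional

class BotFilter:
    def __init__(self):
        # Per-user tweet timestamps (sliding 60s window)
        self._user_counts: dict = defaultdict(list)
        # Per-content-hash (timestamp, user_id) sightings (sliding 60s window)
        self._content_seen: dict = defaultdict(list)
        # Per-subnet tweet timestamps (sliding 60s window)
        self._subnet_counts: dict = defaultdict(list)
        self._window_sec = 60
        self._user_rate_limit = 10           # tweets per minute per user
        self._subnet_rate_limit = 200        # tweets per minute per /24 subnet
        self._dupe_text_limit = 5            # same content from N different users
        self._new_account_days = 7           # accounts younger than this are dampened
        self._new_account_weight = 0.1
        self._subnet_throttle_weight = 0.2   # throttled by 80%

    def _prune_window(self, timestamps: list, now: float) -> list:
        return [t for t in timestamps if now - t < self._window_sec]

    def should_count(
        self,
        user_id: str,
        content_hash: str,
        ip_subnet: str,
        account_age_days: Optional[float] = None,  # assumed to be supplied upstream; None skips the check
    ) -> float:
        # Returns weight in [0.0, 1.0]: 0 = drop, 1 = full count,
        # 0.1 = new account, 0.2 = throttled subnet (weights multiply)
        now = time.time()
        weight = 1.0

        # Check user velocity
        self._user_counts[user_id] = self._prune_window(self._user_counts[user_id], now)
        if len(self._user_counts[user_id]) >= self._user_rate_limit:
            return 0.0
        self._user_counts[user_id].append(now)

        # Dampen accounts younger than 7 days
        if account_age_days is not None and account_age_days < self._new_account_days:
            weight *= self._new_account_weight

        # Check duplicate content: drop once the same text has been seen from
        # _dupe_text_limit distinct accounts inside the current window
        seen = [(t, u) for t, u in self._content_seen[content_hash] if now - t < self._window_sec]
        self._content_seen[content_hash] = seen
        if len({u for _, u in seen}) >= self._dupe_text_limit:
            return 0.0
        seen.append((now, user_id))

        # Check subnet rate
        self._subnet_counts[ip_subnet] = self._prune_window(self._subnet_counts[ip_subnet], now)
        if len(self._subnet_counts[ip_subnet]) >= self._subnet_rate_limit:
            return weight * self._subnet_throttle_weight  # throttle, don't drop entirely
        self._subnet_counts[ip_subnet].append(now)

        return weight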
Watch Out

Bot filtering must happen before the Count-Min Sketch increment, not after. If you count first and filter later, there is nothing reliable to subtract: the sketch keeps only aggregate counters, so a correction step would need to know which increments came from bots - information that the sketch discards by design. Any architecture that plans to “post-process” trends for bot removal runs into this wall.

Velocity Scoring and Trend Decay

The velocity scorer’s job is to distinguish a genuinely emerging trend from a topic that has been consistently popular. “Good morning” trends every morning. “Breaking news” is always popular. These are not trends - they are noise.

Velocity is defined as the ratio of recent count to historical baseline. If a topic appeared 100 times in each of the last 24 hours (baseline = 100/hour) but has appeared 8,000 times in the last 5 minutes (velocity = 96,000/hour), its velocity ratio is 960x. That’s a real trend. A topic appearing 500 times per hour consistently has velocity ratio 1.0 - not a trend.

Trend decay prevents old trends from lingering in the list after the conversation has moved on. We apply an exponential decay function to the score: score = velocity_ratio * e^(-lambda * age_minutes). With lambda = 0.15, a topic that peaked 10 minutes ago has its score reduced to e^(-1.5) = 22% of its original. After 30 minutes without fresh velocity the multiplier is e^(-4.5), about 1.1%, so all but the most extreme spikes drop out of the ranking.
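The worked numbers in the two paragraphs above can be reproduced directly. This is a small sketch for checking the arithmetic; `velocity_ratio` and `decay_multiplier` are illustrative helper names, not part of the scorer itself.

```python
import math

LAMBDA = 0.15  # decay constant per minute, as in the text

def velocity_ratio(count_in_window: int, window_minutes: float, baseline_per_hour: float) -> float:
    # Scale the window count up to an hourly rate, then compare with the baseline
    hourly_rate = count_in_window * (60.0 / window_minutes)
    return hourly_rate / baseline_per_hour

def decay_multiplier(age_minutes: float, lam: float = LAMBDA) -> float:
    return math.exp(-lam * age_minutes)

ratio = velocity_ratio(8_000, 5, 100)
print(ratio)                           # 960.0
print(round(decay_multiplier(10), 3))  # 0.223
print(round(decay_multiplier(30), 3))  # 0.011
```

A 960x spike still scores about 10.7 after 30 minutes of decay, which is why decay alone does not guarantee removal; topics with a modest ratio are the ones that disappear quickly.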

# Velocity scorer with exponential trend decay
import math
from dataclasses import dataclass
from typing import Optional

@dataclass
class TrendCandidate:
    token: str
    current_count: int      # count in latest 5-min window
    baseline_count: float   # rolling 24h average per 5-min window
    first_seen_ts: float    # epoch when this topic first entered top-K
    last_peak_ts: float     # epoch of highest recorded velocity

LAMBDA = 0.15               # decay constant (units: per minute)
VELOCITY_FLOOR = 50         # ignore tokens with fewer than 50 mentions - reduces noise
VELOCITY_THRESHOLD = 5.0    # must be 5x above baseline to qualify as trending

def compute_trend_score(candidate: TrendCandidate, now: float) -> Optional[float]:
    # Returns None if token should not appear in trends
    if candidate.current_count < VELOCITY_FLOOR:
        return None

    # Velocity ratio vs historical baseline
    baseline = max(candidate.baseline_count, 1.0)  # avoid division by zero
    velocity_ratio = candidate.current_count / baseline

    if velocity_ratio < VELOCITY_THRESHOLD:
        return None  # not trending - just popular

    # Time since peak in minutes
    age_minutes = (now - candidate.last_peak_ts) / 60.0

    # Apply exponential decay
    decayed_score = velocity_ratio * math.exp(-LAMBDA * age_minutes)

    # Weight by log-scaled volume so that raw volume contributes sub-linearly:
    # a much larger event gets a modest boost rather than completely
    # shadowing a smaller but faster-rising topic
    volume_weighted = math.log1p(candidate.current_count) * decayed_score

    return volume_weighted

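def update_baseline(old_baseline: float, new_window_count: int, alpha: float = 0.05) -> float:
    # Exponential moving average for baseline - slow to change.
    # alpha=0.05 effectively averages over roughly the last 40 windows (span = 2/alpha - 1);
    # a baseline spanning 24h of 5-minute windows (288 windows) would need alpha near 2/289, about 0.007
    return alpha * new_window_count + (1 - alpha) * old_baseline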
Real World

Twitter’s actual trending algorithm uses a variant of this velocity approach combined with account-level trustworthiness scores. A 2019 blog post revealed that Twitter weights tweets from accounts with high follower counts and low bot-probability scores more heavily. The velocity-over-baseline approach prevents “Tuesday” and “coffee” from ever appearing in trending despite their astronomical raw mention counts.

Data Model

The pipeline uses two main storage schemas: one for the raw event stream (Kafka, transient) and one for the computed trend results (Redis + Postgres for audit).

-- Postgres schema for trend audit and historical analysis
-- tweet_token_events is not stored at full resolution - only aggregated checkpoints

CREATE TABLE trend_checkpoints (
  id            BIGSERIAL PRIMARY KEY,
  region_id     VARCHAR(32)     NOT NULL,
  window_end_ts TIMESTAMPTZ     NOT NULL,
  token         VARCHAR(280)    NOT NULL,
  count_estimate BIGINT         NOT NULL,
  velocity_ratio FLOAT          NOT NULL,
  trend_score   FLOAT           NOT NULL,
  rank_position SMALLINT        NOT NULL,
  created_at    TIMESTAMPTZ     NOT NULL DEFAULT NOW()
);

-- Indexed by (region_id, window_end_ts) for efficient range queries
CREATE INDEX idx_trend_checkpoints_region_time
  ON trend_checkpoints (region_id, window_end_ts DESC);

CREATE INDEX idx_trend_checkpoints_token_time
  ON trend_checkpoints (token, window_end_ts DESC);

-- Current live trends per region (kept in sync with Redis via CDC)
CREATE TABLE live_trends (
  region_id      VARCHAR(32)     NOT NULL,
  rank_position  SMALLINT        NOT NULL,
  token          TEXT            NOT NULL,
  trend_score    FLOAT           NOT NULL,
  velocity_ratio FLOAT           NOT NULL,
  count_estimate BIGINT          NOT NULL,
  refreshed_at   TIMESTAMPTZ     NOT NULL,
  PRIMARY KEY (region_id, rank_position)
);

-- Bot detection signals (lightweight - not storing all tweets)
CREATE TABLE bot_signals (
  user_id        VARCHAR(64)     NOT NULL,
  signal_type    VARCHAR(32)     NOT NULL,   -- 'velocity', 'new_account', 'dupe_content'
  detected_at    TIMESTAMPTZ     NOT NULL,
  suppressed_until TIMESTAMPTZ   NOT NULL,
  PRIMARY KEY (user_id, signal_type)
);
CREATE INDEX idx_bot_signals_suppressed ON bot_signals (user_id, suppressed_until);

The Redis data model is the hot path. Trends are stored as sorted sets where the score is the trend_score float and the member is the token string. This enables ZREVRANGE trends:us-east 0 29 to return the top-30 trends in O(log(N) + M) time.

# Redis trend storage and retrieval
# Uses Redis sorted set: ZADD trends:{region} score member

import redis
from typing import List, Tuple

r = redis.Redis(host="redis-cluster", port=6379, decode_responses=True)

TREND_KEY_PREFIX = "trends"
TREND_TTL_SECONDS = 120  # expire after 2 refresh cycles if no update

def publish_trends(region_id: str, scored_tokens: List[Tuple[str, float]]) -> None:
    # scored_tokens: list of (token, score) pairs
    key = f"{TREND_KEY_PREFIX}:{region_id}"
    pipe = r.pipeline()
    pipe.delete(key)
    # ZADD with score-member pairs
    if scored_tokens:
        mapping = {token: score for token, score in scored_tokens}
        pipe.zadd(key, mapping)
    pipe.expire(key, TREND_TTL_SECONDS)
    pipe.execute()

def get_top_trends(region_id: str, count: int = 30) -> List[Tuple[str, float]]:
    key = f"{TREND_KEY_PREFIX}:{region_id}"
    # ZREVRANGE returns members ordered from highest score to lowest
    results = r.zrevrange(key, 0, count - 1, withscores=True)
    return [(token, score) for token, score in results]
Data flow diagram showing a single tweet's path from arrival through bot filtering, Count-Min Sketch update, window emission, and Redis write

The data flow for a single tweet is deterministic: it arrives in Kafka with its tokens already extracted by the producer, is consumed by the Flink worker assigned to that partition, is bot-filtered inline, and increments the Count-Min Sketch; at each 60-second boundary, the top-K list is emitted and written to Redis. No tweet is ever stored individually - the system is fundamentally stateless at the record level and stateful only at the aggregate level.

Key Algorithms and Protocols

Top-K Approximation with Min-Heap

Once the Count-Min Sketch can answer “what is the estimated count for token T?”, we still need to find the top-30 tokens without querying every possible token. The standard approach is a min-heap of size K maintained as tokens stream in.

The heap invariant: the heap always contains at most K tokens, and the token with the smallest score is at the top (min-heap, not max-heap). When a new token arrives with count C: if C is greater than the minimum in the heap, we pop the minimum and push the new token. At the end of each window, the heap contains an approximation of the top-K.

# Top-K tracker using a min-heap, O(N log K) total for N tokens
import heapq
from typing import Dict, List, Tuple

class TopKTracker:
    def __init__(self, k: int = 50):
        # We track k=50 internally, report top 30 externally
        # Over-provisioning k reduces the chance of missing a true top-30 entry
        self.k = k
        # (score, token) entries; may hold stale entries left by lazy deletion
        self._heap: List[Tuple[float, str]] = []
        # Live score for every tracked token (at most k entries)
        self._token_scores: Dict[str, float] = {}

    def _discard_stale(self) -> None:
        # Pop heap entries whose score no longer matches the token's live score
        while self._heap:
            score, token = self._heap[0]
            if self._token_scores.get(token) == score:
                return
            heapq.heappop(self._heap)

    def update(self, token: str, score: float) -> None:
        if token in self._token_scores:
            # Token already tracked - record the new score and push a fresh entry;
            # the old heap entry becomes stale and is skipped later (lazy deletion)
            if self._token_scores[token] == score:
                return
            self._token_scores[token] = score
            heapq.heappush(self._heap, (score, token))
            return
        if len(self._token_scores) < self.k:
            self._token_scores[token] = score
            heapq.heappush(self._heap, (score, token))
            return
        # Tracker is full: compare against the smallest live entry
        self._discard_stale()
        if score > self._heap[0][0]:
            # New token beats the current minimum - evict and insert
            _, evicted = heapq.heapreplace(self._heap, (score, token))
            del self._token_scores[evicted]
            self._token_scores[token] = score

    def top_k(self) -> List[Tuple[str, float]]:
        # Return tracked tokens sorted by live score, highest first
        ranked = sorted(self._token_scores.items(), key=lambda kv: kv[1], reverse=True)
        return ranked[:self.k]
Key Insight

The combination of Count-Min Sketch and min-heap is a two-stage filter. The sketch answers “how often has this token appeared?” in constant space. The heap answers “which tokens appeared most often?” in O(log K) per update. Neither structure alone solves the problem: the sketch cannot find top-K without querying all tokens, and a heap without the sketch would require exact counts that consume unbounded memory.

Geo-Partitioned Trend Merging

When multiple Flink workers process the same region’s Kafka partitions, each worker has a partial view of that region’s traffic. The Regional Aggregator must merge these partial views correctly.

The merging protocol is additive: Count-Min Sketches from different workers covering the same time window can be merged by summing the corresponding cells. This is because the hash functions are identical across workers (seeded identically), so h_i(token) maps to the same cell in every worker’s sketch. Summing cell values gives the same result as if a single sketch had processed all events.

# Regional aggregation: merge worker sketches and extract top-K
from typing import List, Tuple

def merge_regional_sketches(
    worker_sketches: List[CountMinSketch],
    candidate_tokens: List[str],
    k: int = 50,
) -> List[Tuple[str, int]]:
    # Merge all worker sketches into one
    merged = CountMinSketch(depth=4, width=50_000)
    for sketch in worker_sketches:
        merged.merge(sketch)

    # Query merged sketch for each candidate token
    # candidate_tokens comes from each worker's local top-K heap
    scored = [
        (token, merged.query(token))
        for token in set(candidate_tokens)
    ]
    scored.sort(key=lambda x: x[1], reverse=True)
    return scored[:k]

The set of candidate_tokens is the union of top-K lists from all workers. Each worker emits its local top-50. The union typically contains 200-400 distinct tokens. Querying the merged sketch for each is O(200 * depth) = O(800) operations - negligible.
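The additivity claim is easy to verify on a toy example: two workers each see part of the stream, and the merged table should equal the table of a single sketch that saw everything. `TinySketch` below is a deliberately small stand-in written for this check (hypothetical name, reduced width), not the production class.

```python
class TinySketch:
    def __init__(self, depth: int = 4, width: int = 1024):
        self.depth, self.width = depth, width
        self.table = [[0] * width for _ in range(depth)]

    def _col(self, token: str, row: int) -> int:
        # Same seeded FNV-style mixing on every instance, so cells line up across workers
        h = (row + 1) * 2654435761 & 0xFFFFFFFF
        for byte in token.encode("utf-8"):
            h = ((h ^ byte) * 16777619) & 0xFFFFFFFF
        return h % self.width

    def increment(self, token: str) -> None:
        for row in range(self.depth):
            self.table[row][self._col(token, row)] += 1

    def merge(self, other: "TinySketch") -> None:
        for row in range(self.depth):
            for col in range(self.width):
                self.table[row][col] += other.table[row][col]

events = ["#a", "#b", "#a", "#c", "#a", "#b"]
worker_1, worker_2, single = TinySketch(), TinySketch(), TinySketch()
for i, token in enumerate(events):
    # Alternate events between two workers; the reference sketch sees all of them
    (worker_1 if i % 2 == 0 else worker_2).increment(token)
    single.increment(token)

worker_1.merge(worker_2)
tables_match = worker_1.table == single.table
print(tables_match)  # True
```

The equality holds only because both workers use identical seeds and dimensions. A worker deployed with a different width or seed set would produce a sketch that cannot be merged meaningfully.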

Real World

Apache Flink’s AggregateFunction interface includes a merge method for combining accumulators, which makes CMS merging straightforward in production. Flink’s event-time watermarks handle the window timing - each worker emits results when its watermark crosses the 60-second boundary, which accounts for late-arriving tweets without holding the window open indefinitely.

Scaling and Performance

Geo-partitioned scaling diagram showing Kafka partitions, Flink worker pools, regional aggregators, and Redis shards by region

The pipeline scales horizontally at every layer. The Count-Min Sketch update is cheap (O(d) = O(4) per token), so the layers to watch are Kafka consumer throughput and the window emit frequency.

Kafka throughput: At 500,000 tweets/second with 8 tokens each, the pipeline generates 4 million token events per second. Spread across 120 Kafka partitions, that is approximately 33,000 events per second per partition. A single Kafka consumer can easily handle 100,000 messages per second, which leaves roughly 3x headroom, so Flink workers are not the bottleneck on the ingest side.

Flink window state: Each worker maintains a SlidingWindowSketch with 30 micro-buckets, each containing one Count-Min Sketch. Memory: 30 * 4 * 50,000 * 4 bytes = 24MB per region per worker. With 20 regions and 5 workers per region = 100 workers total, total sketch memory across the fleet is 100 * 24MB = 2.4GB. This fits comfortably within a 40-node Flink cluster with 4GB RAM per node.

Capacity estimation back-of-envelope:

# Twitter Trending Pipeline - Capacity Estimation

Given:
  - 500,000 tweets/second peak ingest
  - 8 tokens per tweet average
  - 20 geographic regions
  - 60-second refresh cycle
  - 5-minute sliding window (30 micro-buckets of 10s each)

Token events:
  500,000 tweets/sec * 8 tokens = 4,000,000 token events/sec

Kafka sizing:
  4M events/sec at 200 bytes each = 800 MB/sec write throughput
  With 3x replication: 2.4 GB/sec total Kafka I/O
  Kafka partitions needed: 4M events / 100k events per partition = 40 partitions minimum
  We use 120 partitions (6 per region * 20 regions) for headroom

Flink memory per worker:
  CMS per bucket: 4 rows * 50,000 cols * 4 bytes = 800 KB
  Buckets per window: 30
  Total per worker: 30 * 800 KB = 24 MB per region
  Workers per region: 4-12 (auto-scaled)
  Total fleet: ~80 workers (4 per region * 20 regions) * 24 MB = 1.92 GB sketch memory

Redis sizing:
  30 trends per region * ~50 bytes per trend = 1.5 KB per region
  20 regions + 1 worldwide: 21 keys * 1.5 KB = ~32 KB total
  Redis cluster: 3 nodes with 100 MB each (massively overprovisioned - reads dominate)

Redis reads:
  1 billion users / 60 second refresh = ~16.7 million client reads/sec
  With CDN edge cache (30s TTL), each PoP refetches each trend list at most once per 30s
  Origin load: ~500 PoPs * 21 keys / 30s = ~350 req/sec, independent of client count

API serving:
  Trends served via CDN: cache hit rate ~99.9% after warm-up
  Effective Redis QPS: <2,000 (trivially handled by 3-node cluster)

The CDN absorption ratio is the scaling superpower here. The trend list for a region changes once every 60 seconds. With a 30-second CDN TTL, each of the ~500 global CDN PoPs makes at most 2 origin requests per minute per trend list. With 21 trend lists (20 regions plus worldwide), that’s 21 * 500 PoPs * 2 req/min = 21,000 origin requests per minute, or 350 requests per second total to the Trends API - regardless of how many users are viewing trends.
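The origin-load arithmetic above generalizes to other TTLs and PoP counts. A minimal sketch, with `origin_requests_per_sec` as a hypothetical helper and the 1 billion users / 60 seconds client rate taken from the capacity estimate:

```python
def origin_requests_per_sec(trend_lists: int, cdn_pops: int, ttl_seconds: float) -> float:
    # Each PoP refetches each cached trend list at most once per TTL expiry
    return trend_lists * cdn_pops / ttl_seconds

origin_rps = origin_requests_per_sec(trend_lists=21, cdn_pops=500, ttl_seconds=30)
client_rps = 1_000_000_000 / 60        # every user polling once per refresh cycle
hit_rate = 1 - origin_rps / client_rps

print(origin_rps)           # 350.0
print(round(hit_rate, 5))   # 0.99998
```

The origin rate has no term for user count: doubling the audience changes `client_rps` but not `origin_rps`, which is the whole point of the edge cache.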

Real World

The CDN absorption pattern is exactly what Twitter uses for trending. The Twitter API documentation shows that the GET trends/place endpoint is cached aggressively at the edge with a Cache-Control: max-age=300 header, meaning clients see trends that can be up to 5 minutes stale. The pipeline runs faster internally, but the CDN acts as a rate limiter protecting the Redis cluster from 300 million client polling attempts.

Failure Modes and Recovery

| Failure | Detection | Impact | Recovery |
| --- | --- | --- | --- |
| Flink worker crash | Flink job manager heartbeat timeout (10s) | Partial region coverage until restart; that region’s Kafka partitions lag | Flink restores worker from last checkpoint (every 30s); partition consumer resumes from committed offset; up to 30s of window state lost |
| Kafka broker failure | ISR (in-sync replica) shrinkage alert | Reduced Kafka throughput; some partitions temporarily unavailable | Kafka replication takes over from another replica; consumers reconnect automatically; min.insync.replicas=2 ensures no data loss |
| Redis primary failure | Redis Sentinel health check failure (5s threshold) | Trend reads fail or return stale cached data; 5s of write loss | Redis Sentinel promotes replica to primary within 10-30s; CDN serves stale data during the gap (acceptable - trends do not need sub-second freshness) |
| Regional aggregator crash | Missing 60-second emit from region | That region’s trends freeze at last valid snapshot | Aggregator is stateless (workers retain sketch state); restart in ~5s; next cycle resumes normally |
| Bot filter false positives (over-suppression) | Trend diversity metric drops below threshold (fewer than 10 distinct trending topics) | Real trends suppressed along with bot activity | Bot filter thresholds are per-region dynamic; if diversity drops, velocity/account-age thresholds are relaxed automatically by 20% |
| Clock skew between Flink workers | Window boundaries misalign; some events counted in wrong bucket | Minor count inaccuracy at window boundary (~1-2% error for skews under 1 second) | Flink uses event-time watermarks tied to tweet creation timestamp, not wall clock; skew tolerance set to 5 seconds via allowedLateness |

Watch Out

The most common production failure mode is not a crash - it is silent drift. A Flink worker that is healthy but running 90 seconds behind its Kafka partition is invisible to health checks but contributes stale data to the region’s top-K. The fix is to monitor Kafka consumer group lag per partition and alert if any partition exceeds 30 seconds of lag, not just if the worker process is alive.
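A minimal sketch of that lag check, assuming a monitoring job can already obtain two timestamps per partition: the newest record in the partition and the last record the worker processed. The `PartitionProgress` type and `lagging_partitions` function are hypothetical names for illustration; how the timestamps are collected from Kafka is deliberately left out.

```python
from dataclasses import dataclass

LAG_ALERT_SECONDS = 30.0  # alert threshold from the text above


@dataclass
class PartitionProgress:
    partition: int
    newest_event_ts: float    # timestamp of the latest record in the partition
    last_processed_ts: float  # timestamp of the last record the worker processed


def lagging_partitions(progress, threshold_s=LAG_ALERT_SECONDS):
    """Return (partition, lag_seconds) for every partition over the threshold."""
    alerts = []
    for p in progress:
        lag = max(0.0, p.newest_event_ts - p.last_processed_ts)
        if lag > threshold_s:
            alerts.append((p.partition, lag))
    return alerts


# Hypothetical sample: partition 1 is alive but 90 seconds behind.
sample = [
    PartitionProgress(0, newest_event_ts=1000.0, last_processed_ts=998.0),
    PartitionProgress(1, newest_event_ts=1000.0, last_processed_ts=910.0),
]
print(lagging_partitions(sample))
```

The check is per partition on purpose: averaging lag across a consumer group would hide exactly the single slow worker described above.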

Comparison of Approaches

| Approach | Latency | Memory | Accuracy | Failure Mode |
| --- | --- | --- | --- | --- |
| Exact count in Redis (HINCRBY per token) | Low (ms per increment) | Unbounded - grows with unique token count (10M tokens/day) | Exact | Redis OOM under cardinality explosion; bot inflation cannot be post-corrected |
| Count-Min Sketch + Sliding Window (this design) | Low (O(d) per token) | Fixed 800KB per region per sketch | 0.5% overcount error | None from counting itself; correctness bounded by error guarantee |
| Lambda architecture (batch + speed layer) | Speed layer: 60s; Batch reconcile: hours | Two full copies of data; complex ops | Exact after batch reconcile | Batch job failures cause trend data to diverge from speed layer silently |
| Full scan with Spark Structured Streaming | 5-15 minutes per cycle | Proportional to tweet volume in window (TBs) | Exact | Too slow for 60-second SLA; cannot absorb 500k tweets/sec in micro-batch mode |
| HyperLogLog per token | Low | Very small (12KB per token) | Counts distinct users, not total mentions | HLL counts distinct users, not mentions - useful for reach metric, wrong for trending |
| Pre-sharded exact counters (one Redis key per token per region) | Low | Linear in distinct tokens | Exact | Requires enumerating all possible tokens in advance; impractical for open-ended hashtags |

The Count-Min Sketch approach wins on every axis that matters for trending. The 0.5% error is undetectable to users - nobody can tell if a topic appeared 8,940 versus 9,000 times. The fixed memory budget means no capacity planning surprises. The additive merge property makes multi-worker aggregation trivially correct.
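To make the additive merge property concrete, here is a small Count-Min Sketch in Python. It is a sketch under stated assumptions, not the pipeline's actual implementation: the salted BLAKE2b hashing scheme and the class and method names are choices made for this example, while d=4 and w=50,000 come from the design above.

```python
import hashlib


class CountMinSketch:
    def __init__(self, depth=4, width=50_000):
        self.depth = depth
        self.width = width
        self.rows = [[0] * width for _ in range(depth)]

    def _index(self, row, token):
        # One independent hash per row, derived by salting BLAKE2b with the row id.
        digest = hashlib.blake2b(
            token.encode("utf-8"), digest_size=8, salt=row.to_bytes(2, "big")
        ).digest()
        return int.from_bytes(digest, "big") % self.width

    def add(self, token, count=1):
        for r in range(self.depth):
            self.rows[r][self._index(r, token)] += count

    def estimate(self, token):
        # Collisions only ever add, so the minimum across rows is the tightest bound.
        return min(self.rows[r][self._index(r, token)] for r in range(self.depth))

    def merge(self, other):
        """Cell-wise addition; valid only for sketches with identical shape and hashes."""
        if (self.depth, self.width) != (other.depth, other.width):
            raise ValueError("sketch dimensions must match")
        for r in range(self.depth):
            mine, theirs = self.rows[r], other.rows[r]
            for i in range(self.width):
                mine[i] += theirs[i]


# Two workers see disjoint slices of the stream, then their sketches are combined.
worker_a, worker_b = CountMinSketch(), CountMinSketch()
worker_a.add("#WorldCup", 3)
worker_b.add("#WorldCup", 5)
worker_b.add("#Oscars", 2)
worker_a.merge(worker_b)
print(worker_a.estimate("#WorldCup"), worker_a.estimate("#Oscars"))
```

Because every cell is a plain sum, merging two sketches gives the same cells as feeding both streams into one sketch, which is why the aggregation step needs no coordination between workers.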

The only scenario where exact counting wins is in legal or regulatory contexts where precise counts must be defensible. For trend detection, approximate is not just acceptable - it is the right engineering choice.

Key Takeaways

  • Count-Min Sketch solves the counting problem in fixed memory by trading a bounded, configurable error for unbounded cardinality support - with d=4, w=50,000, error stays under 0.5% regardless of tweet volume.
  • Sliding window buckets (not a single global counter) enable eviction without subtraction, which is critical for a data structure that only supports increment.
  • Velocity over volume is the core algorithmic insight: trending requires the rate of change relative to baseline, not raw mention count, to filter out chronically popular topics.
  • Trend decay with exponential smoothing prevents the trending list from being dominated by a single viral event for hours after the conversation has moved on.
  • Bot filtering before counting is architecturally mandatory, not optional post-processing, because Count-Min Sketch increments cannot be reversed.
  • Geo-partitioned Kafka + Flink workers achieve regional independence without sharing state, enabling each region to scale independently based on its traffic volume.
  • CDN absorption reduces Redis read pressure by four orders of magnitude - from millions of client reads per second to hundreds of origin reads per second.
  • Mergeable sketch state allows horizontal scaling of workers within a region: each worker processes a subset of tweets, and their sketches are combined additively at aggregation time.
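The bucket-eviction takeaway can be illustrated with a short Python sketch. Several things here are assumptions made for brevity: `collections.Counter` stands in for the per-bucket Count-Min Sketch, the 10-second bucket size and 30-bucket window are illustrative defaults, and events are assumed to arrive in roughly increasing timestamp order.

```python
from collections import Counter, deque


class SlidingWindowCounts:
    def __init__(self, bucket_seconds=10, num_buckets=30):
        self.bucket_seconds = bucket_seconds
        self.num_buckets = num_buckets
        self.buckets = deque()  # (bucket_id, Counter), oldest first

    def add(self, token, event_ts):
        bucket_id = int(event_ts // self.bucket_seconds)
        if not self.buckets or bucket_id > self.buckets[-1][0]:
            self.buckets.append((bucket_id, Counter()))
            self._evict(bucket_id)
        # Simplification: late events are folded into the newest bucket.
        self.buckets[-1][1][token] += 1

    def _evict(self, newest_id):
        # Expired buckets are dropped whole; no counter is ever decremented.
        oldest_allowed = newest_id - self.num_buckets + 1
        while self.buckets and self.buckets[0][0] < oldest_allowed:
            self.buckets.popleft()

    def count(self, token):
        return sum(counter[token] for _, counter in self.buckets)


window = SlidingWindowCounts(bucket_seconds=10, num_buckets=3)
window.add("#a", 0)
window.add("#a", 5)
window.add("#a", 12)
before = window.count("#a")
window.add("#b", 31)  # opens bucket 3, which pushes bucket 0 out of the window
after = window.count("#a")
print(before, after)
```

Swapping `Counter` for a real sketch changes only `add` and `count`; the eviction logic stays the same because it never touches individual counters.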

The counter-intuitive lesson here is that introducing approximation - replacing exact counts with a probabilistic data structure - actually makes the system more correct in practice. An exact counter that can be inflated by bots is less accurate than an approximate counter that filters bots before counting. Precision in the data model does not equal accuracy in the result.

Frequently Asked Questions

Q: Why not use HyperLogLog instead of Count-Min Sketch?

A: HyperLogLog counts distinct elements (cardinality), not total occurrences. Adding each author’s user ID to a per-hashtag key with PFADD and then calling PFCOUNT on that key would tell you how many distinct users mentioned #WorldCup, not how many times it was mentioned. For trending, total mention velocity matters more than unique user reach. A single user tweeting a hashtag 100 times would barely register in HLL but would correctly signal high velocity in a CMS. Use HLL when you care about reach; use CMS when you care about frequency.

Q: Why not just use a simple hash map per region and periodically evict old keys?

A: At 500,000 tweets per second with 8 tokens each, a full day of trending activity generates roughly 345 billion token events. Even with aggressive deduplication, the number of unique tokens observed per day is in the tens of millions. A hash map storing 10 million token-count pairs at 100 bytes each is 1GB per region per 5-minute window. With 20 regions and a 5-minute sliding window, that’s 20GB of counter state in memory. Count-Min Sketch handles the same workload in 20 * 24MB = 480MB total.
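The memory comparison can be reproduced directly. This sketch uses decimal units and the figures from the answer above; the 4-byte counter size and the 30 sketches per region are inferred from the 800KB and 24MB figures rather than stated explicitly, so treat them as assumptions.

```python
# Token event volume.
TOKEN_EVENTS_PER_SEC = 500_000 * 8
events_per_day = TOKEN_EVENTS_PER_SEC * 86_400

REGIONS = 20

# Hash map: 10 million token-count pairs at ~100 bytes each, per region.
hash_map_per_region = 10_000_000 * 100
hash_map_total = hash_map_per_region * REGIONS

# Count-Min Sketch: d=4 rows, w=50,000 columns.
COUNTER_BYTES = 4          # assumption: 32-bit counters
SKETCHES_PER_REGION = 30   # assumption: inferred from 24MB / 800KB
sketch_bytes = 4 * 50_000 * COUNTER_BYTES
sketch_per_region = sketch_bytes * SKETCHES_PER_REGION
sketch_total = sketch_per_region * REGIONS

print(f"token events/day:  {events_per_day:,}")
print(f"hash map total:    {hash_map_total / 1e9:.0f} GB")
print(f"sketch per region: {sketch_per_region / 1e6:.0f} MB")
print(f"sketch total:      {sketch_total / 1e6:.0f} MB")
```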

Q: How does the system handle a breaking news event that generates 100x normal tweet volume in a single region?

A: The Count-Min Sketch is robust to volume spikes because its error is proportional to total volume, and at 100x volume the absolute error is 100x higher but still represents less than 1% of the spike’s count. The velocity scorer naturally amplifies breaking news signals: a 100x spike produces a velocity ratio far above the threshold of 5x, so the topic rises immediately to position 1 in the regional trends. The min-heap is updated continuously, so the topic can appear in trends within the first 60-second window after the event begins.

Q: Why not use Spark Streaming instead of Flink?

A: Spark Structured Streaming processes data in micro-batches by default, so every result waits for a batch to be scheduled and completed. For a 60-second trend refresh cycle this sounds acceptable, but at 4 million token events per second, each micro-batch contains millions of events. Processing time for a micro-batch can approach or exceed 60 seconds under load, creating a feedback loop where processing falls behind. Flink’s event-by-event processing model handles the same throughput with sub-second per-event latency by updating in-memory sketch state as each event arrives rather than waiting for a batch boundary. The Flink model is the right abstraction for continuous aggregation over high-velocity streams.

Q: How do you prevent “Good Morning” from trending every day at the same time?

A: The baseline EMA (exponential moving average) updates slowly (alpha=0.05). After 30 days, “Good Morning” has a baseline of several hundred thousand mentions per 5-minute morning window. Its velocity ratio at the regular morning peak is approximately 1.0 - exactly at baseline. Only a day where “Good Morning” trends dramatically above its normal peak would trigger the velocity threshold. The VELOCITY_THRESHOLD = 5.0 constant means the topic must appear at 5x its historical rate to qualify as trending.
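The scoring rule described in this answer fits in a few lines of Python. `VELOCITY_THRESHOLD` and alpha come from the text; the function names, the `floor` guard for never-seen tokens, and the sample counts are assumptions added for illustration.

```python
VELOCITY_THRESHOLD = 5.0  # topic must run at 5x its historical rate
BASELINE_ALPHA = 0.05     # slow-moving exponential moving average


def update_baseline(baseline, observed, alpha=BASELINE_ALPHA):
    """Blend the newest window count into the long-running baseline."""
    return (1 - alpha) * baseline + alpha * observed


def velocity_ratio(current, baseline, floor=1.0):
    # Assumption: a floor keeps brand-new tokens from dividing by zero.
    return current / max(baseline, floor)


def is_trending(current, baseline):
    return velocity_ratio(current, baseline) >= VELOCITY_THRESHOLD


# "Good Morning" at its usual morning peak sits near its baseline.
print(is_trending(current=310_000, baseline=300_000))
# A breaking topic runs far above a small baseline.
print(is_trending(current=9_000, baseline=200))
```

With alpha at 0.05, a single unusual window moves the baseline by only 5% of the difference, so one viral morning does not raise the bar for the next day.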

Q: What happens to regions with very low tweet volume, like smaller countries?

A: Small regions would produce sparse trends with low confidence if computed independently. The system handles this through region grouping: countries with fewer than 10,000 tweets per minute are grouped into a sub-regional bucket (e.g., “Central Africa”, “Scandinavia”). The aggregator for these buckets combines sketches from multiple countries. When a country grows above the threshold, it graduates to its own regional bucket. This prevents a single viral tweet in a low-volume region from appearing as a “trend” based on 20 mentions.
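The grouping rule can be sketched as a simple mapping step. The 10,000 tweets-per-minute threshold is from the answer above; the function name, the country codes, the rates, and the sub-region table are hypothetical sample data.

```python
MIN_TWEETS_PER_MINUTE = 10_000  # threshold from the text


def assign_buckets(tweets_per_minute, sub_region_of):
    """Map each country to its own bucket, or to its sub-regional bucket if too small."""
    return {
        country: country if rate >= MIN_TWEETS_PER_MINUTE else sub_region_of[country]
        for country, rate in tweets_per_minute.items()
    }


# Hypothetical rates and sub-region assignments.
rates = {"SE": 4_000, "NO": 3_000, "BR": 900_000}
sub_regions = {"SE": "Scandinavia", "NO": "Scandinavia", "BR": "South America"}
buckets = assign_buckets(rates, sub_regions)
print(buckets)
```

Re-running the assignment on fresh rates is what lets a growing country graduate to its own bucket without any special-case code.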

Interview Questions

Q: Walk me through how you’d detect an emerging trend within 60 seconds without storing individual tweets.

Expected depth: Describe the Count-Min Sketch increment path, explain why fixed-width hash rows provide an error bound, connect the sliding window bucket eviction to why no full history replay is needed, and mention that the top-K heap runs in O(log K) per candidate so the total emission path at 60 seconds is O(candidates * d) for sketch queries plus O(candidates * log K) for heap updates.
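The top-K step mentioned in this answer can be sketched with Python's `heapq`. The `top_k` function and the `estimate` callback are illustrative names; in the real pipeline the callback would be a sketch query, here it is a plain dictionary lookup.

```python
import heapq


def top_k(candidates, estimate, k=30):
    """Keep the k highest-count tokens with O(log k) work per candidate."""
    heap = []  # min-heap of (count, token); the root is the weakest current entry
    for token in candidates:
        count = estimate(token)
        if len(heap) < k:
            heapq.heappush(heap, (count, token))
        elif count > heap[0][0]:
            heapq.heapreplace(heap, (count, token))
    return sorted(heap, reverse=True)


# Hypothetical counts standing in for sketch estimates.
counts = {"#a": 5, "#b": 9, "#c": 1, "#d": 7}
print(top_k(counts, counts.get, k=2))
```

A min-heap is used even though the goal is the maximum counts, because the only element that ever needs to be compared or replaced is the smallest of the current top K.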

Q: A coordinated bot campaign is generating 200,000 fake tweets per second all mentioning the same hashtag. How does your system handle this, and what are the limits of your bot filtering?

Expected depth: Walk through each bot filter check in order (velocity, account age, content deduplication, subnet rate), explain which check triggers first for a coordinated campaign (likely velocity + subnet rate for a botnet, or content dedup for a single-message flood), acknowledge that sophisticated bots with diverse content and aged accounts can evade the filter, and describe the monitoring signal (sudden single-topic dominance with uniform velocity across accounts) that would alert a human reviewer.

Q: How would you extend this system to support per-user personalized trends instead of region-only trends?

Expected depth: Discuss the cardinality explosion problem - you cannot maintain a Count-Min Sketch per user (300 million users * 24MB = 7.2PB of memory). Describe a two-stage approach: keep the regional trend pipeline as-is, then apply a personalization re-ranking layer at serve time that uses the user’s interest graph (topics followed, recently engaged hashtags) to reorder the regional top-30. The re-ranking is cheap (30 items) and personalizes without recomputing counts from scratch.
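One way to picture the serve-time re-ranking layer is a multiplicative boost on topics the user already cares about. The `rerank` function, the boost factor of 2.0, and the sample scores are all hypothetical; a production ranker would likely use richer signals than set membership.

```python
def rerank(regional_trends, user_interests, boost=2.0):
    """Reorder (topic, score) pairs, boosting topics in the user's interest set."""
    def personalized_score(item):
        topic, score = item
        return score * (boost if topic in user_interests else 1.0)

    return sorted(regional_trends, key=personalized_score, reverse=True)


# Hypothetical regional list and interest set.
trends = [("#WorldCup", 10.0), ("#Oscars", 8.0), ("#Rust", 6.0)]
print(rerank(trends, user_interests={"#Rust"}))
```

The input is at most 30 items per request, so the sort cost is negligible and no per-user counting state is ever created.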

Q: How do you ensure that the trends list is consistent globally - that users in different regions see the same “worldwide” trends at the same time?

Expected depth: Explain that the worldwide aggregator consumes from all regional aggregators and emits on its own 60-second cycle. Discuss the problem that regional cycles may not align exactly (APAC emits at T=0, EU emits at T=3s, US emits at T=7s). Describe the use of a fixed wall-clock emission schedule (all aggregators emit on the same 60-second wall-clock boundaries: T=0, 60, 120 seconds, and so on) rather than relative timing to ensure the worldwide aggregator always has fresh inputs before it emits. The Redis write for worldwide trends includes a refreshed_at timestamp so clients can detect stale data.
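A fixed wall-clock schedule reduces to rounding the current time up to the next interval boundary. The sketch below assumes Unix-style timestamps in seconds; the function names and the two-interval staleness cutoff are assumptions, while the 60-second interval and the refreshed_at field come from the answer above.

```python
import math

EMIT_INTERVAL_S = 60


def next_emit_time(now_s, interval_s=EMIT_INTERVAL_S):
    """Next wall-clock boundary strictly after now_s, identical for every aggregator."""
    return (math.floor(now_s / interval_s) + 1) * interval_s


def is_stale(refreshed_at_s, now_s, max_age_s=2 * EMIT_INTERVAL_S):
    # Assumption: data older than two emission cycles is treated as stale.
    return now_s - refreshed_at_s > max_age_s


# Aggregators that wake up at different offsets still target the same boundary.
print(next_emit_time(0), next_emit_time(3), next_emit_time(7))
print(is_stale(refreshed_at_s=0, now_s=121))
```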

Q: What is the back-of-envelope calculation for the minimum number of Kafka partitions needed, and how does partition count affect tail latency?

Expected depth: Walk through the math: 4M token events/sec, each Flink consumer handles 100k-200k events/sec, so minimum 20-40 partitions. Explain that more partitions reduce per-partition backpressure during traffic spikes, but add coordinator overhead in Kafka (each partition has a leader broker, replication traffic, and offset tracking). Discuss the sweet spot of over-provisioning by 3-4x (120 partitions for a theoretical minimum of 30) to handle 4x traffic spikes without repartitioning. Mention that Kafka allows adding partitions to a topic but never removing them, and that adding partitions changes the key-to-partition mapping for keyed data, so over-provisioning at launch is cheaper than repartitioning later.
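The partition math is a one-line ceiling division. The throughput figures are the ones quoted in this answer; the function name is illustrative.

```python
import math


def min_partitions(events_per_sec, per_consumer_events_per_sec):
    """Fewest partitions such that one consumer per partition keeps up."""
    return math.ceil(events_per_sec / per_consumer_events_per_sec)


TOKEN_EVENTS_PER_SEC = 4_000_000

low = min_partitions(TOKEN_EVENTS_PER_SEC, 200_000)   # fast consumers
high = min_partitions(TOKEN_EVENTS_PER_SEC, 100_000)  # slow consumers
provisioned = 4 * 30                                  # 4x headroom over a minimum of 30

print(f"minimum partitions: {low}-{high}")
print(f"provisioned:        {provisioned}")
```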
