Build Twitter's At-Mention Fan-Out for Celebrities



System Design Deep Dive

Celebrity Mention Fan-Out

100 million notifications from a single tweet - without a write storm that destroys your database


Imagine a fire station that receives one call and must immediately dispatch ambulances to every house in a city of 100 million residents - simultaneously, without clogging every road, and with the most urgent addresses reached first. That is the engineering problem when a celebrity with 100 million followers gets mentioned in a tweet. One write event from a single API call must propagate into potentially 100 million notification rows, 100 million cache entries, and 100 million push deliveries - all triggered in a matter of seconds.

The naive solution is obvious and catastrophically wrong: loop over the follower list and write a notification row to the database for every follower. At 100 million followers and 2 milliseconds per insert, that loop takes roughly 55 hours. Parallelizing across 1,000 workers, each doing 100,000 inserts, cuts the wall-clock time to minutes, but it does so by aiming about 500,000 writes per second at the database for a single event. Every one of those workers is holding a connection, competing for locks, and saturating your I/O throughput. This is the celebrity problem - a single logical event producing write amplification that is proportional to follower count.
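The arithmetic is worth checking, because the parallel version fails for a different reason than the serial one. The sketch below uses only the figures quoted above (100 million followers, 2 ms per insert, 1,000 workers); the function names are illustrative.

```python
# Back-of-envelope cost of the naive per-follower insert loop.
FOLLOWERS = 100_000_000
INSERT_LATENCY_S = 0.002   # 2 ms per insert, as quoted in the text
WORKERS = 1_000

def serial_hours(followers: int, latency_s: float) -> float:
    """Wall-clock hours for a single loop inserting one row at a time."""
    return followers * latency_s / 3600

def parallel_seconds(followers: int, latency_s: float, workers: int) -> float:
    """Wall-clock seconds when rows are split evenly across workers."""
    return followers / workers * latency_s

def sustained_writes_per_sec(latency_s: float, workers: int) -> float:
    """Database write rate while every worker is busy."""
    return workers / latency_s

print(f"serial loop:   {serial_hours(FOLLOWERS, INSERT_LATENCY_S):.1f} hours")
print(f"1,000 workers: {parallel_seconds(FOLLOWERS, INSERT_LATENCY_S, WORKERS):.0f} seconds")
print(f"write rate:    {sustained_writes_per_sec(INSERT_LATENCY_S, WORKERS):,.0f} writes/sec")
```

Parallelism shrinks the wall-clock time to a few minutes, but only if the database can absorb about 500,000 writes per second for one event while insert latency stays at 2 ms, which is unlikely under that load.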

The deeper tension is that the celebrity problem is not just about raw write volume. It is about the conflict between three forces that cannot all win at the same time. First, the followers of a 100-million-follower account expect the notification to appear quickly - ideally within a few seconds, not hours. Second, the system must not sacrifice write throughput for all other users just because one celebrity posted something. Third, you cannot afford to store 100 million near-identical notification records for an event that 95% of those followers will never look at because they were asleep, inactive, or simply not interested. Write amplification math alone tells the story: if a celebrity posts once per hour and has 100 million followers, a naive push-to-all model generates 100 million writes per post - almost 28,000 writes per second, sustained, just for one account.

Real systems like Twitter and Instagram handle this with a hybrid fanout approach: push eagerly to the small set of users who actually check their notifications frequently, and pull lazily for everyone else. The key insight is that follower counts are enormous but active user counts are manageable. We need to solve for write amplification, delivery latency SLA for active users, and graceful backpressure handling under celebrity post bursts - simultaneously.

Requirements and Constraints

Functional Requirements

  • When a celebrity is mentioned in a tweet, all followers who have notification preferences for mentions receive a notification
  • Notifications appear in the notification tab and as push alerts (APNs/FCM/Web Push)
  • Users can configure per-account notification preferences (follow all mentions, only mentions to me, etc.)
  • The system must track per-user read state so notification counts are accurate
  • Notifications must be durable - an infrastructure blip should not silently drop a notification

Non-Functional Requirements

  • Celebrity accounts defined as accounts with more than 1 million followers
  • Maximum follower count to handle: 200 million followers
  • Notification delivery latency for VIP users (last active within 24 hours): p95 under 5 seconds
  • Notification delivery latency for all other followers: p95 under 30 minutes
  • System must handle up to 500 celebrity mentions per second during peak events (award shows, elections)
  • Write throughput: up to 50 billion notification rows per day across all celebrities
  • Read throughput: 500,000 notification tab opens per second
  • Availability: 99.9% for push delivery, 99.99% for notification store reads
  • Storage: notification rows retained for 90 days before TTL-based deletion
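These targets imply some rough capacity numbers. The sketch below derives them from the requirements above, assuming 128-byte rows (the figure used by the cost model later in this article) and ignoring replication and compaction overhead; treat the results as order-of-magnitude estimates rather than a sizing plan.

```python
# Rough capacity numbers derived from the non-functional requirements.
ROWS_PER_DAY = 50_000_000_000   # write throughput ceiling from the list above
RETENTION_DAYS = 90             # TTL from the list above
ROW_SIZE_BYTES = 128            # assumption: same row size as the later cost model
SECONDS_PER_DAY = 86_400

avg_writes_per_sec = ROWS_PER_DAY / SECONDS_PER_DAY
retained_rows = ROWS_PER_DAY * RETENTION_DAYS
raw_storage_tb = retained_rows * ROW_SIZE_BYTES / 1e12  # before replication

print(f"average writes/sec: {avg_writes_per_sec:,.0f}")
print(f"rows retained:      {retained_rows:,}")
print(f"raw storage:        {raw_storage_tb:,.0f} TB")
```

At the stated ceiling this works out to roughly 580,000 writes per second on average and several hundred terabytes of raw data at steady state, which is why the rest of the design focuses on not writing rows that nobody will read.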

Constraints and Assumptions

  • Follower lists for celebrities are pre-segmented and stored in a sharded index service, not fetched live during fanout
  • The is_vip_user flag is precomputed in a user tier service and cached in Redis
  • We are designing the notification fanout path only - the tweet creation path and social graph are separate services
  • Users who have not logged in within 30 days are excluded from fanout (their pull flag still gets set, but no write)
  • Notification preferences are cached at the fanout layer with a 60-second TTL

High-Level Architecture

The system has five major layers: the ingestion API, the celebrity router, the hybrid fanout coordinator, async notification workers, and the storage layer.

High-level architecture of celebrity mention fan-out system showing ingestion, routing, fanout, workers, and storage layers

The Mention API Gateway accepts incoming mention events from the tweet creation service. Each event carries the mentioned user’s ID, the mentioner’s ID, the tweet ID, and a timestamp. The gateway validates the event, deduplicates it using a short-window bloom filter to catch retry storms, and publishes it onto a Kafka topic partitioned by mentioned_user_id.
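One way to build the short-window deduplication step is a two-generation Bloom filter that rotates on a timer. The sketch below is a minimal in-process version under stated assumptions: the class name, the 60-second window, the filter size, and the dedup key format are all illustrative, and a production gateway would need a shared or per-partition filter rather than a single in-memory one.

```python
import hashlib

class RotatingBloomDedup:
    """Two-generation Bloom filter: 'current' takes writes, 'previous' is kept
    for lookups, so a key is remembered for between one and two windows."""

    def __init__(self, window_s: float, bits: int = 1 << 20, hashes: int = 7):
        self.window_s = window_s
        self.bits = bits
        self.hashes = hashes
        self.current = bytearray(bits // 8)
        self.previous = bytearray(bits // 8)
        self.window_start = 0.0

    def _positions(self, key: str) -> list:
        # Derive k bit positions from one SHA-256 digest (double hashing).
        digest = hashlib.sha256(key.encode()).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.bits for i in range(self.hashes)]

    def _rotate(self, now: float) -> None:
        elapsed = now - self.window_start
        if elapsed < self.window_s:
            return
        # Keep the last generation only if it is at most one window old.
        fresh = bytearray(self.bits // 8)
        self.previous = self.current if elapsed < 2 * self.window_s else fresh
        self.current = bytearray(self.bits // 8)
        self.window_start = now

    def seen_before(self, key: str, now: float) -> bool:
        """Return True if key was probably seen recently; record it either way."""
        self._rotate(now)
        positions = self._positions(key)
        hit = any(
            all(f[p >> 3] & (1 << (p & 7)) for p in positions)
            for f in (self.current, self.previous)
        )
        for p in positions:
            self.current[p >> 3] |= 1 << (p & 7)
        return hit

dedup = RotatingBloomDedup(window_s=60)
key = "tweet123:user456"  # hypothetical dedup key: tweet ID plus mentioned user ID
print(dedup.seen_before(key, now=1000.0))  # first delivery
print(dedup.seen_before(key, now=1010.0))  # retry inside the window
print(dedup.seen_before(key, now=1300.0))  # long after the window has passed
```

A Bloom filter can report false positives, and here a false positive means dropping a legitimate mention, so the filter should be sized generously relative to the expected event rate per window.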

The Celebrity Router consumes from that Kafka topic and performs a follower-count lookup against the User Graph service. If the mentioned user has fewer than 1 million followers, the event is forwarded to a standard notification pipeline (a simple write queue). If they have more than 1 million followers, it is routed to the hybrid fanout coordinator. This single branching decision is the foundation of the entire design.

The Hybrid Fanout Coordinator is the heart of the system. It reads the celebrity’s follower shard index and makes two decisions simultaneously: which users get immediate push notifications (the VIP set - roughly the 5% of followers who were recently active), and what Redis flag to set for the remaining 95% who will receive their notification lazily when they next open the notification tab. It then enqueues the VIP user IDs into a high-priority queue and enqueues shard task descriptors (not full user ID lists) into the bulk fanout queue.

The Async Notification Workers run in three pools: VIP workers that consume from the priority queue and write directly to the notification store plus trigger push delivery; shard workers that process bulk tasks in parallel, each handling one 100,000-follower shard; and a pull-model handler that watches the notification tab open events and resolves pending pull flags into actual notification rows on demand.

The Storage Layer consists of a Cassandra cluster for durable notification rows (partitioned by user ID), a Redis cluster for per-user feed caches and pull flags, a follower index service for paginating over celebrity follower lists, and the push delivery service (wrapping APNs/FCM/WebPush).

Key Insight

The entire system works because follower count and active follower count are orders of magnitude apart - a celebrity with 100 million followers typically has fewer than 5 million who check notifications daily. Routing only those 5 million through the push path reduces write amplification by 95% without degrading perceived notification quality.

The diagram below shows the end-to-end data flow from a single mention event to a notification appearing in a follower’s tab.

End-to-end data flow from celebrity mention event through Kafka, fanout coordinator, priority queues, workers, and into notification store

The Celebrity Router and Fanout Decision

The celebrity router is the traffic cop that prevents a 100-million-follower event from accidentally being processed by the standard single-user notification path.

Most engineers assume the routing decision should happen at the API layer. That instinct is wrong. Routing at the API layer means your ingestion service needs to know follower counts, and more importantly, it blocks the tweet creation path on a potentially slow follower-count lookup. Instead, we route asynchronously: the tweet creation service publishes the mention event onto Kafka, and a dedicated router consumer makes the routing decision off the critical path. Kafka’s ordered delivery per partition means the event still gets processed in order, just not synchronously.

Hybrid push-pull model decision tree showing VIP push path versus lazy pull path for celebrity mentions

The router performs two lookups. First, it checks a Redis cache keyed on celebrity_id for the follower count - this is an O(1) lookup with a 5-minute TTL. Second, for events that cross the celebrity threshold, it checks the mention preferences for the mentioned user (do they want all follower notifications, or only direct mentions?). Both lookups happen in under 10 milliseconds combined.

# Celebrity router: determines fanout strategy per mention event
import redis
import json
from dataclasses import dataclass
from enum import Enum

class FanoutStrategy(Enum):
    STANDARD = "standard"        # < 1M followers, write directly
    CELEBRITY_HYBRID = "hybrid"  # >= 1M followers, push VIPs + pull flag for rest

CELEBRITY_FOLLOWER_THRESHOLD = 1_000_000

@dataclass
class MentionEvent:
    mentioned_user_id: str
    mentioner_id: str
    tweet_id: str
    timestamp_ms: int

def classify_mention(event: MentionEvent, redis_client: redis.Redis) -> FanoutStrategy:
    cache_key = f"follower_count:{event.mentioned_user_id}"
    cached = redis_client.get(cache_key)

    if cached is not None:
        follower_count = int(cached)
    else:
        # Fallback to User Graph service gRPC call
        follower_count = fetch_follower_count_from_graph(event.mentioned_user_id)
        redis_client.setex(cache_key, 300, follower_count)  # 5-min TTL

    if follower_count >= CELEBRITY_FOLLOWER_THRESHOLD:
        return FanoutStrategy.CELEBRITY_HYBRID
    return FanoutStrategy.STANDARD

def route_mention_event(event: MentionEvent, redis_client: redis.Redis, kafka_producer) -> None:
    strategy = classify_mention(event, redis_client)

    if strategy == FanoutStrategy.STANDARD:
        kafka_producer.send(
            "notification-standard",
            key=event.mentioned_user_id.encode(),
            value=json.dumps(vars(event)).encode()
        )
    else:
        kafka_producer.send(
            "notification-celebrity-hybrid",
            key=event.mentioned_user_id.encode(),
            value=json.dumps({**vars(event), "strategy": strategy.value}).encode()
        )
Real World

Twitter’s original “Firehose” architecture in 2010-2012 used pure push fanout for all users. After the Justin Bieber problem - where a single tweet from Bieber would generate millions of near-simultaneous database writes - they introduced the hybrid model in 2013, documented in their “Fanout at Twitter” engineering blog. The follower-count threshold for switching to hybrid was a tunable parameter, set around 75,000 followers at the time.

The Hybrid Fanout Coordinator

The fanout coordinator is the most complex component in the system, and where most naive implementations introduce bugs.

The coordinator’s job is to split one incoming celebrity mention event into exactly the right set of downstream tasks - without over-producing work (writing to inactive users) or under-producing it (missing VIP users). Think of it like a postal sorting machine at a massive distribution center: one incoming parcel gets labeled, sorted, and dispatched to the right delivery truck based on destination zone and priority, rather than every postal worker trying to deliver every parcel themselves.

The coordinator reads the celebrity’s follower shard index in pages of 100,000 follower IDs at a time. For each page, it calls the User Tier service to identify VIP users within that batch - users who are currently online or were active within the past 24 hours. The VIP set from each page gets written to the priority queue as individual user IDs. The remaining IDs from each page get handled differently: instead of writing them to a queue, the coordinator writes a shard task descriptor that says “process followers at offset X through X+100,000 for celebrity Y’s mention event Z.” These task descriptors are tiny (a few hundred bytes each) compared to the 100,000 user IDs they represent.

# Fanout coordinator: partitions celebrity followers into VIP push vs pull shard tasks
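import asyncio
from dataclasses import dataclass

import redis

# MentionEvent is the dataclass from the router listing above. FollowerIndexClient,
# UserTierClient and QueueClient are service clients defined elsewhere.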

@dataclass
class ShardTask:
    celebrity_id: str
    mention_event_id: str
    shard_offset: int    # start offset in follower list
    shard_size: int      # number of followers in this shard (typically 100K)
    tweet_id: str
    timestamp_ms: int

SHARD_SIZE = 100_000
VIP_ACTIVITY_WINDOW_HOURS = 24

async def coordinate_celebrity_fanout(
    event: MentionEvent,
    follower_index: FollowerIndexClient,
    user_tier: UserTierClient,
    priority_queue: QueueClient,
    shard_queue: QueueClient,
    redis_client: redis.Redis
) -> dict:
    vip_count = 0
    shard_task_count = 0
    offset = 0

    # Page through follower list in 100K chunks
    async for follower_batch in follower_index.paginate(
        user_id=event.mentioned_user_id,
        page_size=SHARD_SIZE
    ):
        # Identify VIP users in this batch (batch lookup, not per-user)
        vip_ids = await user_tier.filter_active_users(
            user_ids=follower_batch,
            active_within_hours=VIP_ACTIVITY_WINDOW_HOURS
        )

        if vip_ids:
            await priority_queue.enqueue_batch(
                queue="notification-vip",
                items=[{
                    "user_id": uid,
                    "tweet_id": event.tweet_id,
                    "mentioned_by": event.mentioner_id,
                    "ts": event.timestamp_ms
                } for uid in vip_ids],
                priority=0  # highest priority
            )
            vip_count += len(vip_ids)

        # Enqueue a shard task descriptor for the whole batch (not individual IDs)
        # Workers will re-read the follower index for this shard and write to Cassandra
        task = ShardTask(
            celebrity_id=event.mentioned_user_id,
            mention_event_id=f"{event.tweet_id}_{offset}",
            shard_offset=offset,
            shard_size=len(follower_batch),
            tweet_id=event.tweet_id,
            timestamp_ms=event.timestamp_ms
        )
        await shard_queue.enqueue(queue="notification-shard-tasks", item=vars(task))
        shard_task_count += 1
        offset += len(follower_batch)

    # Set pull flag so inactive users see the mention on next tab open
    # TTL of 90 days matches notification retention
    pull_flag_key = f"pull_mention:{event.mentioned_user_id}:{event.tweet_id}"
    redis_client.setex(pull_flag_key, 60 * 60 * 24 * 90, event.tweet_id)

    return {"vip_pushes_queued": vip_count, "shard_tasks_queued": shard_task_count}
Watch Out

A common mistake is to batch-fetch all 100 million follower IDs into coordinator memory before splitting into shard tasks. At 8 bytes per user ID, that is 800 MB per celebrity event just for the ID list. With 500 celebrity events per second at peak, you need 400 GB of RAM in your coordinator fleet. Page-based processing with shard task descriptors eliminates this entirely - coordinator memory stays constant regardless of follower count.
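The memory figures in this warning follow directly from the ID size and the page size. The short calculation below reproduces them and shows the paged equivalent, assuming each in-flight event holds exactly one 100,000-ID page in memory at a time (an idealization; real coordinators also hold buffers and client state).

```python
# Coordinator memory: whole follower list in RAM versus one page at a time.
ID_BYTES = 8
FOLLOWERS = 100_000_000
PAGE_SIZE = 100_000
EVENTS_IN_FLIGHT = 500   # peak celebrity mentions per second from the requirements

full_list_mb = FOLLOWERS * ID_BYTES / 1e6
full_fleet_gb = full_list_mb * EVENTS_IN_FLIGHT / 1e3

page_mb = PAGE_SIZE * ID_BYTES / 1e6
paged_fleet_gb = page_mb * EVENTS_IN_FLIGHT / 1e3

print(f"full list per event: {full_list_mb:.0f} MB, fleet: {full_fleet_gb:.0f} GB")
print(f"one page per event:  {page_mb:.1f} MB, fleet: {paged_fleet_gb:.1f} GB")
```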

Async Notification Workers

Async notification workers are the engines that actually write notification records to durable storage - and they are where backpressure handling either saves or sinks the system.

Think of the worker pool as a factory assembly line. The shard queue is the conveyor belt, each shard task is a work unit arriving on that belt, and the workers are the stations processing units. If work arrives faster than workers can process it - say, 50 celebrity posts simultaneously at peak awards season - the conveyor belt (queue depth) grows. Without backpressure, the belt eventually overflows and tasks are dropped. With backpressure, the production rate is throttled at the source (the fanout coordinator slows down enqueuing P2 shard tasks) while the worker pool auto-scales to catch up.

Priority queue tiers and backpressure handling showing VIP, active, and bulk worker pools with queue depth monitoring

The worker pools are separated by priority tier precisely to prevent bulk fanout work from delaying VIP notifications. VIP workers run in an isolated pool of 20 processes that read exclusively from the P0 queue and are never preempted by bulk tasks. Shard workers read from the P2 queue and are elastically scaled based on queue consumer lag.

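// Shard worker: processes one follower shard task, writing notification rows to Cassandra
package fanout

import (
    "context"
    "crypto/sha256"
    "fmt"
    "time"

    "github.com/gocql/gocql"
)

// ShardTask, FollowerIndexClient, filterByPreferences and NotificationTypeMention
// are assumed to be defined elsewhere in this package.

type ShardWorker struct {
    cassandra     *gocql.Session
    followerIndex FollowerIndexClient
    batchSize     int // rows per batch; around 50 keeps multi-partition batches small
}

type NotificationRow struct {
    UserID      string
    TweetID     string
    MentionedBy string
    CreatedAt   time.Time
    IsRead      bool
}

// notifID derives a deterministic ID from (tweet, follower) so that a redelivered
// shard task overwrites the same row instead of inserting a duplicate.
func notifID(tweetID, userID string) (gocql.UUID, error) {
    sum := sha256.Sum256([]byte(tweetID + ":" + userID))
    return gocql.UUIDFromBytes(sum[:16])
}

func (w *ShardWorker) ProcessShardTask(ctx context.Context, task ShardTask) error {
    // Re-fetch the follower IDs for this specific shard from the index
    // This avoids storing 100K IDs in the task descriptor itself
    followerIDs, err := w.followerIndex.FetchShard(
        ctx,
        task.CelebrityID,
        task.ShardOffset,
        task.ShardSize,
    )
    if err != nil {
        return fmt.Errorf("fetch shard %d: %w", task.ShardOffset, err)
    }

    // Filter out users who have opted out of this notification type
    // Preference check uses a local cache with 60s TTL to avoid per-user DB calls
    activeFollowers := filterByPreferences(ctx, followerIDs, NotificationTypeMention)

    // The timestamp column expects a time value, not epoch seconds
    createdAt := time.UnixMilli(int64(task.TimestampMs))

    // Write to Cassandra in small batches to control connection pressure.
    // Every row targets a different partition (user_id), so the coordinator node
    // has to forward each one; large multi-partition batches would overload it.
    for i := 0; i < len(activeFollowers); i += w.batchSize {
        end := i + w.batchSize
        if end > len(activeFollowers) {
            end = len(activeFollowers)
        }
        batch := w.cassandra.NewBatch(gocql.UnloggedBatch).WithContext(ctx)
        for _, uid := range activeFollowers[i:end] {
            id, err := notifID(task.TweetID, uid)
            if err != nil {
                return fmt.Errorf("derive notif_id shard %d: %w", task.ShardOffset, err)
            }
            // Column list matches the notifications table in the Data Model section
            batch.Query(
                `INSERT INTO notifications
                     (user_id, created_at, notif_id, notif_type, actor_id, tweet_id, is_read)
                 VALUES (?, ?, ?, 'mention', ?, ?, false) USING TTL 7776000`,
                uid,
                createdAt,
                id,
                task.MentionedBy,
                task.TweetID,
            )
        }
        if err := w.cassandra.ExecuteBatch(batch); err != nil {
            // Fail the whole task so the queue redelivers the shard; rows already
            // written are overwritten with identical values on retry
            return fmt.Errorf("cassandra batch write shard %d offset %d: %w",
                task.ShardOffset, i, err)
        }
    }
    return nil
}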

The backpressure monitor runs as a separate process that checks queue consumer lag every 10 seconds. When lag crosses a threshold (50,000 pending tasks), it does two things: it sets a short-lived Redis flag that the fanout coordinator checks before enqueuing new P2 shard tasks (causing it to slow down), and it emits a lag metric that the Kubernetes HPA uses to scale out shard workers. The flag carries a 30-second TTL, so it clears on its own if the monitor crashes. VIP workers are never throttled by this mechanism.

# Backpressure monitor: throttles P2 enqueue when worker lag is too high
import time

import redis
from kafka import KafkaAdminClient, KafkaConsumer

SHARD_QUEUE_LAG_THRESHOLD = 50_000
BACKPRESSURE_KEY = "fanout_backpressure_active"
BACKPRESSURE_TTL_SECONDS = 30  # auto-expires if monitor crashes

def monitor_backpressure(
    kafka_admin_client: KafkaAdminClient,
    kafka_consumer: KafkaConsumer,
    redis_client: redis.Redis,
) -> None:
    while True:
        # Committed offsets per partition for the shard worker consumer group
        committed = kafka_admin_client.list_consumer_group_offsets(
            group_id="shard-fanout-workers"
        )
        # Log-end offsets come from a consumer; the admin client does not expose them
        end_offsets = kafka_consumer.end_offsets(list(committed))

        total_lag = sum(
            end_offsets[tp] - committed[tp].offset
            for tp in committed
        )

        # Emit on every pass so the auto-scaler also sees lag recover.
        # emit_metric is the service's metrics helper, defined elsewhere.
        emit_metric("fanout.shard_queue.lag", total_lag)

        if total_lag > SHARD_QUEUE_LAG_THRESHOLD:
            # Set backpressure flag with TTL - coordinator checks this before enqueuing P2
            redis_client.setex(BACKPRESSURE_KEY, BACKPRESSURE_TTL_SECONDS, 1)
        else:
            redis_client.delete(BACKPRESSURE_KEY)

        time.sleep(10)
Key Insight

Backpressure must flow upstream through every layer - if only the queue has backpressure but the coordinator keeps enqueuing, you just move the overflow point from the worker to the queue. The backpressure flag in Redis creates a feedback loop that reaches all the way back to the fanout coordinator, naturally throttling the entire P2 write path without any of the components needing to know about each other’s internals.
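On the coordinator side, the feedback loop can be as simple as polling the flag before each P2 enqueue and backing off while it is set. The sketch below shows one possible shape for that check; the function name, the backoff parameters, and the in-memory stand-in for Redis are assumptions for illustration, and only `exists` is used from the real redis-py client interface.

```python
import time

BACKPRESSURE_KEY = "fanout_backpressure_active"

def wait_for_capacity(redis_client, max_wait_s=30.0, initial_delay_s=0.1,
                      max_delay_s=5.0, sleep=time.sleep) -> bool:
    """Block while the backpressure flag is set, backing off exponentially.

    Returns True once the flag has cleared, or False if max_wait_s has been
    spent waiting, so the caller can decide whether to defer the shard task.
    """
    delay = initial_delay_s
    waited = 0.0
    while redis_client.exists(BACKPRESSURE_KEY):
        if waited >= max_wait_s:
            return False
        sleep(delay)
        waited += delay
        delay = min(delay * 2, max_delay_s)
    return True

class FlagStub:
    """In-memory stand-in for Redis: reports the flag as set for N checks."""
    def __init__(self, busy_checks: int):
        self.busy_checks = busy_checks

    def exists(self, key: str) -> int:
        if self.busy_checks > 0:
            self.busy_checks -= 1
            return 1
        return 0

pauses = []
cleared = wait_for_capacity(FlagStub(busy_checks=3), sleep=pauses.append)
print(cleared, pauses)
```

Because the monitor's flag expires after 30 seconds on its own, a bounded wait like this cannot deadlock the coordinator if the monitor dies while the flag is set.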

Follower List Partitioning

The follower index is the component most engineers underestimate. A 100-million-entry follower list for a single celebrity is not a simple SQL query - it is a dataset that needs to support high-throughput sequential scans during fanout while also supporting fast random lookups for “does user A follow celebrity B?”

Follower list partitioning strategy showing how 100M followers are split into 1000 shard tasks each processing 100K followers in parallel

The follower list for each celebrity is stored in a dedicated Cassandra table, partitioned by (celebrity_id, shard_number) and clustered by follower_id. Each shard holds at most 100,000 user IDs. The shard number is computed as floor(insertion_sequence / 100000) - when a new follower is added, they land in the current head shard. This makes fan-out task generation independent of any follower-list scan: to generate 1,000 shard tasks for a 100-million-follower celebrity, we read that celebrity’s shard count from the metadata table (a single-row lookup that returns in milliseconds) and enqueue 1,000 task descriptors.
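The shard arithmetic is small enough to show directly. This sketch assumes a zero-based insertion_sequence and uses illustrative function names; it covers only the numbering and descriptor generation, not the Cassandra reads and writes.

```python
SHARD_SIZE = 100_000

def shard_for(insertion_sequence: int) -> int:
    """Shard that a follower lands in, given a zero-based insertion sequence."""
    return insertion_sequence // SHARD_SIZE

def shard_count(total_followers: int) -> int:
    """Number of shards needed to hold total_followers (ceiling division)."""
    return -(-total_followers // SHARD_SIZE)

def shard_task_descriptors(celebrity_id: str, tweet_id: str, total_followers: int) -> list:
    """One small descriptor per shard; no follower IDs are read to build these."""
    return [
        {"celebrity_id": celebrity_id, "tweet_id": tweet_id, "shard_number": n}
        for n in range(shard_count(total_followers))
    ]

tasks = shard_task_descriptors("celebrity-1", "tweet-1", 100_000_000)
print(len(tasks), tasks[0], tasks[-1])
```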

-- Schema for the follower index, partitioned for efficient shard scans
-- Each partition holds one shard of 100K followers for one celebrity

CREATE TABLE follower_index (
    celebrity_id   UUID,
    shard_number   INT,
    follower_id    UUID,
    followed_at    TIMESTAMP,
    PRIMARY KEY ((celebrity_id, shard_number), follower_id)
) WITH CLUSTERING ORDER BY (follower_id ASC)
  AND compaction = {'class': 'LeveledCompactionStrategy'}
  AND gc_grace_seconds = 86400;

-- Metadata table: stores shard count per celebrity for O(1) task generation
CREATE TABLE celebrity_follower_metadata (
    celebrity_id   UUID PRIMARY KEY,
    total_followers BIGINT,
    shard_count     INT,
    last_updated    TIMESTAMP
);

-- Index into VIP follower set (last-active users) - maintained separately
CREATE TABLE vip_follower_set (
    celebrity_id    UUID,
    follower_id     UUID,
    last_active_at  TIMESTAMP,
    PRIMARY KEY (celebrity_id, follower_id)
) WITH default_time_to_live = 604800;  -- 7-day TTL, updated on each login

The VIP follower set is a separate Cassandra table that contains only followers who have been active within the past 7 days. That window is wider than the 24-hour VIP cutoff used during fanout, so the coordinator still filters these candidates by last_active_at. This table is maintained asynchronously: every time a user logs in or opens the app, their presence is recorded in this table for each celebrity they follow. The fanout coordinator reads from this table first (it is smaller and faster) and only falls back to the full follower shard scan for the non-VIP residual.

Real World

Instagram uses a nearly identical approach called “active follower index” - described in their 2019 engineering post on scaling notifications. They precompute a bloom filter over recently active followers per celebrity so the fanout coordinator can quickly identify the VIP set without a full table scan. The bloom filter fits in memory (around 6 MB for 5 million entries at a 1% false positive rate, or roughly 9.6 bits per entry) and cuts VIP identification latency to under 1 millisecond per page of followers.
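The memory footprint of such a filter follows from the standard Bloom filter sizing formulas, m = -n ln(p) / (ln 2)^2 bits and k = (m / n) ln 2 hash functions. The sketch below applies them to 5 million entries at a 1% false positive rate; the function names are illustrative.

```python
import math

def bloom_bits(n: int, p: float) -> int:
    """Optimal number of bits for n entries at false positive rate p."""
    return math.ceil(-n * math.log(p) / (math.log(2) ** 2))

def bloom_hashes(n: int, bits: int) -> int:
    """Optimal number of hash functions for the chosen size."""
    return max(1, round(bits / n * math.log(2)))

n, p = 5_000_000, 0.01
bits = bloom_bits(n, p)
print(f"{bits / n:.1f} bits per entry")
print(f"{bits / 8 / 1e6:.1f} MB total")
print(f"{bloom_hashes(n, bits)} hash functions")
```

A filter of this size fits comfortably in the memory of a single coordinator process, which is what makes a per-celebrity filter practical.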

Priority Queuing

Priority queuing in this system is not just a convenience feature - it is the mechanism that ensures a celebrity post never degrades the notification experience for ordinary users while also guaranteeing VIP followers get sub-second delivery.

The priority system has three tiers. P0 (VIP) contains notifications for celebrities and users with high follower counts - people whose notification feed is a product differentiator. P1 (active) contains notifications for users who have been active in the past 24 hours - the set of followers most likely to actually see the notification soon. P2 (bulk) contains everyone else, processed on a best-effort basis and limited by shard worker capacity.

Each tier maps to an isolated Kafka consumer group with a dedicated worker pool. The isolation is critical: a storm of P2 messages (say, 10 celebrities post simultaneously) cannot slow down P0 delivery because P0 workers never read from the P2 topic. The worker pools are sized proportionally to expected throughput and latency SLAs: 20 VIP workers for sub-second P0 delivery, 50 active workers for sub-5-second P1 delivery, and 100+ shard workers for P2 bulk processing.

# Priority queue producer: routes notification work to correct Kafka topic
from enum import IntEnum
import json

class NotificationPriority(IntEnum):
    VIP = 0       # celebrities, verified high-follower accounts
    ACTIVE = 1    # users active within 24h
    BULK = 2      # everyone else

PRIORITY_TOPIC_MAP = {
    NotificationPriority.VIP: "notification-fanout-p0",
    NotificationPriority.ACTIVE: "notification-fanout-p1",
    NotificationPriority.BULK: "notification-fanout-p2",
}

def enqueue_notification(
    kafka_producer,
    user_id: str,
    notification_payload: dict,
    priority: NotificationPriority,
    redis_client
) -> bool:
    # Check backpressure for P2 only - never throttle P0 or P1
    if priority == NotificationPriority.BULK:
        if redis_client.exists("fanout_backpressure_active"):
            # Drop P2 enqueue under backpressure - it will be caught by pull model
            return False

    topic = PRIORITY_TOPIC_MAP[priority]
    kafka_producer.send(
        topic,
        key=user_id.encode(),
        value=json.dumps({
            "user_id": user_id,
            "priority": int(priority),
            **notification_payload
        }).encode()
    )
    return True
Watch Out

A single Kafka topic with message priority headers is not equivalent to separate priority queues. Kafka processes messages in order within a partition - a flood of P2 messages will delay P0 messages even if P0 messages have a priority header. Separate topics with separate consumer groups are the only reliable way to enforce priority isolation in Kafka-based systems.

Data Model

The notification store uses Cassandra as the primary storage engine, with Redis for hot-path cache and pull flags.

-- Notifications table: primary store for all notification rows
-- Partition key is user_id so all notifications for a user hit one node
-- TTL of 7776000 seconds = 90 days

CREATE TABLE notifications (
    user_id       UUID,
    created_at    TIMESTAMP,
    notif_id      UUID,
    notif_type    TEXT,           -- 'mention', 'reply', 'retweet', 'like'
    actor_id      UUID,           -- who triggered this notification
    tweet_id      UUID,           -- source tweet
    is_read       BOOLEAN,
    PRIMARY KEY (user_id, created_at, notif_id)
) WITH CLUSTERING ORDER BY (created_at DESC, notif_id ASC)
  AND default_time_to_live = 7776000
  AND compaction = {'class': 'TimeWindowCompactionStrategy',
                    'compaction_window_unit': 'DAYS',
                    'compaction_window_size': 1};

-- Unread count cache: maintained as a Redis counter per user
-- Incremented on write, decremented on mark-read, reset to 0 on read-all
-- Key pattern: notif_unread:{user_id}

-- Pull flag store: signals that lazy-pull users should see new celebrity mentions
-- Key pattern: pull_mention:{celebrity_id}:{tweet_id}
-- TTL: 90 days - matches notification retention

-- VIP feed cache: pre-written notification list for VIP users
-- Key pattern: notif_feed:{user_id}
-- Value: JSON array of last 50 notification objects
-- TTL: 24 hours - refreshed on write

The data flow for a notification starts when a shard worker writes a row to Cassandra and increments the notif_unread:{user_id} counter in Redis. When the user opens the notification tab, the read path first fetches the cached count from Redis (millisecond latency), then reads the last 50 notification rows from Cassandra using the partition key and a DESC clustering order scan (this hits a single Cassandra node with a range scan, typically 5-10 ms).

For VIP users, there is an additional Redis cache layer: the last 50 notification objects are stored as a JSON list in notif_feed:{user_id}. The VIP worker writes to both Cassandra and this Redis cache, so the VIP user’s first read is served from Redis and is sub-millisecond. The Redis entry expires after 24 hours, at which point the read path falls back to Cassandra.
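The read path described above amounts to a cache lookup with a fallback. The sketch below uses a plain dict in place of Redis and a callable in place of the Cassandra query so that it runs standalone; the function name and return shape are assumptions, and whether to repopulate the cache on a miss is left open because the text does not specify it.

```python
import json

FEED_KEY = "notif_feed:{user_id}"

def read_notification_tab(user_id: str, feed_cache, fetch_rows, limit: int = 50):
    """Return (notifications, source). Serve from the feed cache when present,
    otherwise fall back to the durable store."""
    cached = feed_cache.get(FEED_KEY.format(user_id=user_id))
    if cached is not None:
        return json.loads(cached)[:limit], "cache"
    return fetch_rows(user_id, limit), "store"

# Stand-ins: a dict for the Redis feed cache, a function for the Cassandra read.
cache = {"notif_feed:vip-1": json.dumps([{"tweet_id": "t1", "is_read": False}])}

def fetch_from_store(user_id: str, limit: int) -> list:
    return [{"tweet_id": "t0", "is_read": True}][:limit]

print(read_notification_tab("vip-1", cache, fetch_from_store))
print(read_notification_tab("casual-9", cache, fetch_from_store))
```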

Key Algorithms and Protocols

Write Amplification Math

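Before choosing an architecture, we need to quantify the problem we’re solving. The write amplification factor for a celebrity fanout is notifications_generated / events_produced: 100 million for pure push, and about 5 million under the hybrid model. The calculator below compares the two strategies and reports hybrid writes as a fraction of the pure-push baseline.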

# Write amplification calculator - quantifies the cost of each fanout strategy
from dataclasses import dataclass

@dataclass
class FanoutCostModel:
    celebrity_follower_count: int = 100_000_000
    active_follower_fraction: float = 0.05    # 5% active in past 24h
    celebrity_posts_per_hour: float = 1.0
    notification_row_size_bytes: int = 128    # per row in Cassandra

    def pure_push_writes_per_event(self) -> int:
        """Naive: write a notification row for every follower."""
        return self.celebrity_follower_count

    def hybrid_writes_per_event(self) -> int:
        """Hybrid: push only to active followers, pull flag for rest."""
        # Shard task descriptors (~200 bytes each) are queue messages, not
        # notification rows, so they are excluded from the write count.
        return int(self.celebrity_follower_count * self.active_follower_fraction)

    def write_amplification_factor(self) -> float:
        return self.hybrid_writes_per_event() / self.celebrity_follower_count

    def daily_write_savings_gb(self) -> float:
        posts_per_day = self.celebrity_posts_per_hour * 24
        saved_writes = (
            self.pure_push_writes_per_event() - self.hybrid_writes_per_event()
        ) * posts_per_day
        return (saved_writes * self.notification_row_size_bytes) / (1024 ** 3)

model = FanoutCostModel()
print(f"Pure push writes/event:   {model.pure_push_writes_per_event():,}")
# Pure push writes/event:   100,000,000

print(f"Hybrid writes/event:      {model.hybrid_writes_per_event():,}")
# Hybrid writes/event:      5,000,000

print(f"Write amplification factor: {model.write_amplification_factor():.2f}")
# Write amplification factor: 0.05 (95% reduction)

print(f"Daily write savings:      {model.daily_write_savings_gb():.1f} GB")
# Daily write savings:      271.8 GB per celebrity per day

Follower Shard Scan Algorithm

The shard scan algorithm determines how quickly the fanout coordinator can page through all 100 million followers and generate shard tasks. It must be idempotent (safe to re-run if the coordinator crashes mid-scan) and efficient (no full table scans).

# Idempotent shard task generator with checkpoint recovery
import hashlib

class IdempotentShardGenerator:
    """
    Generates shard tasks for a celebrity mention event.
    Idempotent: if interrupted, resumes from the last committed shard.
    Uses a progress key in Redis to track completion.
    """
    def __init__(self, redis_client, follower_metadata_client, shard_queue):
        self.redis = redis_client
        self.metadata = follower_metadata_client
        self.shard_queue = shard_queue

    def _progress_key(self, event_id: str) -> str:
        return f"fanout_progress:{event_id}"

    def generate_tasks(
        self,
        celebrity_id: str,
        event_id: str,
        tweet_id: str,
        timestamp_ms: int
    ) -> dict:
        # Fetch shard count (O(1) metadata lookup, not a full scan)
        meta = self.metadata.get(celebrity_id)
        total_shards = meta.shard_count

        # Check if we are resuming from a checkpoint
        progress_key = self._progress_key(event_id)
        start_shard = int(self.redis.get(progress_key) or 0)

        tasks_created = 0
        for shard_num in range(start_shard, total_shards):
            task = {
                "celebrity_id": celebrity_id,
                "shard_number": shard_num,
                "tweet_id": tweet_id,
                "timestamp_ms": timestamp_ms,
                "event_id": event_id,
                # Unique task ID for deduplication at worker level
                "task_id": hashlib.sha256(
                    f"{event_id}:{shard_num}".encode()
                ).hexdigest()[:16]
            }
            self.shard_queue.enqueue("notification-shard-tasks", task)
            tasks_created += 1

            # Checkpoint every 50 shards so crash recovery only re-runs 50 tasks max
            if shard_num % 50 == 0:
                self.redis.setex(progress_key, 3600, shard_num + 1)

        # Clear progress key on completion
        self.redis.delete(progress_key)
        return {"total_shards": total_shards, "tasks_created": tasks_created}
Key Insight

Deriving the task ID from event_id + shard_number makes shard tasks inherently idempotent. If a worker crashes after processing half a shard and the task is redelivered, the second execution repeats some of the Cassandra writes. Because each INSERT uses the same primary key (user_id, created_at, notif_id), and provided created_at and notif_id are derived from the event rather than generated at write time, the repeated writes overwrite identical rows instead of creating duplicates. At-least-once delivery plus idempotent writes gives you effectively-once results without a distributed transaction.
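
A minimal sketch of why redelivery is harmless, using an in-memory dictionary as a stand-in for the Cassandra table. The helper names (notif_id_for, process_shard_task) are hypothetical, and the sketch assumes that created_at and notif_id are derived from the event rather than generated at write time.

```python
import hashlib

# In-memory stand-in for a Cassandra table keyed by (user_id, created_at, notif_id).
# Writing the same key twice overwrites the row, which mimics upsert behaviour.
notification_table: dict[tuple[str, int, str], dict] = {}

def notif_id_for(event_id: str, user_id: str) -> str:
    """Deterministic notification ID: same event + user always gives the same ID."""
    return hashlib.sha256(f"{event_id}:{user_id}".encode()).hexdigest()[:16]

def process_shard_task(task: dict, follower_ids: list[str]) -> None:
    """Write one notification row per follower; safe to run more than once."""
    for user_id in follower_ids:
        key = (user_id, task["timestamp_ms"], notif_id_for(task["event_id"], user_id))
        notification_table[key] = {"tweet_id": task["tweet_id"]}

task = {"event_id": "evt-1", "tweet_id": "tw-9", "timestamp_ms": 1_700_000_000_000}
followers = ["u1", "u2", "u3"]

process_shard_task(task, followers[:2])   # worker crashes after two followers
process_shard_task(task, followers)       # task is redelivered and runs in full

print(len(notification_table))  # 3
```

If the worker stamped each row with its own wall-clock time instead, the retry would produce new keys and therefore duplicate rows, which is why the deterministic key matters.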

Scaling and Performance

Capacity Estimation

Given:
  - 500 celebrity mentions per second at peak (award shows, elections)
  - 100,000,000 average followers per celebrity mention
  - 5% active follower fraction = 5,000,000 VIP/active writes per event
  - 128 bytes per notification row
  - 1,000 shard tasks per celebrity mention (100K followers per shard)

Peak VIP writes:
  500 events/s x 5,000,000 writes/event = 2.5 billion writes/s (naive)
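  BUT: the writes from one second of events are spread over ~5 seconds of worker processing
  Effective VIP write rate for a one-second burst: 2.5 billion / 5 s = 500,000,000 writes/s across worker fleet
  (if 500 events/s were sustained, the average would still be 2.5 billion writes/s)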

Shard task queue throughput:
  500 events/s x 1,000 tasks/event = 500,000 shard tasks/s
  Each task generates 100,000 Cassandra writes over ~30 seconds
  Peak Cassandra write rate: 500,000 tasks x 100,000 writes / 30s = 1.67 billion writes/s

Storage (90-day retention):
  5,000,000 VIP writes/event x 500 events/s x 86400 s/day x 90 days x 128 bytes
  = ~2.5 EB/quarter (VIP only; an upper bound that assumes the peak rate is sustained around the clock)
  With full push-to-all: ~50 EB/quarter (20x larger - confirms hybrid strategy is essential)

Kafka throughput (shard tasks):
  500,000 tasks/s x 1KB/task = 500 MB/s on shard task topic
  VIP queue: 500 events/s x 5,000,000 IDs/event x 64 bytes = 160 GB/s peak
  -> VIP queue needs aggressive batching: emit 1,000-ID batches, not individual records

Worker count estimate:
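  Shard workers: 1 worker processes 1 shard (100K Cassandra writes) in ~30 seconds
  Peak: one second of peak input is 500,000 tasks; draining it within the allowed 3600s backlog
  takes 500,000 tasks x 30 s/task / 3600 s = ~4,200 busy workers, so provision ~5,000 active workers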
  Use Kubernetes HPA with Kafka consumer lag metric as scaling signal
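
Estimates like these are easy to get wrong by a unit prefix, so it can help to rerun them in code. The sketch below recomputes the headline figures from the "Given" inputs. The constant names are illustrative, and the storage figure assumes the peak rate holds for the entire retention window, which makes it an upper bound rather than a forecast.

```python
# Inputs copied from the "Given" list in the capacity estimate.
EVENTS_PER_S = 500
FOLLOWERS = 100_000_000
ACTIVE_PERCENT = 5
ROW_BYTES = 128
SHARD_SIZE = 100_000
RETENTION_DAYS = 90

vip_writes_per_event = FOLLOWERS * ACTIVE_PERCENT // 100   # 5,000,000
vip_writes_per_s = EVENTS_PER_S * vip_writes_per_event     # 2,500,000,000
tasks_per_event = FOLLOWERS // SHARD_SIZE                  # 1,000
tasks_per_s = EVENTS_PER_S * tasks_per_event               # 500,000

# Storage upper bound: peak write rate sustained for the full retention window.
retention_seconds = 86_400 * RETENTION_DAYS
vip_storage_bytes = vip_writes_per_s * retention_seconds * ROW_BYTES
full_push_storage_bytes = EVENTS_PER_S * FOLLOWERS * retention_seconds * ROW_BYTES

print(f"VIP writes/s:      {vip_writes_per_s:,}")
print(f"Shard tasks/s:     {tasks_per_s:,}")
print(f"VIP storage:       {vip_storage_bytes / 1e18:.2f} EB")
print(f"Full push vs VIP:  {full_push_storage_bytes // vip_storage_bytes}x")
```

Keeping the inputs as named constants makes it cheap to re-evaluate the design when, for example, the active fraction or shard size changes.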

Horizontal Scaling Strategy

The system scales horizontally at every layer. The Celebrity Router adds partitions to the Kafka input topic and adds router consumer instances. The Fanout Coordinator scales independently - each coordinator instance handles separate celebrity events from different Kafka partitions. The shard worker fleet uses Kubernetes HPA triggered by Kafka consumer lag: when lag exceeds 10,000 tasks, the fleet scales out by 10%; when lag drops to zero, it scales back in with a 5-minute cooldown.
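
The lag-driven scaling rule can be sketched as a pure function. This is an illustration of the policy described above, not Kubernetes configuration: in practice the rule would be expressed through an HPA external metric or a tool such as KEDA. The function name and the choice to scale in by the same 10% step are assumptions.

```python
def desired_replicas(
    current: int,
    lag: int,
    seconds_since_last_scale: float,
    scale_out_lag: int = 10_000,   # lag threshold from the text
    step_percent: int = 10,        # scale-out step from the text
    cooldown_s: float = 300.0,     # 5-minute cooldown from the text
    min_replicas: int = 1,
) -> int:
    """Return the target worker count for the observed consumer lag."""
    if lag > scale_out_lag:
        # Scale out by step_percent, rounded up, and by at least one worker.
        step = max(1, (current * step_percent + 99) // 100)
        return current + step
    if lag == 0 and seconds_since_last_scale >= cooldown_s:
        # Scale in only after the cooldown (assumed: same step size as scale-out).
        step = max(1, current * step_percent // 100)
        return max(min_replicas, current - step)
    return current

print(desired_replicas(100, lag=25_000, seconds_since_last_scale=0))    # 110
print(desired_replicas(100, lag=0, seconds_since_last_scale=60))        # 100
print(desired_replicas(100, lag=0, seconds_since_last_scale=301))       # 90
```

Holding steady between zero lag and the scale-out threshold avoids flapping when lag hovers at a low but non-zero level.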

The Cassandra notification store scales by adding nodes to the ring. Since the partition key is user_id, data distributes evenly across nodes and there are no hot partitions. The remaining risk is a cluster-wide write burst from a single celebrity event: up to 100 million notification rows are written, but they are spread over a 30-minute window, which keeps the added load acceptable.

Real World

Slack’s 2020 engineering post on “Scaling Notifications” describes a nearly identical priority-queue architecture for their workspace notification system. They separate their notification worker fleet into three pools - a “real-time pool” for online users, a “push pool” for mobile push notifications, and a “bulk pool” for large workspace messages. Their backpressure mechanism uses a token bucket rate limiter at the enqueue layer rather than a Redis flag, which provides smoother throttling but adds latency to the measurement-action loop.
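
For readers unfamiliar with the technique, a token bucket at the enqueue layer can be sketched as follows. This is a generic illustration of the algorithm, not Slack's implementation. The class name is hypothetical, and the clock is passed in explicitly so the behaviour is deterministic.

```python
class TokenBucket:
    """Allows bursts up to `capacity`, then admits work at `rate_per_s`."""

    def __init__(self, rate_per_s: float, capacity: float, now: float = 0.0):
        self.rate_per_s = rate_per_s
        self.capacity = capacity
        self.tokens = capacity      # start full so an initial burst is allowed
        self.last_refill = now

    def try_acquire(self, now: float, cost: float = 1.0) -> bool:
        """Refill based on elapsed time, then spend `cost` tokens if available."""
        elapsed = max(0.0, now - self.last_refill)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate_per_s)
        self.last_refill = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False

bucket = TokenBucket(rate_per_s=2.0, capacity=3.0)
burst = [bucket.try_acquire(now=0.0) for _ in range(4)]
print(burst)                          # [True, True, True, False]
print(bucket.try_acquire(now=0.5))    # True (one token refilled after 0.5 s)
```

Compared with an on/off backpressure flag, the bucket throttles continuously: enqueues slow down as tokens run out instead of stopping abruptly.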

Failure Modes and Recovery

Failure | Detection | Impact | Recovery
Fanout coordinator crash mid-scan | Redis progress key missing + Kafka consumer lag rising | Shard tasks stop being generated; followers won’t receive notifications | Consumer group rebalance triggers new coordinator instance; resumes from checkpoint shard number in Redis
Cassandra write failure for a shard | Worker receives write exception; emits error metric | Up to 100K users in that shard miss notification | Shard task is returned to queue with retry_count + 1; max 3 retries before dead-letter
Priority queue consumer lag > 1M (Kafka topic full) | Lag monitoring alert + backpressure flag fires | P2 shard tasks start dropping; P0/P1 unaffected | Scale out shard worker fleet; temporarily disable new P2 enqueues; existing tasks drain first
Celebrity Router crashes (no routing for 30s) | Kafka consumer group heartbeat failure; lag on input topic rises | New celebrity mentions not classified; pile up in input topic | Kafka consumer group failover reassigns partitions to healthy instances within 10s
VIP feed cache corrupted in Redis | Cache read returns invalid JSON; falls back to Cassandra | VIP user notification tab load latency increases from 1ms to 10ms | Cache TTL expires naturally (24h); or cache key deleted on read error; rebuilt from Cassandra
Push delivery provider (APNs) outage | APNs returns 5xx; push delivery worker increments failure counter | Mobile push notifications delayed; in-app notifications unaffected | Retry queue with exponential backoff + jitter; max 3 retries over 30 minutes; after that, in-app only
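
The recovery policy in the APNs row (exponential backoff with jitter, at most 3 retries within roughly 30 minutes, then in-app only) might look like the sketch below. The function names, the 240-second base delay, and the "full jitter" variant are assumptions chosen so that the worst-case total wait (240 + 480 + 960 = 1,680 s) stays under 30 minutes.

```python
import random
from typing import Callable, Optional

def backoff_delays(
    max_retries: int = 3,
    base_s: float = 240.0,
    cap_s: float = 960.0,
    rng: Optional[random.Random] = None,
) -> list[float]:
    """Full-jitter backoff: each delay is uniform in [0, min(cap, base * 2**attempt)]."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        ceiling = min(cap_s, base_s * 2 ** attempt)   # 240 s, 480 s, 960 s by default
        delays.append(rng.uniform(0.0, ceiling))
    return delays

def deliver_push(
    send: Callable[[], bool],
    sleep: Callable[[float], None],
    max_retries: int = 3,
) -> str:
    """Try once, retry up to max_retries times, then fall back to in-app only."""
    if send():
        return "pushed"
    for delay in backoff_delays(max_retries):
        sleep(delay)
        if send():
            return "pushed"
    return "in_app_only"

# Usage with a provider stub that fails once and then succeeds; sleep is a no-op here.
outcomes = iter([False, True])
print(deliver_push(lambda: next(outcomes), sleep=lambda seconds: None))  # pushed
```

The jitter matters during a provider outage: without it, every worker would retry at the same instants and hit the recovering provider with synchronized spikes.
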
Watch Out

The most common operational mistake is disabling the pull model’s lazy-fetch path during incidents. When the shard worker fleet falls behind and P2 writes are delayed, the pull flag in Redis still exists - users who open their notification tab will see the correct notification because the lazy fetch hits Cassandra or falls back to the fan-out rerun. Disabling this fallback to “simplify the incident response” means those users see nothing, which is worse than a delayed notification.

Comparison of Approaches

Approach | Write Amplification | Delivery Latency | Operational Complexity | Best Fit
Pure Push (write to all) | Extreme - 100M writes per event | Sub-second for all users | Low (simple loop) | Small social networks, max 100K followers
Pure Pull (read on open) | Zero writes on event | Latency on first tab open | Low | Read-heavy systems, no push notifications needed
Hybrid Push-Pull (this design) | Low - 5M writes per event (VIP only) | Sub-second for VIP, 30 min for bulk | High - two separate fanout paths | Large social platforms with celebrity accounts
Fan-out via Timeline Service | Medium - write to timeline cache only | Sub-second if cache warm, 1-2s cold | Medium - timeline service manages complexity | News feed systems with ranked timelines (Facebook)
Event-Sourced Pull (Kafka compacted log) | Zero writes for notification rows | Pull latency on open; no push | Very high - consumer position tracking per user | Audit logs, activity streams, not real-time notifications

The hybrid approach is the right choice for this problem. Pure push is operationally simple but generates unacceptable write amplification at 100 million followers. Pure pull eliminates the write storm but sacrifices the push notification UX - users will not know they received a mention until they manually open the app. The hybrid model captures the best of both: active users get sub-second push delivery at a cost of 5 million writes per event (manageable), inactive users get zero-cost storage (the pull flag is 1 Redis key, not 95 million Cassandra rows), and the system degrades gracefully under load by falling back entirely to pull mode when the shard worker fleet is overwhelmed.

The one scenario where you would choose pure pull over hybrid is an enterprise notification system (like Slack or Microsoft Teams) where users are always at their desktops and the notification tab is always open. In that context, the pull-on-open latency is imperceptible and the operational simplicity of zero write amplification is worth the tradeoff.

Key Takeaways

  • Write amplification math drives every architecture decision - quantify writes_per_event before choosing a fanout strategy; at 100M followers, pure push generates 100M writes per tweet.
  • Hybrid fanout is the production-grade answer to the celebrity problem: push eagerly to the 5% of followers who are active, pull lazily for the 95% who are not, saving 95% of write volume.
  • Follower list partitioning into fixed-size shards (100K per shard) enables the fanout coordinator to generate O(N/100K) task descriptors instead of materializing O(N) user IDs in memory.
  • Priority queuing with isolated worker pools is essential - separate Kafka topics per priority tier prevent celebrity post storms from degrading notifications for ordinary users.
  • Backpressure must flow upstream through all layers - a Redis flag from the queue monitor to the fanout coordinator closes the feedback loop and prevents queue overflow during peak events.
  • Idempotent shard tasks derived from event_id + shard_number make at-least-once redelivery safe at the worker level without distributed transactions, because Cassandra upserts are inherently idempotent on the same primary key.
  • Pull model as a safety net means the system degrades gracefully - even if P2 shard workers fall hours behind, users still see their notifications when they open the tab because the Redis pull flag persists for 90 days.
  • Async notification workers decouple delivery latency from write throughput - the tweet creation path returns in milliseconds while the fanout happens in the background over 30 minutes for the bulk case.

The counter-intuitive lesson from this design is that doing less work is the right answer. A system that writes 100 million rows per celebrity tweet looks like it is doing more to serve users - after all, every follower has their notification row ready instantly. But 95% of those users will never look at that row, and the 100 million writes per event are what makes the system brittle. The hybrid model does less work and is simultaneously more reliable, cheaper, and faster for the users who actually matter: the active ones.

Frequently Asked Questions

Q: Why use Cassandra for the notification store instead of Redis? A: Notification rows must persist for 90 days and survive Redis restarts. Redis is used as the hot-path cache (the last 50 notifications for VIP users) because it provides sub-millisecond reads, but the authoritative notification store needs durability guarantees that Redis AOF + RDB cannot reliably provide at this scale. Cassandra’s time-window compaction strategy is also ideal for TTL-heavy workloads - it efficiently reclaims space as notification rows age out without full compaction scans.

Q: Why not use a message queue like SQS or RabbitMQ instead of Kafka for the fanout queue? A: At 500,000 shard tasks per second peak, you need a queue that supports millions of messages per second with durable replay. Kafka’s log-based architecture allows shard workers to re-read tasks from any offset, which is critical for crash recovery and replayability. SQS visibility timeouts and RabbitMQ per-message ACK overhead do not scale to this throughput without extreme sharding complexity. Kafka also lets you set separate retention per topic - the P2 shard queue needs 7-day retention for recovery, which is trivial in Kafka but expensive in traditional queues.

Q: What happens if a celebrity gains 50 million followers overnight (viral growth)? A: The follower index uses a head-shard model - new followers always go into the current head shard, which is closed and a new head shard opened when it reaches 100,000 entries. The celebrity metadata table’s shard_count updates atomically when a new shard is created. The fanout coordinator always reads shard_count before generating tasks, so it sees the latest count. The shard worker fleet auto-scales on queue lag, so a sudden doubling of follower count roughly doubles the number of shard tasks, which triggers proportional scaling of the worker fleet within a few minutes.
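
A toy in-memory version of the head-shard model illustrates how shard_count grows with the follower list. The class and method names are hypothetical, and a real index would persist shards and update shard_count atomically rather than hold lists in memory. For simplicity the sketch opens the next head shard lazily, on the first insert after the current one fills.

```python
SHARD_CAPACITY = 100_000  # entries per shard, as stated in the text

class FollowerIndex:
    """Follower list split into fixed-size shards; only the last (head) shard accepts writes."""

    def __init__(self, shard_capacity: int = SHARD_CAPACITY):
        self.shard_capacity = shard_capacity
        self.shards: list[list[str]] = [[]]   # the last element is the open head shard

    @property
    def shard_count(self) -> int:
        return len(self.shards)

    def add_follower(self, user_id: str) -> int:
        """Append to the head shard, opening a new one if it is full. Returns the shard number."""
        head = self.shards[-1]
        if len(head) >= self.shard_capacity:
            head = []
            self.shards.append(head)
        head.append(user_id)
        return len(self.shards) - 1

# A tiny capacity makes the rollover visible.
index = FollowerIndex(shard_capacity=3)
for i in range(7):
    index.add_follower(f"user-{i}")

print(index.shard_count)                    # 3
print([len(s) for s in index.shards])       # [3, 3, 1]
```

Because closed shards never change, a fanout coordinator that reads shard_count once per event gets a stable set of shard numbers to enqueue, and only the head shard can gain members mid-fanout.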

Q: Why not deduplicate at the Kafka consumer level by just checking if a notification row already exists before writing? A: A read-before-write pattern at this scale is catastrophically expensive. Checking for existence of a notification row before writing it requires a Cassandra read per follower per event - at 100 million followers, that is 100 million reads per celebrity tweet before any writes happen. The read throughput alone would saturate the Cassandra cluster. Instead, we rely on Cassandra’s upsert semantics and idempotent INSERT behavior: the primary key constraint (user_id, created_at, notif_id) ensures that a duplicate write from a retried shard task produces the same row, not a duplicate row.

Q: How do you handle a celebrity who is mentioned in 100 tweets simultaneously during a live event? A: The fanout coordinator processes each mention event as an independent Kafka message. Because the coordinator is stateless and events are partitioned by celebrity_id, all 100 simultaneous events for the same celebrity land on the same Kafka partition and are processed sequentially by a single coordinator instance. This naturally serializes the shard task generation for that celebrity and prevents the coordinator from generating all 100 x 1,000 = 100,000 shard tasks at once. The first mention generates 1,000 tasks, then the second, and so on - each producing the correct distinct tweet-level notification rows.
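
The serialization property comes entirely from keyed partitioning: the same key always hashes to the same partition, and a partition is consumed in order. The sketch below demonstrates this with an MD5-based stand-in. Kafka's default partitioner uses a murmur2 hash rather than MD5, so the partition numbers here are illustrative only.

```python
import hashlib
from collections import defaultdict

def partition_for(key: str, num_partitions: int) -> int:
    """Stable key -> partition mapping (stand-in for Kafka's keyed partitioner)."""
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

NUM_PARTITIONS = 12
partitions: dict[int, list[str]] = defaultdict(list)

# 100 simultaneous mentions of the same celebrity, keyed by celebrity_id.
for i in range(100):
    event = {"celebrity_id": "celeb-42", "tweet_id": f"tweet-{i}"}
    partitions[partition_for(event["celebrity_id"], NUM_PARTITIONS)].append(event["tweet_id"])

print(len(partitions))  # 1
```

All 100 events end up in one partition in arrival order, so a single coordinator instance works through them one at a time. The tradeoff is that a very frequently mentioned celebrity can make that one partition a hotspot.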

Q: Why set a 30-day inactivity threshold for excluding users from P2 fanout? A: Users inactive for more than 30 days have a near-zero probability of opening their notification tab within the 90-day notification retention window. Writing 95 million notification rows for users who will never read them wastes Cassandra storage and write capacity. The pull flag in Redis is still set for inactive users, so if a dormant user returns after 60 days, they will see the mention on their first notification tab open via the lazy-fetch path. The 30-day threshold is tunable - platforms with very low re-engagement might lower it to 14 or even 7 days, while platforms with higher re-engagement rates would raise it so that more returning users find a pre-written row.

Interview Questions

Q: A celebrity with 100 million followers tweets for the first time in a year. Walk me through what happens in your system from the moment they post until all followers see the notification.

Expected depth: Explain the tweet creation path publishing a Kafka event, the celebrity router classifying the event by follower count, the hybrid fanout coordinator paging through the follower index in 100K shards, the VIP identification pass using the active follower set, separate enqueue of VIP user IDs to P0 queue and shard task descriptors to P2 queue, VIP workers writing to Cassandra and triggering APNs/FCM push, shard workers processing P2 tasks over 30 minutes, the pull flag in Redis enabling lazy notification for inactive users, and the 30-day inactivity exclusion rule.

Q: Your shard workers are falling behind during a peak awards event - the P2 queue has 50 million pending tasks. How does your system handle this and what tradeoffs does that involve?

Expected depth: Describe the backpressure monitor detecting queue lag exceeding the 50,000-task threshold, the Redis backpressure flag being set with a TTL, the fanout coordinator checking the flag before enqueuing new P2 tasks and dropping or delaying them, the Kubernetes HPA adding shard workers based on consumer lag metric, and the pull flag safety net ensuring users still see notifications on next tab open even if shard writes are delayed. Discuss the tradeoff: P2 delivery latency grows but P0/P1 are completely isolated.

Q: How would you modify this design if the requirement changed to guarantee every follower receives the notification within 60 seconds, even for celebrities with 100 million followers?

Expected depth: Calculate the required write throughput: 100M writes / 60s = 1.67M writes/second, which requires approximately 1,700 shard workers each writing 1,000 rows/second. Discuss Cassandra write scaling (partition by user_id gives linear scaling with nodes), worker fleet pre-warming, Kafka partition count tuning to match worker count, and the cost implications. Mention that this effectively abandons the hybrid model - you must push to all 100 million followers, which means accepting 100 million Cassandra writes per celebrity tweet.
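
The throughput arithmetic in this answer can be checked in a few lines. The helper name is hypothetical, and the 1,000 rows/second per worker figure is taken from the expected answer above rather than from a measurement.

```python
import math

def workers_needed(followers: int, deadline_s: int, rows_per_worker_per_s: int) -> tuple[float, int]:
    """Return (required writes/s, worker count) to write one row per follower by the deadline."""
    writes_per_s = followers / deadline_s
    return writes_per_s, math.ceil(writes_per_s / rows_per_worker_per_s)

writes_per_s, workers = workers_needed(100_000_000, 60, 1_000)
print(f"{writes_per_s:,.0f} writes/s, {workers:,} workers")
# 1,666,667 writes/s, 1,667 workers
```

The worker count scales linearly with follower count and inversely with the deadline, so halving the deadline to 30 seconds doubles the fleet.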

Q: Describe how you would test this system for correctness. How do you verify that every follower eventually receives the notification?

Expected depth: End-to-end trace using a test celebrity account with 1,000 synthetic follower accounts at known activity levels (some VIP, some bulk, some inactive). Assert that VIP followers receive push notifications within 5 seconds, active followers within 5 minutes, and inactive followers see the notification on next simulated tab open. For production verification: a reconciliation job that runs hourly, selects a sample of 1,000 recent celebrity mention events, cross-joins them with follower lists, and checks that either a notification row exists in Cassandra or the pull flag is set in Redis for each follower. Discuss chaos testing: kill a shard worker mid-task and verify the task is re-queued and completed correctly (idempotency test).
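
The core invariant of the reconciliation job (every follower has either a notification row or a pull flag) can be sketched as below. The two lookup callables are hypothetical stand-ins for the Cassandra and Redis queries, and sets are used here so the example runs without either store.

```python
from typing import Callable, Iterable

def reconcile_event(
    event_id: str,
    follower_ids: Iterable[str],
    has_row: Callable[[str, str], bool],
    has_pull_flag: Callable[[str, str], bool],
) -> list[str]:
    """Return followers that have neither a notification row nor a pull flag for the event."""
    return [
        user_id
        for user_id in follower_ids
        if not has_row(user_id, event_id) and not has_pull_flag(user_id, event_id)
    ]

# In-memory stand-ins for the two stores.
rows = {("vip-1", "evt-1"), ("active-1", "evt-1")}
pull_flags = {("dormant-1", "evt-1")}

missing = reconcile_event(
    "evt-1",
    ["vip-1", "active-1", "dormant-1", "lost-1"],
    has_row=lambda user_id, event_id: (user_id, event_id) in rows,
    has_pull_flag=lambda user_id, event_id: (user_id, event_id) in pull_flags,
)
print(missing)  # ['lost-1']
```

A non-empty result is the signal worth alerting on: it identifies followers for whom both the push path and the pull safety net failed.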

Q: The pull model relies on a Redis flag that TTLs after 90 days. What happens to the user experience if Redis has an outage during a celebrity mention event?

Expected depth: VIP push is unaffected - it goes through Cassandra directly and the push delivery path does not depend on Redis for the flag. P2 bulk fanout shard tasks are still enqueued in Kafka and will write to Cassandra (also Redis-independent for the actual notification row). The only Redis-dependent path is setting the pull flag and reading it when a user opens the tab. If Redis is unavailable during flag write, the pull-model handler falls back to querying Cassandra directly for mentions from the past 90 days matching the user’s followed celebrities - a slower query (50-100ms vs 1ms) but functionally correct. Discuss Redis Cluster failover and the replication factor (3) that makes total Redis unavailability rare.
