Build a Social Graph Service at 500M Users



System Design Deep Dive

Social Graph Service at 500M Users

Sub-millisecond follow checks against a graph that mutates billions of times per day - where consistency and latency fight each other.


Think of a social graph as an enormous phone book, except instead of listing names and numbers, each entry lists every person a user follows and every person who follows them. Now imagine that phone book has 500 million entries and is being edited billions of times every day - people following and unfollowing, celebrities gaining 10,000 new followers per hour - and your job is to answer “does Alice follow Bob?” in under one millisecond, every single time, no matter how hot that question gets.

That one-millisecond constraint is not arbitrary. It sits on the critical path of nearly every social product decision: showing a “Follow” button vs a “Following” badge, deciding whether to fan out a post to a user’s feed, determining mutual connections for a “People You May Know” feature, calculating notification eligibility. If your follow-check service is slow, every one of these product surfaces degrades. 500 million users generate roughly 50 billion follow-check queries per day, which works out to about 580,000 queries per second averaged over the day (50 billion / 86,400 seconds), before any peak-hour multiplier. At that rate you cannot afford even a single cache miss turning into a full-table scan.

The naive approach is a simple SQL table: INSERT INTO follows (follower_id, followee_id) and then SELECT 1 FROM follows WHERE follower_id = A AND followee_id = B. This works fine at 10,000 users. At 500 million users with 200 billion edges, that table is over a terabyte, a single celebrity accounts for 100 million index entries, and a buffer-cache miss becomes a multi-millisecond page read. The index on (follower_id, followee_id) must fit in RAM or you pay disk latency on every miss, and neither the index nor the data fits in a single machine’s RAM.

We need to solve three problems simultaneously: read latency (sub-1ms for the follow-check hot path), write throughput (billions of follow/unfollow events per day without blocking reads), and consistency (a user who just followed someone should see that reflected immediately, but global consistency after every write is prohibitively expensive). Every architectural decision in this post is a navigation of those three forces.

Requirements and Constraints

Functional Requirements

  • Answer “does user A follow user B?” with a boolean result
  • Return a user’s complete following list (who user A follows, paginated)
  • Return a user’s complete follower list (who follows user A, paginated)
  • Support mutual friend queries: which accounts do both A and B follow?
  • Follow and unfollow operations with immediate read-your-writes consistency for the acting user
  • Support second-degree traversal for “People You May Know” recommendations

Non-Functional Requirements

  • Follow-check latency: p99 under 1ms, p50 under 0.3ms
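  • Write throughput: 5 billion follow/unfollow events per day (about 58,000 writes/second on average), with 60,000 writes/second sustained as the design target
  • Read throughput: 580,000 follow-check queries per second sustained (the daily average of 50 billion queries; real peaks will run higher)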
  • Availability: 99.99% (about 52.6 minutes of downtime per year)
  • Read-your-writes: user sees their own follow/unfollow reflected within 100ms
  • Global consistency: eventual consistency acceptable, target 2 seconds for cross-region propagation
  • Scale: 500 million users, up to 500 billion total follow edges
  • Celebrity support: users with up to 200 million followers handled without hot-shard issues
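As a quick sanity check on these targets, the sketch below converts the daily volumes quoted above (50 billion reads, 5 billion writes) into per-second rates and turns the availability target into a downtime budget. The function names are illustrative only and are not part of the service.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def per_second(events_per_day: float) -> float:
    """Average events per second for a given daily volume."""
    return events_per_day / SECONDS_PER_DAY


def downtime_minutes_per_year(availability: float) -> float:
    """Allowed downtime per 365-day year for an availability fraction."""
    return (1.0 - availability) * 365 * 24 * 60


reads = per_second(50e9)   # follow-check queries per day
writes = per_second(5e9)   # follow/unfollow events per day

print(f"reads/s  ~ {reads:,.0f}")
print(f"writes/s ~ {writes:,.0f}")
print(f"read:write ~ {reads / writes:.0f}:1")
print(f"downtime budget ~ {downtime_minutes_per_year(0.9999):.1f} min/year")
```

These are averages over a full day. Real traffic is bursty, so provisioning would normally apply a peak multiplier on top of them.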

Constraints and Assumptions

  • Follow relationships are directional (A follows B does not imply B follows A)
  • Maximum following list per user capped at 10,000 (Twitter-style hard limit)
  • Graph traversal depth capped at 2 hops for all product features
  • We deliberately exclude: content ranking, feed generation, notification delivery (separate systems)
  • Storage budget: $500K/year - this rules out graph databases like Neo4j at this scale
  • We assume users are partitioned by ID range, not geographically

High-Level Architecture

The system separates the read path from the write path entirely. Read traffic is an order of magnitude higher than write traffic (roughly 10:1, given 580,000 reads per second against 60,000 writes per second), so optimizing them independently is not over-engineering - it is the only way to hit the sub-1ms SLA.

Figure: Social graph service architecture showing read path, write path, storage tier, and consistency layer

The API Gateway handles authentication, rate limiting, and routes requests to either the Follow Check Service (reads) or the Follow Write Service (writes). These are entirely separate service pools with different scaling policies - reads scale to hundreds of instances driven by CPU, writes scale based on queue depth.

The Follow Check Service implements a three-level lookup: an in-process bloom filter that gives a definitive “no” in 0.01ms, a Redis SISMEMBER call that answers in 0.1-0.3ms on a cache hit, and a shard lookup that answers in under 2ms on a cache miss. On cache miss it backfills Redis, so subsequent queries for the same pair are fast.

The Follow Write Service receives follow/unfollow requests, validates them, persists them to the write-ahead log, and publishes an event to Kafka. The only Redis key it writes is the acting user’s own following set, which it updates synchronously for read-your-writes. All other cache invalidation happens asynchronously through a CDC consumer that subscribes to the Kafka topic.

The Kafka Event Queue decouples write acknowledgment from storage propagation. The Follow Write Service acknowledges the write once the following shard write has committed and the event is durably written to Kafka - the follower shard update and the remaining cache invalidations happen asynchronously. This is the core of the read-your-writes design: the write service also updates the acting user’s own Redis cache synchronously before acknowledging, so the user who just clicked “Follow” sees the updated state immediately, even before the async propagation completes.

The Follower and Following Shards are separate shard groups, each containing a different view of the same edge: the Follower shards are keyed by followee_id (for “who follows user X?” queries), while the Following shards are keyed by follower_id (for “does A follow B?” and “who does A follow?” queries).

Key Insight

The most important architectural decision is storing every follow edge twice - once in the follower-keyed shards and once in the following-keyed shards. This doubles storage and write amplification but eliminates cross-shard joins on every read, which is the only way to hit the sub-1ms SLA at 500M users.

The Storage Layer: Adjacency Lists

Most engineers reach for a relational table or a graph database for social graph storage. Both are wrong at this scale for different reasons.

A pure graph database like Neo4j stores edges as first-class objects with full traversal capabilities. The problem is that Neo4j’s traversal performance degrades badly when a single node has tens of millions of edges - every celebrity account becomes a hotspot that saturates a single shard. Neo4j also lacks the horizontal scalability needed for 500 billion edges across 500 million nodes.

The right data structure for follow-check at this scale is the adjacency list stored in a columnar, sharded key-value store. Think of it like a sorted address book: each user gets one row, and that row contains a sorted array of all the user IDs they follow. A follow-check becomes a binary search into that sorted array - O(log n) where n is bounded by the 10,000 following limit, which means at most 14 comparisons regardless of how large the global graph grows.

-- Following adjacency table (keyed by follower_id)
-- One row per user, following_ids is a sorted int64 array
CREATE TABLE following_adjacency (
  follower_id     BIGINT NOT NULL,
  shard_id        SMALLINT NOT NULL GENERATED ALWAYS AS ((follower_id % 256)::smallint) STORED,
  following_ids   BIGINT[] NOT NULL DEFAULT '{}',
  following_count INT NOT NULL DEFAULT 0,
  updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  version         BIGINT NOT NULL DEFAULT 0,
  CONSTRAINT pk_following PRIMARY KEY (follower_id),
  CONSTRAINT max_following CHECK (following_count <= 10000)
);

-- Follower adjacency table (keyed by followee_id)
-- One row per user, follower_ids is a SORTED int64 array (for set intersection)
CREATE TABLE follower_adjacency (
  followee_id    BIGINT NOT NULL,
  shard_id       SMALLINT NOT NULL GENERATED ALWAYS AS ((followee_id % 256)::smallint) STORED,
  follower_ids   BIGINT[] NOT NULL DEFAULT '{}',
  follower_count INT NOT NULL DEFAULT 0,
  updated_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  version        BIGINT NOT NULL DEFAULT 0,
  CONSTRAINT pk_follower PRIMARY KEY (followee_id)
);

-- Celebrity overflow: users with 1M+ followers get paginated storage
CREATE TABLE celebrity_followers_page (
  followee_id  BIGINT NOT NULL,
  page_num     INT NOT NULL,
  follower_ids BIGINT[] NOT NULL,  -- up to 100K ids per page
  min_id       BIGINT NOT NULL,
  max_id       BIGINT NOT NULL,
  CONSTRAINT pk_celebrity_page PRIMARY KEY (followee_id, page_num)
);

-- Index for shard routing
CREATE INDEX idx_following_shard ON following_adjacency (shard_id, follower_id);
CREATE INDEX idx_follower_shard ON follower_adjacency (shard_id, followee_id);

The following_ids array is kept sorted at all times. Insertions use a binary search to find the insertion point and shift elements - this is O(n) for the array shift, but since n is capped at 10,000 the row is bounded at about 80KB (10,000 IDs at 8 bytes each). Deletions are the same. At 60,000 writes per second, this is well within the bounds of modern SSDs.
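To make the sorted-array mechanics concrete, here is a minimal in-memory sketch of the same operations: membership by binary search, insertion at the sorted position, and removal. It illustrates the data structure only; it is not the storage engine's code, and the function names are assumptions.

```python
from bisect import bisect_left

MAX_FOLLOWING = 10_000  # following cap from the constraints section


def follows(following_ids: list[int], followee_id: int) -> bool:
    """Binary search in the sorted array: O(log n)."""
    i = bisect_left(following_ids, followee_id)
    return i < len(following_ids) and following_ids[i] == followee_id


def add_follow(following_ids: list[int], followee_id: int) -> bool:
    """Insert at the sorted position. Returns False if already present."""
    i = bisect_left(following_ids, followee_id)
    if i < len(following_ids) and following_ids[i] == followee_id:
        return False
    if len(following_ids) >= MAX_FOLLOWING:
        raise ValueError("following limit reached")
    following_ids.insert(i, followee_id)  # O(n) shift of the tail
    return True


def remove_follow(following_ids: list[int], followee_id: int) -> bool:
    """Remove the ID if present. Returns False if it was not there."""
    i = bisect_left(following_ids, followee_id)
    if i < len(following_ids) and following_ids[i] == followee_id:
        del following_ids[i]  # O(n) shift of the tail
        return True
    return False


ids: list[int] = []
for uid in (30, 10, 20):
    add_follow(ids, uid)
print(ids, follows(ids, 20), follows(ids, 25))
```

Both mutations are idempotent, which matters once the same event can be replayed from a queue.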

Real World

Twitter’s social graph service (Flock) uses a similar adjacency list design with separate follower and following stores. Their “FlockDB” was built on MySQL with custom sharding, and they later migrated to a custom in-memory store for the hottest celebrity accounts. The key insight they published: for follow-check, two separate sorted arrays beat any graph traversal engine at scale.

Graph Partitioning

The graph has 500 million nodes and up to 500 billion directed edges. No single machine holds all of it. Partitioning strategy determines which queries can be answered locally versus which require cross-shard joins.

Figure: Follower and following shard split showing dual-write pattern and query routing

We shard both the follower and following tables on user ID with a fixed modulo mapping (shard = id % 256), but the shard key is different for each table. The following table shards on follower_id so all queries about “who does user A follow” go to one shard. The follower table shards on followee_id so all queries about “who follows user B” go to one shard. A follow-check query “does A follow B?” only needs to hit one shard - the following shard for user A.

The shard count is fixed at 256 for both tables independently. With 500 million users, each shard holds around 2 million user rows. Each row is at most 80KB (10,000 following IDs at 8 bytes each), so each shard holds at most 160GB of following data - easily fits on a single machine with replicas. In practice most users follow far fewer than 10,000 accounts, so average row size is closer to 400 bytes, making each shard roughly 800MB.
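The sizing above can be reproduced with a few lines of arithmetic. This sketch uses only the figures quoted in the paragraph (256 shards, 8-byte IDs, a 10,000-ID cap, a 400-byte typical row) and ignores row headers, indexes, and replication overhead.

```python
USERS = 500_000_000
SHARDS = 256
BYTES_PER_ID = 8
MAX_FOLLOWING = 10_000
TYPICAL_ROW_BYTES = 400  # average row size quoted in the text

rows_per_shard = USERS / SHARDS
max_row_bytes = MAX_FOLLOWING * BYTES_PER_ID
worst_case_shard_gb = rows_per_shard * max_row_bytes / 1e9
typical_shard_mb = rows_per_shard * TYPICAL_ROW_BYTES / 1e6

print(f"rows per shard:     {rows_per_shard:,.0f}")
print(f"max row size:       {max_row_bytes / 1e3:.0f} KB")
print(f"worst-case shard:   {worst_case_shard_gb:.2f} GB")
print(f"typical shard size: {typical_shard_mb:.2f} MB")
```

The exact values come out slightly under the rounded figures in the text because 500 million / 256 is a little below 2 million.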

For writes, a single follow(A, B) event triggers writes to two different shards: the following shard for A (shard number A % 256) and the follower shard for B (shard number B % 256). These are separate atomic writes, not a distributed transaction. The system tolerates the window where one write completes and the other has not yet - the CDC-based reconciliation job detects and repairs inconsistencies within 30 seconds.
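The routing rule is small enough to show directly. In this sketch a follow(A, B) maps to two shard numbers, while a follow-check maps to one. The helper names are hypothetical; the modulo mapping is the one described above.

```python
SHARD_COUNT = 256


def following_shard(follower_id: int) -> int:
    """Shard holding the 'who does this user follow' row."""
    return follower_id % SHARD_COUNT


def follower_shard(followee_id: int) -> int:
    """Shard holding the 'who follows this user' row."""
    return followee_id % SHARD_COUNT


def route_follow(follower_id: int, followee_id: int) -> dict[str, int]:
    """A follow(A, B) write touches two independent shards."""
    return {
        "following_shard": following_shard(follower_id),
        "follower_shard": follower_shard(followee_id),
    }


def route_follow_check(follower_id: int, followee_id: int) -> int:
    """A follow-check only needs A's following shard; B's ID is not used."""
    return following_shard(follower_id)


print(route_follow(1000, 513))
print(route_follow_check(1000, 513))
```

Because the two writes land on unrelated shards, nothing here makes them atomic as a pair. That gap is what the reconciliation job covers.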

Celebrity problem: A user with 200 million followers has a follower row that would be 1.6GB - this breaks the row-per-user model. We detect celebrities as users with follower_count > 1,000,000 and route them to the celebrity_followers_page table, which paginates their follower list in chunks of 100,000. Follow-check for “does A follow celebrity B?” still goes through the following shard for A, not the follower shard for B, so it is unaffected by B’s celebrity status.
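One way to use the paginated layout is sketched below. It assumes that a celebrity's pages cover disjoint, ascending ID ranges described by min_id and max_id, and that IDs inside each page are sorted. The schema suggests this but does not state it, and the class and function names are hypothetical.

```python
from bisect import bisect_left
from dataclasses import dataclass
from typing import Optional


@dataclass
class FollowerPage:
    page_num: int
    min_id: int
    max_id: int
    follower_ids: list[int]  # sorted ascending, up to 100K IDs


def find_page(pages: list[FollowerPage], follower_id: int) -> Optional[FollowerPage]:
    """Locate the page whose [min_id, max_id] range could contain follower_id.

    `pages` must be ordered by min_id with non-overlapping ranges.
    """
    max_ids = [p.max_id for p in pages]
    i = bisect_left(max_ids, follower_id)  # first page with max_id >= follower_id
    if i < len(pages) and pages[i].min_id <= follower_id:
        return pages[i]
    return None


def is_follower(pages: list[FollowerPage], follower_id: int) -> bool:
    """Two binary searches: one over page ranges, one inside the page."""
    page = find_page(pages, follower_id)
    if page is None:
        return False
    j = bisect_left(page.follower_ids, follower_id)
    return j < len(page.follower_ids) and page.follower_ids[j] == follower_id


pages = [
    FollowerPage(0, 1, 5, [1, 3, 5]),
    FollowerPage(1, 8, 20, [8, 13, 20]),
]
print(is_follower(pages, 13), is_follower(pages, 6))
```

With 100,000 IDs per page, a 200-million-follower account has about 2,000 pages, so the outer search stays small.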

Watch Out

Using consistent hashing for social graph sharding seems appealing because it enables easy rebalancing, but it does nothing for the celebrity hotspot problem: all queries about a celebrity with 100M followers still land on the same virtual node. A fixed shard mapping with explicit celebrity overflow handling is more complex but avoids this failure mode.

Read-Optimized Caching

The sub-1ms follow-check SLA is very hard to hit from storage alone: an NVMe SSD read costs on the order of 100 microseconds by itself, before query parsing, the network hop to the shard, and queueing are added. The answer must come from RAM, either in-process or from a nearby Redis instance.

Figure: Three-layer cache architecture showing bloom filter, Redis cluster, and persistent shards with write-through vs write-back strategies

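Layer 0: Bloom filter (in-process, per service instance). A bloom filter sized for 500 million entries at a 0.1% false positive rate needs about 14.4 bits per entry, or roughly 900MB per instance, not a few megabytes. Because the filter is keyed on (follower, followee) pairs, its capacity has to be planned against the number of edges it tracks rather than the number of users. The filter answers “not following” definitively with a handful of bit-array probes - roughly 0.01ms. When the bloom filter says “no,” the function returns immediately without touching Redis or the shards. When it says “maybe,” we continue to L1. The bloom filter is hydrated from the following adjacency table at startup and updated on every write event. Unfollows cannot be removed from a standard bloom filter, so they remain as stale positives that fall through to L1, which is safe. False positives mean we do an unnecessary L1 lookup, but the 0.1% rate means only 1 in 1000 “not following” queries do extra work.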
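The memory cost of a bloom filter follows from the standard sizing formulas, m = -n ln(p) / (ln 2)^2 bits and k = (m / n) ln 2 hash functions. The sketch below evaluates them for a given capacity and false positive rate, which is a useful check before committing to a per-instance memory budget. The function names are illustrative.

```python
import math


def bloom_size_bits(n_items: int, fpr: float) -> int:
    """Bits needed for n_items at the target false positive rate."""
    return math.ceil(-n_items * math.log(fpr) / (math.log(2) ** 2))


def optimal_hash_count(n_items: int, m_bits: int) -> int:
    """Number of hash functions that minimizes the false positive rate."""
    return max(1, round(m_bits / n_items * math.log(2)))


n = 500_000_000
m = bloom_size_bits(n, 0.001)
k = optimal_hash_count(n, m)
print(f"bits per item: {m / n:.2f}")
print(f"filter size:   {m / 8 / 1e6:,.0f} MB")
print(f"hash count:    {k}")
```

For 500 million entries at 0.1% this comes to roughly 900 MB and 10 hash functions. If the filter is keyed by (follower, followee) pair, n is the number of edges, so the result scales up accordingly.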

Layer 1: Redis cluster (shared across service instances). For pairs the bloom filter does not rule out, we check SISMEMBER following:{follower_id} {followee_id}. SISMEMBER is O(1) and takes 0.1-0.3ms including the network round trip to a co-located Redis replica. The Redis key following:{uid} holds a set of all user IDs that the given user follows. For a user following 5,000 people, that is 5,000 8-byte integers = 40KB of raw IDs, before Redis’s per-entry overhead. At a 70% cache hit rate, only 30% of the lookups that reach Redis go on to the storage layer.

Layer 2: Following shard (persistent storage). Cache misses go to the following shard for the given follower_id. The shard returns the full following_ids sorted array, and the service performs a binary search to find followee_id. On cache miss, the service backfills the Redis set with the full following list and sets a TTL of 3600 seconds.

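The mutual friends query is the set intersection of following:{uid_a} and following:{uid_b}. On a single Redis node this is one SINTER call. On Redis Cluster the two keys usually live in different hash slots, where SINTER is rejected with a CROSSSLOT error, so the service reads both sets and intersects them in process. For celebrity mutual-friend queries where the sets are too large, we fall back to a precomputed mutual:{uid_a}:{uid_b} counter stored as a Redis string, updated asynchronously by the graph traversal job.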
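When both following lists are already available as sorted arrays, as they are in the adjacency tables, the intersection can also be computed with a two-pointer merge instead of a set operation. This is a minimal sketch of that alternative, not the service's implementation.

```python
def intersect_sorted(a: list[int], b: list[int], limit: int = 50) -> list[int]:
    """Intersect two ascending ID lists, stopping after `limit` matches.

    Runs in O(len(a) + len(b)) with no extra set allocation.
    """
    out: list[int] = []
    i = j = 0
    while i < len(a) and j < len(b) and len(out) < limit:
        if a[i] == b[j]:
            out.append(a[i])
            i += 1
            j += 1
        elif a[i] < b[j]:
            i += 1
        else:
            j += 1
    return out


print(intersect_sorted([1, 3, 5, 7, 9], [3, 4, 5, 6, 9]))
```

With the 10,000-entry cap on following lists, the merge touches at most 20,000 elements per query.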

# follow_check.py - Core follow-check logic with three-layer cache
import redis
from pybloom_live import BloomFilter
from bisect import bisect_left

# Module-level singletons (initialized once per service instance)
_bloom: BloomFilter = None
_redis_client: redis.RedisCluster = None

def init(bloom_fpr: float = 0.001, capacity: int = 500_000_000):
    global _bloom, _redis_client
    _bloom = BloomFilter(capacity=capacity, error_rate=bloom_fpr)
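    # redis-py 4.1+ takes host/port (or ClusterNode objects) here. The dict-style
    # startup_nodes and skip_full_coverage_check arguments belong to the older
    # redis-py-cluster package.
    _redis_client = redis.RedisCluster(
        host="redis-cluster",
        port=6379,
        decode_responses=False,
    )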
    _hydrate_bloom_from_snapshot()

def does_follow(follower_id: int, followee_id: int) -> bool:
    """
    Three-layer follow check: bloom -> redis -> shard.
    Returns True if follower_id follows followee_id.
    """
    # L0: Bloom filter - definitive negative
    bloom_key = _make_bloom_key(follower_id, followee_id)
    if bloom_key not in _bloom:
        return False  # 0.01ms path

    # L1: Redis - sub-0.5ms on hit. SISMEMBER returns 0 both for "not a member"
    # and for "key not cached", so check EXISTS in the same round trip.
    redis_key = f"following:{follower_id}"
    pipe = _redis_client.pipeline()
    pipe.exists(redis_key)
    pipe.sismember(redis_key, followee_id)
    key_cached, is_member = pipe.execute()
    if key_cached:
        return bool(is_member)  # 0.3ms path

    # L2: Shard lookup with backfill
    following_ids = _fetch_following_from_shard(follower_id)
    if following_ids:  # SADD rejects an empty member list
        backfill = _redis_client.pipeline()
        backfill.sadd(redis_key, *following_ids)
        backfill.expire(redis_key, 3600)
        backfill.execute()
    # Binary search in sorted array - O(log 10000) = 14 comparisons max
    idx = bisect_left(following_ids, followee_id)
    return idx < len(following_ids) and following_ids[idx] == followee_id

def mutual_friends(uid_a: int, uid_b: int, limit: int = 50) -> list[int]:
    """
    Returns up to `limit` mutual connections between uid_a and uid_b.
    Reads both following sets from Redis (backfilling from the shard for
    cold users) and intersects them in process.
    """
    key_a = f"following:{uid_a}"
    key_b = f"following:{uid_b}"
    # Ensure both sets are cached
    for uid, key in [(uid_a, key_a), (uid_b, key_b)]:
        if not _redis_client.exists(key):
            ids = _fetch_following_from_shard(uid)
            if ids:
                _redis_client.sadd(key, *ids)
                _redis_client.expire(key, 3600)
    # SINTER is rejected with CROSSSLOT on Redis Cluster unless both keys share
    # a hash slot, so fetch each set and intersect client-side.
    set_a = set(_redis_client.smembers(key_a))
    set_b = set(_redis_client.smembers(key_b))
    return [int(uid) for uid in list(set_a & set_b)[:limit]]

def _make_bloom_key(a: int, b: int) -> bytes:
    # Combine two 8-byte ints into a single 16-byte key for the bloom filter
    return a.to_bytes(8, "big") + b.to_bytes(8, "big")

def _fetch_following_from_shard(follower_id: int) -> list[int]:
    # Routes to the correct shard based on follower_id % 256
    shard = follower_id % 256
    conn = _get_shard_connection(shard)
    row = conn.execute(
        "SELECT following_ids FROM following_adjacency WHERE follower_id = %s",
        (follower_id,)
    ).fetchone()
    return row[0] if row else []

Key Insight

The bloom filter is not just a performance optimization - it is a correctness shortcut. Bloom filters have no false negatives, so once the filter has applied every follow event, a “not in bloom” answer is always correct and costs 0.01ms. This eliminates 60-70% of all follow-check queries before they touch any network call.

Write-Through vs Write-Back

Every follow event must update two persistent shards (follower and following) plus two Redis keys plus the bloom filter. How these updates are ordered determines consistency guarantees and failure behavior.

Write-through means the write is not acknowledged until both the persistent shard and the cache are updated. The write latency is the sum of both operations - typically 2-3ms for the shard write plus 0.5ms for the Redis update. The benefit is that after every acknowledged write, both cache and storage are consistent. A service restart does not cause stale reads.

Write-back (write-behind) means the write is acknowledged after updating only the cache (Redis), and the persistent shard update happens asynchronously. Write latency is 0.5ms. The risk is data loss: if the cache server crashes after acknowledging the write but before the async persistence completes, that follow relationship is lost.

We use a hybrid strategy. For the following table (the table used for follow-check), we use write-through: the write is not acknowledged until both Redis and the following shard are updated. This ensures that follow-check reads are always consistent with acknowledged writes. For the follower table (the table used for “who follows user X” - used for fanout, not for real-time follow-checks), we use write-back: the Redis key is updated synchronously, and the persistent shard update happens asynchronously via the Kafka consumer. This is acceptable because the follower list is not used for sub-1ms read paths - it is read in batch by the fanout service.

// write_service.go - Follow event processing with hybrid write strategy
package followwrite

import (
    "context"
    "database/sql"
    "fmt"
    "time"

    "github.com/redis/go-redis/v9"
    "github.com/segmentio/kafka-go"
)

type FollowWriteService struct {
    followingShard *sql.DB    // write-through target
    redisCluster   *redis.ClusterClient
    kafkaWriter    *kafka.Writer
}

// Follow processes a follow(A, B) event with write-through for following, write-back for follower
func (s *FollowWriteService) Follow(ctx context.Context, followerID, followeeID int64) error {
    // Step 1: Write to following shard (write-through - blocks until durable)
    err := s.updateFollowingShard(ctx, followerID, followeeID, true)
    if err != nil {
        return fmt.Errorf("following shard write failed: %w", err)
    }

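    // Step 2: Update Redis following set (write-through - blocks until Redis acks).
    // Only touch the set if it is already cached: SADD on a missing key would create
    // a one-member set that readers would mistake for the full following list.
    // A missing key is safe because the next read backfills from the shard.
    redisKey := fmt.Sprintf("following:%d", followerID)
    const addIfCached = `
        if redis.call('EXISTS', KEYS[1]) == 1 then
            redis.call('SADD', KEYS[1], ARGV[1])
            -- Keep TTL fresh; GT never shortens an existing longer TTL
            redis.call('EXPIRE', KEYS[1], ARGV[2], 'GT')
            return 1
        end
        return 0`
    err = s.redisCluster.Eval(ctx, addIfCached, []string{redisKey}, followeeID, 3600).Err()
    if err != nil {
        // Redis failure: following shard is correct, Redis will be repaired by CDC consumer
        // Log for monitoring but do not fail the write - eventual consistency acceptable for cache
        logCacheSyncFailure(followerID, followeeID, err)
    }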

    // Step 3: Publish to Kafka (triggers async follower shard update - write-back)
    event := FollowEvent{
        FollowerID:  followerID,
        FolloweeID:  followeeID,
        EventType:   "follow",
        Timestamp:   time.Now().UnixMilli(),
        Version:     time.Now().UnixNano(),
    }
    return s.kafkaWriter.WriteMessages(ctx, kafka.Message{
        Key:   int64ToBytes(followeeID), // partition by followee for ordered follower updates
        Value: mustMarshal(event),
    })
}

func (s *FollowWriteService) updateFollowingShard(ctx context.Context, followerID, followeeID int64, add bool) error {
    // Sorted insert: the intarray sort() only accepts int4[], so the bigint[] is
    // rebuilt in order with unnest + ORDER BY. The WHERE guard keeps it idempotent.
    const addQuery = `
        UPDATE following_adjacency
        SET following_ids = ARRAY(
                SELECT x FROM unnest(following_ids || $2::bigint) AS x ORDER BY x),
            following_count = following_count + 1,
            version = version + 1,
            updated_at = NOW()
        WHERE follower_id = $1
          AND NOT ($2::bigint = ANY(following_ids))`
    // Guarded remove: without the ANY check a repeated unfollow would still
    // decrement following_count.
    const removeQuery = `
        UPDATE following_adjacency
        SET following_ids = array_remove(following_ids, $2::bigint),
            following_count = following_count - 1,
            version = version + 1,
            updated_at = NOW()
        WHERE follower_id = $1
          AND $2::bigint = ANY(following_ids)`
    if !add {
        _, err := s.followingShard.ExecContext(ctx, removeQuery, followerID, followeeID)
        return err
    }
    res, err := s.followingShard.ExecContext(ctx, addQuery, followerID, followeeID)
    if err != nil {
        return err
    }
    if rows, _ := res.RowsAffected(); rows > 0 {
        return nil
    }
    // Nothing updated: either the user has no row yet or the edge already exists.
    // Try to create the row; DO NOTHING leaves an existing row untouched.
    res, err = s.followingShard.ExecContext(ctx,
        `INSERT INTO following_adjacency (follower_id, following_ids, following_count)
         VALUES ($1, ARRAY[$2::bigint], 1)
         ON CONFLICT (follower_id) DO NOTHING`,
        followerID, followeeID,
    )
    if err != nil {
        return err
    }
    if rows, _ := res.RowsAffected(); rows == 0 {
        // The row appeared concurrently or the edge was already there. Rerun the
        // guarded update; it is a no-op when the edge is already present.
        _, err = s.followingShard.ExecContext(ctx, addQuery, followerID, followeeID)
    }
    return err
}

Watch Out

The most common mistake in write-back caching is forgetting that a Redis cluster failure causes both the cache update and the write-back queue to fail simultaneously - leaving the cache empty and the async queue drained with no pending repairs. Always keep the write-back queue in a durable store (Kafka or a WAL), not in Redis itself.

Eventual Consistency Tradeoffs

The dual-shard design introduces a window of inconsistency: after a follow event is written to the following shard but before the Kafka consumer has updated the follower shard, the two shards disagree about the state of that edge.

Figure: End-to-end request flow from client through bloom filter, Redis cache, and shard lookup showing timing at each layer

This inconsistency is deliberately tolerated because the queries that use the follower shard (fanout, “who follows me”) are not on the sub-1ms read path. They are batch operations that can tolerate seconds of staleness. The following shard, which is used for the follow-check hot path, is kept strongly consistent with write-through semantics.

The more dangerous consistency challenge is the read-your-writes requirement. When user A clicks “Follow” on user B’s profile, the button must immediately show “Following” regardless of which service instance handles the next page load. We achieve this by:

  1. The Write Service updates Redis following:{A} synchronously before acknowledging
  2. The client receives a follow_token in the response - a signed timestamp
  3. Subsequent reads from user A include the follow_token in the request header
  4. The Follow Check Service, if it sees a follow_token less than 5 seconds old, bypasses the bloom filter and goes directly to Redis (which is always up-to-date from step 1)
  5. After 5 seconds, the Kafka consumer has almost certainly updated all caches and the token is no longer needed
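The token flow in the steps above can be sketched with an HMAC-signed timestamp. The source only says the token is “a signed timestamp”, so the token layout, the secret handling, and the function names here are all assumptions made for illustration.

```python
import hashlib
import hmac
import time
from typing import Optional

SECRET = b"demo-secret"   # hypothetical; a real service would load this from a secret store
TOKEN_TTL_SECONDS = 5     # freshness window from step 4


def _sign(payload: str) -> str:
    return hmac.new(SECRET, payload.encode(), hashlib.sha256).hexdigest()


def issue_follow_token(user_id: int, now: Optional[float] = None) -> str:
    """Build 'user_id:timestamp_ms:signature' after a successful write."""
    ts_ms = int((time.time() if now is None else now) * 1000)
    payload = f"{user_id}:{ts_ms}"
    return f"{payload}:{_sign(payload)}"


def token_is_fresh(token: str, user_id: int, now: Optional[float] = None) -> bool:
    """True if the token is authentic, belongs to user_id, and is under 5s old."""
    try:
        uid_s, ts_s, sig = token.split(":")
        ts_ms = int(ts_s)
    except ValueError:
        return False
    if not hmac.compare_digest(sig, _sign(f"{uid_s}:{ts_s}")):
        return False
    if uid_s != str(user_id):
        return False
    age = (time.time() if now is None else now) - ts_ms / 1000
    return 0 <= age < TOKEN_TTL_SECONDS


token = issue_follow_token(42, now=1000.0)
print(token_is_fresh(token, 42, now=1003.0))  # within the window
print(token_is_fresh(token, 42, now=1006.0))  # expired
```

A read handler would call token_is_fresh first and, when it returns True, skip the bloom filter and query Redis directly.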

For the global eventual consistency window - the time between a write completing and the follower shard plus all cache replicas reflecting it - we target 2 seconds. The Kafka consumer pipeline runs with 200ms end-to-end latency in normal operation. The 2-second target gives 10x headroom for consumer lag during write spikes.

# consistency_monitor.py - Monitors and repairs consistency windows
import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class InconsistencyEvent:
    follower_id: int
    followee_id: int
    following_shard_state: bool   # source of truth
    follower_shard_state: bool    # should match eventually
    detected_at: float

class ConsistencyReconciler:
    """
    Samples random edges and checks follower/following shard agreement.
    Triggers repair for edges that have been inconsistent for over 30 seconds.
    """
    def __init__(self, following_db, follower_db, repair_threshold_seconds: int = 30):
        self.following_db = following_db
        self.follower_db = follower_db
        self.threshold = repair_threshold_seconds

    def check_edge(self, follower_id: int, followee_id: int) -> Optional[InconsistencyEvent]:
        # Check following shard (source of truth)
        following_ids = self._get_following(follower_id)
        in_following = followee_id in set(following_ids)

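        # Check follower shard (eventually consistent).
        # Caveat: with limit=10_000 this membership test is only reliable for
        # followees with at most 10,000 followers; larger accounts need a
        # targeted lookup instead of a truncated list.
        follower_ids = self._get_followers(followee_id, limit=10_000)
        in_follower = follower_id in set(follower_ids)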

        if in_following != in_follower:
            return InconsistencyEvent(
                follower_id=follower_id,
                followee_id=followee_id,
                following_shard_state=in_following,
                follower_shard_state=in_follower,
                detected_at=time.time(),
            )
        return None

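    def repair(self, event: InconsistencyEvent) -> None:
        """Apply the following shard state to the follower shard."""
        params = {"follower": event.follower_id, "followee": event.followee_id}
        if event.following_shard_state and not event.follower_shard_state:
            # Add missing follower entry. intarray's sort() only accepts int4[],
            # so rebuild the bigint[] in order; the guard keeps the repair idempotent.
            self.follower_db.execute(
                "UPDATE follower_adjacency "
                "SET follower_ids = ARRAY(SELECT x FROM unnest(follower_ids || %(follower)s::bigint) AS x ORDER BY x), "
                "    follower_count = follower_count + 1 "
                "WHERE followee_id = %(followee)s "
                "  AND NOT (%(follower)s::bigint = ANY(follower_ids))",
                params,
            )
        elif not event.following_shard_state and event.follower_shard_state:
            # Remove stale follower entry (guarded so the count cannot drift)
            self.follower_db.execute(
                "UPDATE follower_adjacency "
                "SET follower_ids = array_remove(follower_ids, %(follower)s::bigint), "
                "    follower_count = follower_count - 1 "
                "WHERE followee_id = %(followee)s "
                "  AND %(follower)s::bigint = ANY(follower_ids)",
                params,
            )

Key Insight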

Eventual consistency between the follower and following shards is safe because no product feature needs both to be simultaneously consistent in a single query - follow-check reads only the following shard, and fanout reads only the follower shard. The inconsistency window never exposes an incorrect follow-check answer; the only visible effect is that follower lists and fanout can lag by a few seconds.

Graph Traversal Depth Limits

Mutual friend queries and “People You May Know” recommendations require graph traversal - walking edges outward from a starting node. The danger is unbounded fan-out: at 500 million users with an average of 500 connections, a depth-2 BFS touches 500 x 500 = 250,000 nodes, and a depth-3 BFS touches 125 million nodes. At 580,000 queries per second, this is compute-suicide.

Figure: Graph traversal depth limits showing BFS fan-out capping at depth 2 with 500-node limit per level

We enforce three hard limits at the traversal engine level:

Maximum depth = 2. No product feature we build requires traversal beyond second-degree connections. “Friends of friends” is depth 2. Anything deeper is a batch offline job, not a real-time query.

Fan-out limit per node = 500. When expanding a node’s neighbors, we take at most 500 neighbors. For users following fewer than 500 people, we take all of them. For users following more (up to 10,000), we take the 500 most recently followed. This caps the breadth of any BFS at 500 x 500 = 250,000 node visits per query - feasible in under 50ms when served from Redis.

Visited set deduplication. A BFS without deduplication revisits the same popular nodes exponentially. A visited set prevents this. We use a bitset indexed by user ID (500 million bits = 62MB) allocated per-request and pooled to avoid GC pressure.
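The numbers behind these limits are easy to recompute. The sketch below gives the upper bound on nodes reached at a given depth, ignoring overlap between neighbor lists, and the memory for a visited bitset with one bit per user ID. The function names are illustrative.

```python
def max_visits(fan_out: int, depth: int) -> int:
    """Upper bound on nodes at the deepest BFS level, ignoring overlap."""
    return fan_out ** depth


def bitset_megabytes(id_space: int) -> float:
    """Memory for one bit per possible user ID, in decimal megabytes."""
    return id_space / 8 / 1e6


for depth in (1, 2, 3):
    print(f"depth {depth}: up to {max_visits(500, depth):,} nodes")
print(f"visited bitset: {bitset_megabytes(500_000_000):.1f} MB")
```

Going from depth 2 to depth 3 multiplies the work by the fan-out again, which is why the cap sits at depth 2. The bitset figure also assumes user IDs are dense in the range 0 to 500 million.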

// graph_traversal.go - Bounded BFS with depth and fan-out limits
package graphtraversal

import (
    "context"
    "sort"
)

const (
    MaxDepth    = 2
    MaxFanOut   = 500
    MaxResults  = 1000
)

type TraversalResult struct {
    NodeID      int64
    Depth       int
    MutualCount int
}

// MutualFriendCandidates returns depth-2 neighbors with mutual connection counts.
// Used for "People You May Know" offline batch jobs.
func MutualFriendCandidates(
    ctx context.Context,
    sourceUID int64,
    followStore FollowStore,
) ([]TraversalResult, error) {
    // Level 0: seed
    visited := newBitSet(500_000_000)
    visited.Set(sourceUID)

    // Level 1: direct following list (capped at MaxFanOut)
    depth1, err := followStore.GetFollowingCapped(ctx, sourceUID, MaxFanOut)
    if err != nil {
        return nil, err
    }
    for _, uid := range depth1 {
        visited.Set(uid)
    }

    // Level 2: friends of friends
    // Count mutual connections as we go
    mutualCount := make(map[int64]int, len(depth1)*10)
    for _, d1uid := range depth1 {
        d2nodes, err := followStore.GetFollowingCapped(ctx, d1uid, MaxFanOut)
        if err != nil {
            continue // partial results acceptable
        }
        for _, d2uid := range d2nodes {
            if visited.IsSet(d2uid) {
                continue // already seen at depth 1 or is source
            }
            mutualCount[d2uid]++
        }
    }

    // Build results, sorted by mutual count descending
    results := make([]TraversalResult, 0, len(mutualCount))
    for uid, count := range mutualCount {
        results = append(results, TraversalResult{
            NodeID:      uid,
            Depth:       2,
            MutualCount: count,
        })
    }
    sort.Slice(results, func(i, j int) bool {
        return results[i].MutualCount > results[j].MutualCount
    })
    if len(results) > MaxResults {
        results = results[:MaxResults]
    }
    return results, nil
}

type bitSet struct {
    bits []uint64
}

func newBitSet(capacity int64) *bitSet {
    return &bitSet{bits: make([]uint64, (capacity+63)/64)}
}
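
// Set and IsSet assume user IDs are dense in [0, capacity). IDs outside that
// range are ignored rather than indexing past the end of the slice.
func (b *bitSet) Set(id int64) {
    if w := id / 64; id >= 0 && w < int64(len(b.bits)) {
        b.bits[w] |= 1 << uint(id%64)
    }
}

func (b *bitSet) IsSet(id int64) bool {
    w := id / 64
    return id >= 0 && w < int64(len(b.bits)) && b.bits[w]&(1<<uint(id%64)) != 0
}

Real World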

LinkedIn’s “People You May Know” feature uses a two-hop BFS over their social graph with similar fan-out caps. They published that naively traversing to depth 3 on their 900-million-node graph would require visiting the entire graph, which is why they switched to offline daily batch jobs for any traversal beyond depth 2 - real-time depth-3 traversal at LinkedIn scale is simply not feasible.

Data Model

The full data model includes the adjacency tables, cache metadata, and the event stream schema.

-- Complete schema for social graph service
-- Partitioned by shard_id for horizontal scaling

-- Following adjacency: source of truth for follow-check
CREATE TABLE following_adjacency (
  follower_id     BIGINT NOT NULL,
  following_ids   BIGINT[] NOT NULL DEFAULT '{}',
  following_count INT NOT NULL DEFAULT 0 CHECK (following_count >= 0 AND following_count <= 10000),
  updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  version         BIGINT NOT NULL DEFAULT 0,
  CONSTRAINT pk_following PRIMARY KEY (follower_id)
) PARTITION BY HASH (follower_id);

-- Create 256 partitions (deployed on 16-32 physical hosts)
CREATE TABLE following_adjacency_p0
  PARTITION OF following_adjacency
  FOR VALUES WITH (MODULUS 256, REMAINDER 0);
-- ... repeat for p1 through p255

-- Follower adjacency: used for fanout, "who follows me?"
CREATE TABLE follower_adjacency (
  followee_id    BIGINT NOT NULL,
  follower_ids   BIGINT[] NOT NULL DEFAULT '{}',
  follower_count INT NOT NULL DEFAULT 0,
  updated_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  version        BIGINT NOT NULL DEFAULT 0,
  CONSTRAINT pk_follower PRIMARY KEY (followee_id)
) PARTITION BY HASH (followee_id);

-- Celebrity overflow pages (followee_id with 1M+ followers)
CREATE TABLE celebrity_followers_page (
  followee_id   BIGINT NOT NULL,
  page_num      INT NOT NULL,
  follower_ids  BIGINT[] NOT NULL,
  min_follower  BIGINT NOT NULL,
  max_follower  BIGINT NOT NULL,
  page_size     INT NOT NULL GENERATED ALWAYS AS (cardinality(follower_ids)) STORED,
  created_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  CONSTRAINT pk_celeb_page PRIMARY KEY (followee_id, page_num)
);

-- Follow event log (Kafka topic schema as Postgres for reference)
CREATE TABLE follow_events (
  event_id      UUID NOT NULL DEFAULT gen_random_uuid(),
  follower_id   BIGINT NOT NULL,
  followee_id   BIGINT NOT NULL,
  event_type    TEXT NOT NULL CHECK (event_type IN ('follow', 'unfollow')),
  occurred_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  processed_at  TIMESTAMPTZ,
  shard_written SMALLINT,
  CONSTRAINT pk_follow_event PRIMARY KEY (event_id)
);
CREATE INDEX idx_events_by_followee ON follow_events (followee_id, occurred_at DESC);
CREATE INDEX idx_events_unprocessed ON follow_events (processed_at) WHERE processed_at IS NULL;

-- Mutual count cache (updated async, used to skip SINTER on hot pairs)
CREATE TABLE mutual_count_cache (
  uid_a         BIGINT NOT NULL,
  uid_b         BIGINT NOT NULL,
  mutual_count  INT NOT NULL DEFAULT 0,
  computed_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
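  -- Postgres primary keys cannot contain expressions such as LEAST()/GREATEST(),
  -- so the pair is stored in canonical order (smaller ID first) and enforced by a CHECK.
  CONSTRAINT ck_mutual_ordered CHECK (uid_a < uid_b),
  CONSTRAINT pk_mutual PRIMARY KEY (uid_a, uid_b)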
);

The indexing strategy is deliberately minimal. The primary key on follower_id (or followee_id) is the only index needed per adjacency table - all queries use exact primary key lookups. The following_ids array is sorted within each row, enabling binary search at the application level. We do not index into the array contents because Postgres GIN indexes on large integer arrays are prohibitively expensive to maintain at 60,000 writes per second.

Key Algorithms and Protocols

Binary Search in Sorted Adjacency Array

The core follow-check operation on a cache miss is a binary search into the sorted following_ids array.

# binary_search_follow.py - Follow check via binary search in sorted array
from bisect import bisect_left

def is_following(following_ids: list[int], target_id: int) -> bool:
    """
    O(log n) follow check where n <= 10,000 (the max following limit).
    At n=10,000, this is at most 14 comparisons.
    
    following_ids must be sorted in ascending order.
    Returns True if target_id is in following_ids.
    """
    if not following_ids:
        return False
    idx = bisect_left(following_ids, target_id)
    return idx < len(following_ids) and following_ids[idx] == target_id

def insert_following(following_ids: list[int], new_id: int) -> list[int]:
    """
    Maintain sorted order on follow. O(n) due to array shift, but n <= 10,000.
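    In practice the same update runs in Postgres. Note that the intarray
    extension's sort() only handles int4[] arrays, so a BIGINT[] column needs
    array_agg(... ORDER BY ...) over unnest(), or application-side insertion.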
    """
    idx = bisect_left(following_ids, new_id)
    if idx < len(following_ids) and following_ids[idx] == new_id:
        return following_ids  # already following, idempotent
    return following_ids[:idx] + [new_id] + following_ids[idx:]

def remove_following(following_ids: list[int], remove_id: int) -> list[int]:
    """
    Remove from sorted array. O(n) due to shift. Uses bisect for O(log n) find.
    """
    idx = bisect_left(following_ids, remove_id)
    if idx >= len(following_ids) or following_ids[idx] != remove_id:
        return following_ids  # not following, idempotent
    return following_ids[:idx] + following_ids[idx+1:]

Time complexity: O(log n) for check, O(n) for insert/delete, where n is bounded by 10,000. Space complexity: O(1) for check, O(n) for the stored array. The bounded n makes the O(n) insert/delete acceptable - it is at most 80KB of data being shifted.
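
The 14-comparison bound can be checked empirically. The sketch below is an illustration rather than part of the service: CountingInt and worst_case_comparisons are hypothetical helper names, and the wrapper only counts how often bisect_left invokes the less-than operator (the final equality check in is_following is not counted).

```python
from bisect import bisect_left


class CountingInt:
    """Hypothetical wrapper that counts '<' comparisons made by bisect_left."""
    comparisons = 0

    def __init__(self, value: int):
        self.value = value

    def __lt__(self, other: "CountingInt") -> bool:
        CountingInt.comparisons += 1
        return self.value < other.value


def worst_case_comparisons(n: int) -> int:
    """Return the most '<' comparisons bisect_left needs over a sorted list of n IDs."""
    ids = [CountingInt(2 * i) for i in range(n)]  # even IDs, so odd targets are misses
    worst = 0
    for target in range(-1, 2 * n + 1):  # covers every hit and every gap between IDs
        CountingInt.comparisons = 0
        bisect_left(ids, CountingInt(target))
        worst = max(worst, CountingInt.comparisons)
    return worst


if __name__ == "__main__":
    print(worst_case_comparisons(10_000))  # 14 at the 10,000-ID following cap
```

The worst case is floor(log2(n)) + 1 probes, which is 14 for n = 10,000 and would be 20 for n = 1,000,000.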

Bloom Filter Maintenance

The bloom filter must be updated on every follow and unfollow event. A standard bloom filter is add-only: elements cannot be removed. An unfollow therefore leaves its key in the filter, so the filter keeps answering “maybe” for a relationship that no longer exists and the request falls through to Redis unnecessarily. We handle this either with a counting bloom filter variant that supports deletions, or by periodically rebuilding the standard bloom filter from a snapshot of the current state.

# bloom_maintenance.py - Bloom filter rebuild and update protocol
from pybloom_live import BloomFilter
import time

class ManagedBloomFilter:
    """
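    Wraps a bloom filter with a rebuild-on-schedule strategy for handling deletions.
    Unfollows cannot remove entries, so we rebuild every 6 hours from a snapshot.
    During rebuild, a shadow filter accumulates new entries; both are consulted on reads.
    Note: capacity counts (follower_id, followee_id) pairs, not users, so it must be
    sized for the number of edges this filter is expected to hold.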
    """
    def __init__(self, capacity: int = 500_000_000, error_rate: float = 0.001):
        self.capacity = capacity
        self.error_rate = error_rate
        self.active = BloomFilter(capacity=capacity, error_rate=error_rate)
        self.shadow = None
        self.rebuild_start = None
        self.rebuild_interval_seconds = 6 * 3600

    def contains(self, follower_id: int, followee_id: int) -> bool:
        key = follower_id.to_bytes(8, 'big') + followee_id.to_bytes(8, 'big')
        if self.shadow is not None:
            # During rebuild: check both active and shadow
            return key in self.active or key in self.shadow
        return key in self.active

    def add(self, follower_id: int, followee_id: int) -> None:
        key = follower_id.to_bytes(8, 'big') + followee_id.to_bytes(8, 'big')
        self.active.add(key)
        if self.shadow is not None:
            self.shadow.add(key)

    def start_rebuild(self) -> BloomFilter:
        """Create an empty shadow filter. The rebuild job fills it from the snapshot while active keeps serving reads."""
        self.shadow = BloomFilter(capacity=self.capacity, error_rate=self.error_rate)
        self.rebuild_start = time.time()
        return self.shadow

    def complete_rebuild(self) -> None:
        """Atomic swap: shadow becomes active, active is discarded."""
        if self.shadow is not None:
            self.active = self.shadow
            self.shadow = None
            self.rebuild_start = None
Key Insight

Bloom filters support additions but not removals, which means an unfollow event leaves a stale positive in the filter until the next rebuild - causing unnecessary Redis lookups for unfollowed pairs. The periodic rebuild from a snapshot cleans these up, and the 0.1% false positive rate keeps the overhead small between rebuilds.
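
The counting variant mentioned earlier can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the filter the service runs: the class name, the blake2b-based double hashing, and the one-byte saturating counters are all choices made for the example. One byte per counter costs eight times the memory of a plain bit array of the same length, which is the main reason a periodic rebuild can be the cheaper option.

```python
import hashlib


class CountingBloomFilter:
    """Illustrative counting bloom filter keyed on (follower_id, followee_id)."""

    def __init__(self, num_counters: int, num_hashes: int):
        self.counters = bytearray(num_counters)  # 8-bit counters, saturate at 255
        self.num_hashes = num_hashes

    def _positions(self, follower_id: int, followee_id: int) -> list[int]:
        key = follower_id.to_bytes(8, "big") + followee_id.to_bytes(8, "big")
        digest = hashlib.blake2b(key, digest_size=16).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:], "big") | 1  # odd step for double hashing
        size = len(self.counters)
        return [(h1 + i * h2) % size for i in range(self.num_hashes)]

    def add(self, follower_id: int, followee_id: int) -> None:
        for pos in self._positions(follower_id, followee_id):
            if self.counters[pos] < 255:
                self.counters[pos] += 1

    def remove(self, follower_id: int, followee_id: int) -> None:
        # Only call this for pairs that were added earlier; removing a pair
        # that was never added can create false negatives for other keys.
        for pos in self._positions(follower_id, followee_id):
            if 0 < self.counters[pos] < 255:  # saturated counters stay saturated
                self.counters[pos] -= 1

    def contains(self, follower_id: int, followee_id: int) -> bool:
        return all(self.counters[pos] > 0
                   for pos in self._positions(follower_id, followee_id))
```

Usage follows the event stream: call add on a follow event and remove on the matching unfollow, so the filter stops reporting the pair without waiting for a rebuild.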

Scaling and Performance

Capacity Estimation

Given:
  - 500M users
  - Average following count: 400 (median far lower, power law distribution)
  - Average follower count: 400
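  - 50B follow-check queries per day = ~580K QPS averaged over the day (50B / 86,400s);
    the sizing below uses this figure, and a 2x peak would double it to ~1.16M QPS
  - 5B follow/unfollow events per day = ~58K writes/second averaged over the day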

Storage:
  - Following adjacency: 500M users * 400 avg following * 8 bytes = 1.6TB raw
  - Follower adjacency: same = 1.6TB raw
  - Celebrity overflow (accounts with 1M+ followers): ~800GB, i.e. ~100B follower IDs at 8 bytes each
  - Redis following sets: 500M users * 400 * 8B = 1.6TB total; cache 20% hot users = 320GB Redis
  - Total persistent storage: ~4TB raw, ~12TB with 3x replication

Bandwidth:
  - Follow-check read: 580K QPS * ~200 bytes/response = 116 MB/s outbound
  - Follow write: 58K QPS * ~100 bytes/event = 5.8 MB/s inbound
  - Kafka replication: 5.8 MB/s * 3 replicas = 17.4 MB/s
  - Total outbound from storage tier: ~150 MB/s

Compute:
  - Follow Check Service: 580K QPS at 0.5ms avg CPU = 290 CPU cores
    (assuming 2K QPS per core with Redis serving 70% from cache)
  - Follow Write Service: 58K QPS at 2ms CPU = 116 CPU cores
  - Redis: 320GB data = 20 primaries at 16GB each; 60 nodes with 3x replication
  - Storage shards: 256 partitions across 32 physical hosts (8 partitions/host)
  - Total estimated: ~50 EC2 r6i.2xlarge instances for services,
    20 Redis primaries (60 Redis nodes with replicas), 32 storage nodes
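
The arithmetic above is easy to get wrong by a factor, so it helps to recompute it. The sketch below reproduces the main figures from the stated inputs. The function name and dictionary keys are made up for the example, and the 8 vCPUs per service instance is an assumption about the r6i.2xlarge size.

```python
from math import ceil

SECONDS_PER_DAY = 86_400


def estimate() -> dict:
    """Recompute the headline capacity numbers from the stated inputs."""
    users = 500_000_000
    avg_following = 400
    id_bytes = 8

    edges = users * avg_following                 # total follow edges
    following_bytes = edges * id_bytes            # one adjacency direction
    redis_hot_bytes = following_bytes // 5        # hottest 20% of users
    redis_primaries = ceil(redis_hot_bytes / (16 * 10**9))  # 16GB per node

    # CPU time per request in microseconds, as given in the estimate.
    check_cores = 580_000 * 500 / 1_000_000       # 0.5ms per follow check
    write_cores = 58_000 * 2_000 / 1_000_000      # 2ms per follow write
    vcpus_per_instance = 8                        # assumption: r6i.2xlarge

    return {
        "edges": edges,
        "following_tb": following_bytes / 10**12,
        "avg_check_qps": round(50_000_000_000 / SECONDS_PER_DAY),
        "avg_write_qps": round(5_000_000_000 / SECONDS_PER_DAY),
        "redis_primaries": redis_primaries,
        "redis_nodes_with_replicas": redis_primaries * 3,
        "service_cores": check_cores + write_cores,
        "service_instances": ceil((check_cores + write_cores) / vcpus_per_instance),
    }


if __name__ == "__main__":
    for name, value in estimate().items():
        print(f"{name}: {value}")
```

Dividing the daily totals by 86,400 seconds gives rates averaged over the day, so any peak multiplier has to be applied on top of these figures.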

Horizontal Scaling

The Follow Check Service scales horizontally with zero coordination. Each instance maintains its own bloom filter (hydrated from a shared snapshot in S3 on startup, updated via Kafka consume). Adding new instances does not require redistributing state. Auto-scaling triggers at 70% CPU utilization.

The storage shards scale vertically first (more RAM to keep the working set in memory) and then horizontally by splitting shards. Going from 256 to 512 partitions splits each shard in two as a background operation: the data is copied to two new shards, reads are served from both during migration, and the old shard is decommissioned after the copy is verified.
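
Doubling the partition count is a clean split because every row in old partition r can only land in new partition r or r + 256. A minimal sketch of that property, assuming rows are placed by hash(user_id) mod N: the plain modulo below is a stand-in for the database's own hash function, and partition_for and split_targets are hypothetical names.

```python
def partition_for(user_id: int, modulus: int) -> int:
    """Stand-in placement rule: the real system would hash user_id first."""
    return user_id % modulus


def split_targets(old_partition: int, old_modulus: int) -> tuple[int, int]:
    """The only two partitions a row can move to when the modulus doubles."""
    return (old_partition, old_partition + old_modulus)


def plan_split(user_ids: list[int], old_modulus: int) -> dict[int, list[int]]:
    """Group the user IDs of one old shard by their destination partition."""
    plan: dict[int, list[int]] = {}
    for user_id in user_ids:
        plan.setdefault(partition_for(user_id, old_modulus * 2), []).append(user_id)
    return plan


if __name__ == "__main__":
    # All of these IDs live in old partition 7 when the modulus is 256.
    shard_7 = [7, 263, 519, 775, 1031]
    print(plan_split(shard_7, 256))
```

Because no row ever moves between unrelated shards, each old shard can be migrated independently of the other 255.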

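Redis scales by adding nodes to the cluster. Rebalancing moves hash slots onto the new nodes (for example with redis-cli --cluster rebalance, which migrates keys with MIGRATE under the hood); it is an operator-triggered step rather than something the cluster does by itself. The main scaling constraint on Redis is memory: at 320GB for the hot following sets, we need 20 primaries with 16GB each, or 60 nodes once 3x replication is included.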

Real World

Instagram’s social graph team published in 2022 that they serve follow-check queries at under 500 microseconds p99 using a nearly identical architecture: in-process bloom filters, Redis SISMEMBER, and PostgreSQL arrays sharded on user ID. The key operational insight they shared: the bloom filter alone eliminates 65% of Redis calls, making the Redis fleet 3x smaller than it would otherwise need to be.

Failure Modes and Recovery

| Failure | Detection | Impact | Recovery |
|---|---|---|---|
| Redis node failure | Redis Cluster gossip protocol within 1s; health check misses | Follow-check latency rises from 0.3ms to 2ms (L2 shard hits); cache misses surge | Redis Cluster promotes replica automatically within 15s; cache warms from shard reads over 5-10 minutes |
| Following shard host failure | PostgreSQL streaming replication heartbeat timeout (5s) | Follow-check queries to that shard return errors; 1/256 of users affected | Patroni promotes standby within 30s; bloom filter and Redis cache serve most queries during failover |
| Kafka broker failure | Kafka consumer group rebalance detection (3s) | Write-back follower shard updates stall; follower counts become stale | Kafka controller elects new leader in 10s; consumer resumes from last committed offset; no events lost |
| CDC consumer lag | Consumer offset lag metric crosses 100K events threshold | Follower shard and cache diverge from following shard | Scale consumer group to add parallelism; reconciler job detects and repairs inconsistencies |
| Bloom filter corruption | Periodic hash verification of filter against shard data | False positives increase; excess Redis calls | Service instance isolates corrupted filter; trigger background rebuild from S3 snapshot |
| Clock skew on write hosts | NTP monitoring; version number gaps in replication stream | Out-of-order follow/unfollow events cause incorrect state | Version-based conflict resolution; always take higher version number as authoritative state |
Watch Out

The most dangerous operational mistake is warming a new Redis cluster from the storage shards during a traffic spike. Scanning 500 million rows in the following adjacency table generates millions of sequential reads on the storage tier while simultaneously handling peak traffic - this will saturate your storage I/O and cause a cascade failure. Always warm from a pre-built snapshot in object storage, not from live shards.
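
The clock-skew row in the failure table relies on version-based conflict resolution. A minimal sketch of that rule, assuming each follow or unfollow event carries a monotonically increasing version for its (follower, followee) pair: EdgeState and resolve are hypothetical names, not part of the schema above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class EdgeState:
    """State of one (follower, followee) edge as seen by a replica."""
    following: bool
    version: int


def resolve(current: Optional[EdgeState], incoming: EdgeState) -> EdgeState:
    """Keep whichever state has the higher version; ties keep the current one."""
    if current is None or incoming.version > current.version:
        return incoming
    return current


if __name__ == "__main__":
    follow = EdgeState(following=True, version=1)
    unfollow = EdgeState(following=False, version=2)

    # Events arrive out of order: the unfollow (v2) lands before the follow (v1).
    state = None
    for event in (unfollow, follow):
        state = resolve(state, event)
    print(state)  # the unfollow wins because it has the higher version
```

The outcome no longer depends on arrival order or on wall-clock timestamps, which is what makes the rule robust to skewed clocks.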

Comparison of Approaches

| Approach | Follow-Check Latency | Mutual Friends | Write Complexity | Failure Mode | Best Fit |
|---|---|---|---|---|---|
| Single SQL table + index | 5-50ms (cold) | Multi-table join, very slow | Simple INSERT | Index bloat at 200B rows causes OOM | Under 10M users |
| Graph database (Neo4j) | 1-5ms | Native traversal | Simple | Celebrity node becomes hot shard; no horizontal scale | Rich traversals, under 50M users |
| Adjacency list + Redis (this design) | 0.3ms (Redis hit) | SINTER O(n) | Dual-write, complex | Redis memory limit; shard split complexity | 50M-1B users, latency-critical |
| Bitmap per user | 0.01ms (bitset check) | Bitset AND over 500M bits (O(N/64) word operations) | Huge storage (500M * 500M bits) | 62.5MB per user, ~31PB in total - infeasible | Theoretical; never practical |
| Adjacency list + bloom + local cache | 0.01ms typical | Requires Redis fallback | Same as adjacency list + Redis, plus filter rebuild | Bloom false positives on unfollow | Same as adjacency list + Redis; preferred at 500M+ |
| Distributed graph (Giraph, GraphX) | 50-500ms | Full traversal | Complex job coordination | Batch only; no real-time | Offline analytics, not serve path |

The adjacency list with bloom filter and Redis is the right choice for this system. The bitmap approach would give faster lookups but needs a 500M-bit vector per user, which is 62.5MB per user and roughly 31PB across 500 million users - that is not a real option. Graph databases like Neo4j provide richer traversal APIs but cannot handle the celebrity hotspot problem at scale. The SQL single-table approach works at small scale but the index becomes the bottleneck above 100 million users. For real-time, sub-1ms follow-check at 500 million users, the adjacency list sharded by user ID, with an in-process bloom filter and Redis serving most of the traffic, is the most practical of the designs compared here.

Key Takeaways

  • Dual-shard storage (follower-keyed and following-keyed) eliminates cross-shard joins by trading storage and write amplification for single-shard reads on every query.
  • Bloom filters provide definitive negatives in 0.01ms, eliminating 60-70% of all follow-check queries before touching any network, making the Redis fleet dramatically smaller.
  • Write-through for the following shard ensures follow-check reads are always consistent with acknowledged writes; write-back for the follower shard allows fast write acknowledgment for the non-critical fanout path.
  • Bounded fan-out at depth 2 makes graph traversal safe to run in real-time; any deeper traversal must be an offline batch job to avoid compute explosions on the social graph.
  • Sorted adjacency arrays cap follow-check at 14 comparisons (binary search over at most 10,000 IDs), making the storage-layer fallback predictably fast regardless of graph size.
  • Celebrity overflow routing detects users with 1M+ followers and routes them to a paginated store, preventing single-row hot spots that would OOM a storage host.
  • Read-your-writes via follow_token ensures the user who clicked “Follow” sees consistent state without requiring synchronous global consistency across all replicas.
  • Periodic bloom filter rebuild from snapshots handles the deletion problem - unfollows cannot remove bloom entries, so rebuilding every 6 hours caps false positive accumulation.

The counter-intuitive lesson from this design: the hardest part is not the graph storage itself, but the cache invalidation protocol. Every engineer understands “store edges in a table.” Far fewer understand that at 500 million users, you need three layers of caching, and the interaction between those layers during a write creates more failure modes than the storage layer ever will.

Frequently Asked Questions

Q: Why not use a purpose-built graph database like Neo4j or Amazon Neptune? A: Graph databases excel at multi-hop traversal queries where the pattern of edges matters (e.g., finding shortest paths, subgraph matching). For follow-check, we are doing a single-hop point lookup - “does edge (A, B) exist?” - which is exactly what a hash lookup or binary search in a sorted array does optimally. Graph databases introduce the celebrity hotspot problem (a node with 200M edges saturates a single server) and lack the horizontal scalability of sharded key-value stores. At 200 billion edges, the overhead of graph database pointers and traversal state is a cost we cannot afford for a point-lookup problem.

Q: Why not use Redis as the primary store, with one set of following IDs per user ID? A: This is actually close to what we do for the cache layer. The problem with using Redis as the primary store (not cache) is durability and memory cost. 500 million users with average 400 following IDs each is 1.6TB of data. Redis requires all data in RAM, which costs around $80,000/month in cloud RAM. Our persistent shards (PostgreSQL arrays on NVMe) cost 10x less and survive instance restarts. Redis serves the hot 20% that drives 80% of traffic; cold users hit the shard on cache miss.

Q: Why not store follow relationships in a column-family store like Cassandra? A: Cassandra with a (follower_id, followee_id) composite key is a viable alternative. The follow-check query becomes a point lookup WHERE follower_id = A AND followee_id = B, which Cassandra handles well. The reason we prefer PostgreSQL arrays is that a single Cassandra partition for a user with 10,000 following entries is 10,000 rows - each row is a separate cell with overhead. The PostgreSQL array stores 10,000 IDs as a single 80KB row, which is 2-3x more storage-efficient and allows a single I/O to retrieve the full following list for bloom filter warm-up.

Q: How do you handle the case where a celebrity unfollows someone with 200 million followers? A: Unfollow for the celebrity’s own following list (the following shard, keyed by the celebrity’s user ID) is cheap - they follow at most 10,000 people, and the array operation is fast. The expensive part is removing the celebrity from the target user’s follower list - if the target also has millions of followers, their follower shard row is huge and might be in celebrity overflow. In this case, the unfollow triggers a lookup into celebrity_followers_page to find and remove the celebrity’s ID, which is paginated and can be processed asynchronously. The follow-check for this pair is updated immediately via the following shard; the follower shard update is eventually consistent within 2 seconds.
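
The paginated removal can be sketched as two binary searches: one over page boundaries, one inside the page. This is an illustration under assumptions the schema suggests but does not state, namely that pages cover disjoint ID ranges, are ordered by min_follower, and keep follower_ids sorted. FollowerPage, find_page and remove_follower are hypothetical names.

```python
from bisect import bisect_left, bisect_right
from dataclasses import dataclass
from typing import Optional


@dataclass
class FollowerPage:
    """In-memory mirror of one celebrity_followers_page row (assumed layout)."""
    page_num: int
    min_follower: int
    max_follower: int
    follower_ids: list[int]  # sorted ascending


def find_page(pages: list[FollowerPage], follower_id: int) -> Optional[FollowerPage]:
    """Locate the page whose [min_follower, max_follower] range covers the ID."""
    mins = [page.min_follower for page in pages]
    idx = bisect_right(mins, follower_id) - 1
    if idx < 0:
        return None
    page = pages[idx]
    return page if follower_id <= page.max_follower else None


def remove_follower(pages: list[FollowerPage], follower_id: int) -> bool:
    """Remove one follower ID; returns False if it was not present (idempotent)."""
    page = find_page(pages, follower_id)
    if page is None:
        return False
    idx = bisect_left(page.follower_ids, follower_id)
    if idx == len(page.follower_ids) or page.follower_ids[idx] != follower_id:
        return False
    del page.follower_ids[idx]
    if page.follower_ids:  # keep the range bounds tight; empty pages keep old bounds
        page.min_follower = page.follower_ids[0]
        page.max_follower = page.follower_ids[-1]
    return True
```

Only one page is read and rewritten per unfollow, so the cost stays bounded by the page size rather than by the celebrity's total follower count.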

Q: What happens when the Kafka consumer falls behind and the follower shard is hours out of date? A: The Kafka topic retains 7 days of events, so a consumer that falls behind can always catch up by replaying from its last committed offset. During the lag window, the follower shard is stale - follower counts are wrong, fanout misses some recipients. Neither of these affects the sub-1ms follow-check SLA because follow-check uses the following shard, not the follower shard. We alert on consumer lag crossing 100K events and have runbooks for scaling the consumer group by adding partitions to the Kafka topic and consumer instances.

Q: Why is the following count capped at 10,000 and not higher? A: The 10,000 cap exists for two reasons. First, it bounds the following_ids array to 80KB. That is larger than a single 8KB Postgres page, so the array is stored out of line via TOAST, but the bound keeps a full-row fetch to a small, predictable number of page reads, which is critical for cache performance. Second, it keeps the row small enough to stay cache-friendly: binary search itself scales well (14 comparisons at 10,000 entries, 20 at 1 million), but at 1 million entries the row would be 8MB, blowing the page cache. Twitter, Instagram, and similar platforms also impose following limits.

Interview Questions

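Q: How would you design the follow-check service to achieve sub-1ms p99 latency for 580,000 queries per second? Expected depth: Discuss the three-layer cache hierarchy (bloom filter, Redis SISMEMBER, sorted adjacency array). Explain why each layer exists and what latency it serves. Quantify the cache hit rates needed: at 70% Redis hit rate with 0.3ms Redis latency and 2ms shard latency, the weighted average is 0.7 * 0.3 + 0.3 * 2 = 0.81ms, which keeps the mean under 1ms. A strong answer also notes that a mean is not a p99: with 30% of requests taking the 2ms shard path, p99 is still 2ms, so a sub-1ms p99 requires fewer than 1% of requests to reach the shard, which is where the bloom filter's definitive negatives and a higher cache hit rate come in. Discuss bloom filter false positive rate and the trade-off between filter size and false positive rate.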
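
The difference between a mean and a tail percentile is worth computing explicitly. The sketch below models each cache tier as a fixed latency with a traffic share, which is a deliberate simplification (real tiers have their own latency distributions); the function names are made up for the example.

```python
def mean_latency_ms(tiers: list[tuple[float, float]]) -> float:
    """Weighted mean over (traffic_share, latency_ms) tiers."""
    return sum(share * latency for share, latency in tiers)


def percentile_latency_ms(tiers: list[tuple[float, float]], percentile: float) -> float:
    """Latency at the given percentile (0-1) for a mix of fixed-latency tiers."""
    ordered = sorted(tiers, key=lambda tier: tier[1])  # fastest tier first
    cumulative = 0.0
    for share, latency in ordered:
        cumulative += share
        if cumulative + 1e-9 >= percentile:  # tolerance for float rounding
            return latency
    return ordered[-1][1]


if __name__ == "__main__":
    # 70% Redis hits at 0.3ms, 30% shard reads at 2ms.
    tiers = [(0.7, 0.3), (0.3, 2.0)]
    print(f"mean = {mean_latency_ms(tiers):.2f}ms")
    print(f"p99  = {percentile_latency_ms(tiers, 0.99)}ms")
```

With these inputs the mean is 0.81ms while the p99 sits on the 2ms shard path; the p99 only drops to the Redis latency once the slowest tier serves under 1% of traffic.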

Q: How do you handle the celebrity problem - a user with 200 million followers - without creating a hot shard? Expected depth: Explain that the follow-check query routes to the celebrity’s following shard (keyed by the celebrity’s user ID, not their followers), so reading “does celebrity follow user X?” is always fast. The hot shard problem occurs when reading “who follows the celebrity?” for fanout purposes. Discuss the celebrity overflow table with paginated storage, the fan-out service reading in batches, and async fanout for celebrity posts vs. sync fanout for regular users. Mention the hybrid fanout (push for regular users, pull for celebrity followers) used by Twitter and Instagram.

Q: The system has two shards for every follow relationship - the following shard and the follower shard. How do you ensure they stay consistent, and what is the impact of inconsistency? Expected depth: Explain that the following shard is write-through (source of truth) and the follower shard is eventually consistent via Kafka CDC. Analyze the impact: an inconsistent follower shard means a fanout service might miss a follower for a short window (they don’t get a notification), which is acceptable. An inconsistent following shard would cause incorrect follow-check results, which is not acceptable - this is why the following shard is synchronously written. Discuss the reconciler that detects and repairs divergence within 30 seconds.

Q: How would you implement “People You May Know” using this social graph service without overwhelming the storage layer? Expected depth: Describe the depth-2 BFS with the 500-node fan-out cap. Explain that this generates at most 250,000 node visits, which at 580K QPS would be 145 billion node visits per second if done in real-time - clearly impossible. The solution is to run PYMK as an offline batch job (Spark, nightly) that pre-computes candidate lists and stores them in a separate recommendation table. Real-time “People You May Know” on profile view uses a 15-minute TTL cache of the precomputed result. Discuss the mutual friend scoring (higher mutual count = better candidate).

Q: How do you maintain the bloom filter across 200+ service instances, each with their own in-process filter? Expected depth: Discuss the initial hydration from an S3 snapshot (built by a periodic batch job; this design rebuilds every 6 hours). Discuss real-time updates via Kafka: each service instance subscribes to the follow-events topic and updates its local bloom filter on every event. Since bloom filters are add-only (can add but not remove), unfollow events are ignored in real-time and handled by the next snapshot rebuild. Discuss the shadow filter pattern for zero-downtime rebuilds. Mention that bloom filter state is eventually consistent across instances (each instance applies events at slightly different times), which is acceptable because it only affects whether a Redis call is made - not correctness of the final answer.
