Build Amazon's Review Aggregation Pipeline



System Design Deep Dive

Review Aggregation Pipeline

Deduplicating millions of submissions, filtering fraud at signal level, and surfacing trustworthy rankings in real time.


Think of Amazon’s review system as a municipal water treatment plant. Raw water - reviews - flows in from thousands of sources simultaneously. Some of it is perfectly clean. Some carries sediment you can filter mechanically - duplicate submissions that arrive twice because a mobile app retried on a flaky connection. Some carries chemical contaminants - fake reviews planted by sellers or review farms - that require a completely different detection mechanism. And the plant must deliver clean, drinkable water to 300 million customers in real time, not hours later. The treatment pipeline does not process water in one daily batch. It runs continuously, passes every drop through multiple filtration stages in sequence, and maintains rolling quality measurements across every product in a catalogue of 400 million items.

The numbers make this hard in very specific ways. Amazon processes roughly 2.5 million new reviews per day across its global marketplace - that is about 30 submissions per second at average load, spiking to over 500 per second during Prime Day or holiday shopping events. Each review carries not just text and a star rating but a behavioral context: when the user purchased the item, how many reviews that account has written in the last 30 days, whether the purchase was verified, and dozens of other signals. Storing that raw event is the easy part. The hard part is doing six things simultaneously: detecting duplicates before they inflate counts, scoring fraud risk within a sub-second budget, updating a product’s rolling average atomically, re-ranking the “most helpful” list, keeping the search index fresh, and serving all of this to product detail pages that see 10,000 reads per second for a bestselling ASIN.

A naive approach - write review to Postgres, recompute the average via SELECT AVG(rating), and sort by helpful_votes DESC - breaks at scale for three distinct reasons. First, a full-table scan over 50 million reviews for a popular product just to recompute one average kills your database during every write. Second, duplicate detection via a UNIQUE constraint on (user_id, product_id) misses the case where a review farm submits 500 slightly different reviews from 500 different compromised accounts all rating the same ASIN. Third, a simple ORDER BY helpful_votes DESC ranking is trivially gameable: a single viral review from a week ago will always outrank a newer one that is objectively more useful. You need a confidence-adjusted score, not a raw vote count.

We need to solve for throughput isolation - writes must not block reads, and fraud scoring must not block writes - plus incrementally maintained aggregates instead of recomputed ones, plus a statistically robust ranking function, all simultaneously.

Requirements and Constraints

Functional Requirements

  • Accept review submissions (star rating 1-5, free text up to 5,000 chars, optional media) and return an acknowledgement within 200ms
  • Deduplicate submissions: reject exact duplicates within 90 days and near-duplicates from the same account on the same product within 24 hours
  • Score each review for fraud using behavioral signals; reject high-confidence fraud, queue borderline cases for human review
  • Maintain per-product rolling average star rating that updates within 5 seconds of a new verified review landing
  • Rank visible reviews per product by “helpfulness,” defined as a Wilson score lower confidence bound on the helpful-vote ratio
  • Surface the top-50 reviews per product on the product detail page (sorted by helpfulness, then recency)
  • Allow full-text search of reviews per product with faceting by star rating, verified purchase, and recency
  • Support soft-deletion of reviews that are later flagged by moderators, with all aggregates updated accordingly

Non-Functional Requirements

  • Write throughput: 500 submissions/second sustained, 2,000/second burst (4x headroom for Prime Day spikes)
  • Read throughput: 50,000 requests/second for product review pages (read-heavy ratio 100:1)
  • Review aggregate latency: average star rating visible to 99% of users within 5 seconds of a new review passing fraud check
  • API p99 write latency: under 250ms (synchronous dedup check + Kafka publish)
  • API p99 read latency: under 50ms for cached product review summaries
  • Fraud detection latency: under 500ms per review in the async scoring pipeline
  • Durability: zero review data loss; all raw submissions retained in S3 for 7 years (regulatory)
  • Availability: 99.99% uptime on read path; writes can degrade gracefully to async-only mode
  • Storage: approximately 5 TB for 3 years of review text + metadata; 200 GB for Redis working set
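
As a sanity check, the figures above can be tied together with a little arithmetic. The snippet below is only a back-of-envelope sketch: it uses the numbers quoted in this article (2.5 million reviews/day, 500 and 2,000 writes/second, 50,000 reads/second, 5 TB over 3 years) and assumes decimal terabytes and a flat daily volume.

```python
# Back-of-envelope sizing from the figures quoted in this article.
SECONDS_PER_DAY = 86_400

reviews_per_day = 2_500_000       # stated average daily volume
sustained_writes_per_s = 500      # stated sustained write target
burst_writes_per_s = 2_000        # stated burst write target
reads_per_s = 50_000              # stated read target
storage_bytes = 5 * 10**12        # ~5 TB, assuming decimal terabytes
retention_days = 3 * 365          # 3 years, assuming a flat daily volume

avg_writes_per_s = reviews_per_day / SECONDS_PER_DAY
burst_multiple = burst_writes_per_s / sustained_writes_per_s
read_write_ratio = reads_per_s / sustained_writes_per_s
total_reviews = reviews_per_day * retention_days
bytes_per_review = storage_bytes / total_reviews

print(f"average writes/s:      {avg_writes_per_s:.1f}")
print(f"burst / sustained:     {burst_multiple:.0f}x")
print(f"read : write ratio:    {read_write_ratio:.0f}:1")
print(f"reviews in 3 years:    {total_reviews:,}")
print(f"storage per review:    {bytes_per_review:.0f} bytes")
```

The per-review figure works out to a little under 2 KB, which is plausible for a few hundred words of text plus metadata, so the 5 TB estimate is at least internally consistent.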

Constraints and Assumptions

  • Reviews are not editable after submission (simplifies aggregate maintenance)
  • Helpful votes are a separate event stream, not part of the review submission pipeline
  • We do not build the ML training pipeline here - the fraud model is pre-trained and served via a feature store
  • Geographic distribution (per-country review sets) is out of scope; we design for a single region
  • Comment/reply threads on reviews are out of scope

High-Level Architecture

The system separates into five major components with distinct scaling characteristics. Understanding which component dominates which bottleneck is the key to sizing the system correctly.

Amazon Review Aggregation Pipeline - High-Level Architecture

The Review API Gateway handles inbound submissions, enforces rate limits (10 reviews/user/day), authenticates the request via the customer identity service, and performs the synchronous deduplication check. It returns a 202 Accepted to the customer immediately after publishing to Kafka - the rest of the pipeline is asynchronous.
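
The article does not say how the 10 reviews/user/day limit is enforced. One common approach is a fixed-window counter, sketched below under that assumption. The key format `rl:{user_id}:{day}` and the function name are hypothetical; `store` is any client exposing `incr` and `expire` the way redis-py does.

```python
from typing import Protocol


class CounterStore(Protocol):
    """The two calls this sketch needs; redis.Redis provides both."""

    def incr(self, name: str) -> int: ...
    def expire(self, name: str, time: int) -> bool: ...


WINDOW_SECONDS = 86_400  # one day


def allow_submission(store: CounterStore, user_id: str, day: str, limit: int = 10) -> bool:
    """Fixed-window limiter: at most `limit` submissions per user per calendar day.

    `day` is a date string such as "2024-07-16" supplied by the caller, so the
    window resets at the day boundary rather than sliding.
    """
    key = f"rl:{user_id}:{day}"       # hypothetical key format
    count = store.incr(key)           # atomic increment, creates the key at 1
    if count == 1:
        # First hit in this window: let the counter clean itself up.
        store.expire(key, WINDOW_SECONDS)
    return count <= limit
```

A fixed window allows a short burst across midnight (10 reviews just before and 10 just after); if that matters, a sliding-window variant would be needed instead.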

The Dedup Service runs inside the API gateway request path. It computes a fingerprint from (asin, user_id, normalized_text) and checks Redis for that fingerprint using SETNX. If the key already exists, the submission is rejected with a 409 Conflict. This is the only part of the pipeline that blocks the customer’s HTTP request.

The Fraud Detection Service consumes the review-submissions Kafka topic, extracts behavioral signals, scores via a pre-loaded XGBoost model, and routes: clean reviews pass to the rating aggregator, high-confidence fraud is dropped with a moderation record, and borderline cases go to a human review queue backed by DynamoDB.

The Rating Aggregator maintains (weighted_sum, count) per ASIN using Redis HINCRBYFLOAT and HINCRBY, computing the live average as weighted_sum / count. It also writes the updated aggregate to PostgreSQL for durability. This component handles the hot-path throughput problem by avoiding any read-modify-write cycle.

The Review Index is an Elasticsearch cluster that maintains a document per review with analyzed text, structured facets, and a helpfulness_score field that drives the sort. Elasticsearch’s update_by_query is too slow for real-time updates; instead, each vote event produces a targeted partial update to the specific review document, and those updates are sent in batches through the Bulk API.

Key Insight

The entire write path never blocks on a database read. Dedup uses Redis SETNX (a pure write), fraud scoring is async, and rating aggregation uses HINCRBYFLOAT (incremental, no read needed). This is why the system can sustain 500 writes/second without read amplification.

The Deduplication Stage

The deduplication stage has one job: to answer a single question before the customer’s HTTP connection closes: “have we seen this review before?” Like a postal sorter checking return addresses against a bounce list, it must do this check in under 10 milliseconds for the write latency budget to work out.

Pipeline Stages - Component Internals

Deduplication fingerprinting works at two levels. The first level is exact deduplication: we compute SHA-256(asin + ":" + user_id + ":" + normalize(text)) where normalize strips punctuation, lowercases, and collapses whitespace. This 32-byte hash is stored in Redis as a key with a 90-day TTL using SET dedup:{hash} 1 EX 7776000 NX. If SET returns nil (key existed), we reject. If it returns OK, we proceed. The entire check is a single round-trip to Redis at under 1ms.

The second level is near-duplicate detection for the case where the same account resubmits a slightly reworded version of its own review for the same product, which changes the fingerprint and slips past the exact check. We use a per-user, per-product window with a fixed 24-hour TTL: SET dedup_user:{user_id}:{asin} 1 EX 86400 NX. If a user already submitted any review for this product in the last 24 hours, we reject the second one. This is separate from the 90-day exact fingerprint because the user might legitimately want to revise their opinion - we rate-limit them to one review per product per day but allow them to write again after the 24-hour window clears. Coordinated farms that post varied text from many different accounts are not caught here, because both keys include user_id; those are left to the fraud detection stage.

# Deduplication logic - two-level fingerprint check
import hashlib
import re
import redis

def normalize_text(text: str) -> str:
    # Strip punctuation, lowercase, collapse whitespace
    text = text.lower()
    text = re.sub(r'[^\w\s]', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text

def check_dedup(r: redis.Redis, asin: str, user_id: str, text: str) -> tuple[bool, str]:
    """
    Returns (is_duplicate, reason).
    Uses two-level dedup: exact fingerprint + per-user-per-product window.
    """
    # Level 1: exact content fingerprint (90-day window)
    normalized = normalize_text(text)
    fp_input = f"{asin}:{user_id}:{normalized}"
    fingerprint = hashlib.sha256(fp_input.encode()).hexdigest()
    exact_key = f"dedup:{fingerprint}"

    # Level 2: per-user-per-product daily rate limit
    user_key = f"dedup_user:{user_id}:{asin}"

    pipe = r.pipeline(transaction=False)
    pipe.set(exact_key, "1", ex=7_776_000, nx=True)   # 90 days
    pipe.set(user_key,  "1", ex=86_400,    nx=True)   # 24 hours
    results = pipe.execute()

    # results[0] = None if exact dup, True if new
    # results[1] = None if user already reviewed today, True if new
    if results[0] is None:
        # Exact duplicate - we must clean up the user key we may have just set
        if results[1] is True:
            r.delete(user_key)
        return True, "exact_duplicate"

    if results[1] is None:
        # Same user, same product, within 24 hours
        # Roll back the fingerprint key we just set
        r.delete(exact_key)
        return True, "rate_limited"

    return False, ""

def submit_review(r: redis.Redis, kafka_producer, review: dict) -> dict:
    is_dup, reason = check_dedup(
        r, review["asin"], review["user_id"], review["text"]
    )
    if is_dup:
        return {"status": "rejected", "reason": reason}

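    # Assumes kafka_producer is configured with a value serializer that
    # JSON-encodes dicts; a bytes-only producer would need the review
    # serialized first (for example with json.dumps(review).encode()).
    kafka_producer.produce(
        topic="review-submissions",
        key=review["asin"].encode(),
        value=review,
    )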
    return {"status": "accepted"}

Notice that we pipeline both Redis SET operations in a single round-trip using pipeline(transaction=False). We do not need atomicity between the two keys - they serve different purposes and it is acceptable to roll back one if the other reveals a duplicate. The rollback adds one extra round-trip in the rejection case, which is rare relative to accepted submissions.

Watch Out

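Redis executes commands one at a time, so two identical submissions that reach the same Redis primary cannot both win the SET ... NX: the second one always sees nil, even if they arrive in the same millisecond from different API servers. Duplicates can still slip through in narrower cases: a failover to a replica that has not yet received the key (replication is asynchronous), a rollback DEL from one request racing with a concurrent retry, or a Kafka producer retry that publishes the same message twice. The Kafka consumer downstream may therefore encounter two messages with the same fingerprint. We handle this by making the Postgres insert idempotent using ON CONFLICT DO NOTHING on the fingerprint, which requires a unique index covering it (on the partitioned reviews table that index must also include asin). These races are rare under normal load, and the consequence is a harmless duplicate that the constraint catches.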

The Fraud Detection Stage

Fraud detection is the component most engineers underestimate. The instinct is to write a few if-then rules: “reject reviews from accounts under 7 days old” or “flag if the user reviewed 10 products in 24 hours.” Rules like these are necessary but not sufficient. A sophisticated review farm will operate with aged accounts, post at moderate velocity, and vary their text just enough to evade exact dedup. What we need is a feature-rich signal-based approach that uses behavioral patterns, not just individual thresholds.

Fraud signal features are extracted per review event and combined into a feature vector that feeds a pre-trained gradient boosted tree model. The features fall into three categories.

Account-level features capture the long-term behavioral profile of the submitting user: account_age_days, lifetime_review_count, verified_purchase_ratio (what fraction of their reviews were verified purchases), avg_rating_given (chronic 1-star or 5-star givers are suspicious), and rating_variance (a farm account may rate 50 different products 5 stars in a row, giving near-zero variance).

Session-level features capture burst behavior in short time windows: reviews_last_24h, reviews_last_7d, unique_asins_last_24h, avg_text_length_7d, and whether the submission arrived via a known datacenter IP range (often bot traffic).

Product-level features capture unusual review patterns on the target ASIN: review_velocity_last_hour (number of new reviews for this product in the last hour, compared to the 30-day average), new_account_ratio_last_24h (fraction of recent reviewers with accounts under 30 days old), and rating_spike_delta (how much the average would shift if this review were accepted).

# Fraud signal feature extraction
from dataclasses import dataclass
import redis
import time

@dataclass
class FraudFeatures:
    account_age_days: float
    lifetime_review_count: int
    verified_purchase_ratio: float
    avg_rating_given: float
    rating_variance: float
    reviews_last_24h: int
    reviews_last_7d: int
    unique_asins_24h: int
    avg_text_length_7d: float
    is_datacenter_ip: bool
    product_velocity_1h: float   # reviews/hour vs 30d baseline
    product_new_acct_ratio: float
    rating_spike_delta: float

def extract_features(
    r: redis.Redis,
    user_id: str,
    asin: str,
    rating: int,
    text: str,
    is_verified: bool,
    account_created_ts: int,
) -> FraudFeatures:
    now = int(time.time())
    account_age_days = (now - account_created_ts) / 86400.0

    # Pull pre-computed user signals from Redis hash (updated by background job)
    user_key = f"fraud:{user_id}"
    u = r.hgetall(user_key)
    lifetime = int(u.get(b"lifetime", 0))
    vp_count  = int(u.get(b"vp_count", 0))
    sum_rating = float(u.get(b"sum_rating", 0.0))
    sum_sq_rating = float(u.get(b"sum_sq_rating", 0.0))
    rev_24h   = int(u.get(b"rev_24h", 0))
    rev_7d    = int(u.get(b"rev_7d", 0))
    uniq_24h  = int(u.get(b"uniq_24h", 0))
    avg_len_7d = float(u.get(b"avg_len_7d", 200.0))

    vp_ratio = (vp_count / lifetime) if lifetime > 0 else 0.0
    avg_rating = (sum_rating / lifetime) if lifetime > 0 else 3.0
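    # Variance = E[X^2] - (E[X])^2, clamped at 0 because floating-point
    # rounding can push the difference slightly negative
    variance = (
        max((sum_sq_rating / lifetime) - avg_rating ** 2, 0.0)
        if lifetime > 0 else 1.0
    )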

    # Product-level signals from Redis sorted set
    prod_key = f"prod_velocity:{asin}"
    one_hour_ago = now - 3600
    velocity_1h = r.zcount(prod_key, one_hour_ago, now)

    # Baseline: use 30d average velocity stored as float
    baseline_key = f"prod_baseline:{asin}"
    baseline = float(r.get(baseline_key) or 1.0)
    velocity_ratio = velocity_1h / max(baseline, 0.1)

    # Rating spike: how much would avg change with this new rating?
    rating_cache_key = f"rating:{asin}"
    rc = r.hgetall(rating_cache_key)
    current_sum = float(rc.get(b"sum", 0))
    current_count = int(rc.get(b"count", 0))
    if current_count > 0:
        current_avg = current_sum / current_count
        new_avg = (current_sum + rating) / (current_count + 1)
        spike_delta = abs(new_avg - current_avg)
    else:
        # First review for this ASIN: there is no existing average to shift
        spike_delta = 0.0

    # New-account ratio: check fraction of recent reviewers with age < 30d
    new_acct_key = f"prod_new_acct:{asin}"
    total_recent = r.zcount(new_acct_key, now - 86400, now)
    new_recent = float(r.get(f"prod_new_acct_count:{asin}") or 0)
    new_acct_ratio = (new_recent / total_recent) if total_recent > 0 else 0.0

    return FraudFeatures(
        account_age_days=account_age_days,
        lifetime_review_count=lifetime,
        verified_purchase_ratio=vp_ratio,
        avg_rating_given=avg_rating,
        rating_variance=variance,
        reviews_last_24h=rev_24h,
        reviews_last_7d=rev_7d,
        unique_asins_24h=uniq_24h,
        avg_text_length_7d=avg_len_7d,
        is_datacenter_ip=False,  # placeholder: the real value comes from request metadata
        product_velocity_1h=velocity_ratio,
        product_new_acct_ratio=new_acct_ratio,
        rating_spike_delta=spike_delta,
    )

The XGBoost model outputs a fraud probability between 0 and 1. Two thresholds split that range into three routing bands: below 0.30 means the review passes cleanly; between 0.30 and 0.70 it goes to a human moderation queue; above 0.70 it is automatically rejected and a moderation record is written. The 0.70 threshold is tuned for high precision - we would rather let a marginal fake review through than incorrectly suppress a genuine customer’s voice.
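
The routing rule is small enough to write down directly. The sketch below assumes the 0.30 and 0.70 cut-offs from the text and the 'pass' / 'flag' / 'reject' action names used in the schema; sending scores that land exactly on a boundary to the human queue is an assumption, since the text does not say which side they fall on.

```python
from enum import Enum


class FraudAction(str, Enum):
    PASS = "pass"      # publish and forward to the rating aggregator
    FLAG = "flag"      # hold for human moderation
    REJECT = "reject"  # drop and write a moderation record


PASS_BELOW = 0.30    # scores strictly below this pass cleanly
REJECT_ABOVE = 0.70  # scores strictly above this are auto-rejected


def route_review(fraud_probability: float) -> FraudAction:
    """Map a model score in [0, 1] to one of three routing bands."""
    if not 0.0 <= fraud_probability <= 1.0:
        # Also catches NaN, for which every comparison is False.
        raise ValueError(f"fraud probability out of range: {fraud_probability!r}")
    if fraud_probability < PASS_BELOW:
        return FraudAction.PASS
    if fraud_probability > REJECT_ABOVE:
        return FraudAction.REJECT
    return FraudAction.FLAG  # 0.30 <= score <= 0.70 (boundaries are an assumption)
```

Keeping the thresholds as named constants makes it easy to retune them per marketplace or model version without touching the routing logic.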

Real World

Amazon’s actual fraud detection (called “community abuse detection”) uses a similar ML + rules hybrid. The key addition they use is graph-based features: if a user shares 5 or more verified purchases with another known bad actor, that co-purchase graph edge becomes a strong fraud signal. Facebook’s integrity team applies the same technique - coordinated inauthentic behavior is much easier to detect as a graph anomaly than as isolated events.

The Rating Aggregation Stage

Weighted average calculation is the heart of the system’s performance story. Imagine you are a bank teller keeping a running account balance. You do not re-count all the bills in the vault every time a customer deposits a dollar - you just add the deposit to the current balance. The Rating Aggregator applies this same principle: it maintains a (weighted_sum, count) pair per ASIN and updates it incrementally on every accepted review, never scanning historical records.

The star average this design displays is not a raw arithmetic mean. It uses a Bayesian-smoothed estimate that biases toward 3.0 when review counts are low (to prevent a product with one 5-star review from showing 5.0) and converges toward the true mean as reviews accumulate. The formula is:

bayesian_avg = (C * m + sum_ratings) / (C + count)

Where C is a prior weight (typically 10, representing 10 “phantom” reviews at the mean rating m = 3.0). This is the posterior mean rating under a Dirichlet prior over the five star values whose total concentration is C and whose prior mean rating is m.
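
A couple of worked values make the smoothing effect concrete. This is a minimal sketch of the formula above with C = 10 and m = 3.0; the function name and the example inputs are illustrative only.

```python
def bayesian_avg(sum_ratings: float, count: int,
                 prior_weight: float = 10.0, prior_mean: float = 3.0) -> float:
    """Smoothed average: (C * m + sum_ratings) / (C + count)."""
    return (prior_weight * prior_mean + sum_ratings) / (prior_weight + count)


# One 5-star review: the prior dominates and the display stays near 3.
print(round(bayesian_avg(sum_ratings=5, count=1), 2))

# 1,000 reviews averaging 4.6 stars: the data dominates.
print(round(bayesian_avg(sum_ratings=4600, count=1000), 2))

# No reviews: the formula returns the prior mean itself.
print(bayesian_avg(sum_ratings=0, count=0))
```

With a single 5-star review the result is 35/11, roughly 3.18, while 1,000 reviews averaging 4.6 give 4630/1010, roughly 4.58. Note that with zero reviews the bare formula returns 3.0, so a caller may prefer to show no rating at all in that case.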

-- Rating aggregation table - source of truth for product averages
-- PostgreSQL schema
CREATE TABLE product_ratings (
    asin            VARCHAR(10)     NOT NULL PRIMARY KEY,
    rating_sum      NUMERIC(14,2)   NOT NULL DEFAULT 0,
    rating_count    BIGINT          NOT NULL DEFAULT 0,
    verified_sum    NUMERIC(14,2)   NOT NULL DEFAULT 0,
    verified_count  BIGINT          NOT NULL DEFAULT 0,
    display_avg     NUMERIC(3,2)    GENERATED ALWAYS AS (
        CASE
            WHEN rating_count = 0 THEN NULL
            ELSE ROUND((10 * 3.0 + rating_sum) / (10 + rating_count), 2)
        END
    ) STORED,
    updated_at      TIMESTAMPTZ     NOT NULL DEFAULT now()
);

-- Atomic increment on new review - no read-modify-write
UPDATE product_ratings
SET
    rating_sum    = rating_sum + $1,
    rating_count  = rating_count + 1,
    verified_sum  = verified_sum  + CASE WHEN $2 THEN $1 ELSE 0 END,
    verified_count= verified_count + CASE WHEN $2 THEN 1 ELSE 0 END,
    updated_at    = now()
WHERE asin = $3;

-- Reversal for soft-deleted reviews (subtract instead of add)
UPDATE product_ratings
SET
    rating_sum    = rating_sum - $1,
    rating_count  = rating_count - 1,
    verified_sum  = verified_sum  - CASE WHEN $2 THEN $1 ELSE 0 END,
    verified_count= verified_count - CASE WHEN $2 THEN 1 ELSE 0 END,
    updated_at    = now()
WHERE asin = $3;

In Redis the same values are maintained as a hash for sub-millisecond cache access:

# Redis commands for atomic rating update - no read needed
HINCRBYFLOAT rating:{asin} sum 4.0
HINCRBY      rating:{asin} count 1
HINCRBYFLOAT rating:{asin} verified_sum 4.0
HINCRBY      rating:{asin} verified_count 1

The Redis hash is the live cache. The PostgreSQL row is the durable source of truth. Every 60 seconds, a reconciliation job reads from Postgres and overwrites the Redis hash for any ASIN where the two disagree by more than 0.01. This handles the case where a Redis restart loses in-memory data and needs to be repopulated from Postgres.
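
The comparison step of that reconciliation job could look roughly like the sketch below. It is an assumption that "disagree by more than 0.01" refers to the raw average (sum / count); the function names are hypothetical, and `cached` is the bytes-keyed dict that redis-py's `hgetall` returns when `decode_responses` is off.

```python
from typing import Mapping, Optional


def raw_avg(total: float, count: int) -> Optional[float]:
    """Plain mean, or None when there are no reviews."""
    return total / count if count > 0 else None


def needs_repair(pg_sum: float, pg_count: int,
                 cached: Mapping[bytes, bytes], tolerance: float = 0.01) -> bool:
    """Return True when the Redis hash should be overwritten from Postgres."""
    if not cached:
        # Hash is missing (e.g. after a Redis restart): repopulate if there is data.
        return pg_count > 0
    cached_sum = float(cached.get(b"sum", b"0"))
    cached_count = int(cached.get(b"count", b"0"))
    pg_avg = raw_avg(pg_sum, pg_count)
    cached_avg = raw_avg(cached_sum, cached_count)
    if pg_avg is None or cached_avg is None:
        # Exactly one side has no reviews: the two stores disagree.
        return (pg_avg is None) != (cached_avg is None)
    return abs(pg_avg - cached_avg) > tolerance
```

Comparing averages rather than raw sums tolerates small float drift from HINCRBYFLOAT, at the cost of missing count differences on products with very many reviews; comparing counts as well would be a stricter variant.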

Key Insight

The PostgreSQL UPDATE for rating increments is always a primary key lookup (WHERE asin = $3) followed by arithmetic. It never scans the reviews table. This means the aggregation cost is O(1) per review, not O(n) where n is the number of existing reviews. A product with 50 million reviews costs exactly the same to update as one with 10 reviews.

The Helpfulness Ranking Stage

Helpful vote ranking is a problem that looks simple and turns out to be a statistics problem in disguise. The naive sort - ORDER BY helpful_votes DESC - has a fundamental flaw: a review with 1,000 helpful votes and 1,001 total votes is ranked identically to one with 1,000 helpful votes and 10,000 total votes, even though the latter is clearly less helpful (10% agreement vs. 99.9% agreement). The vote count alone tells you nothing about the underlying helpfulness signal without the denominator.

The Wilson score interval solves this. It computes the lower bound of a 95% confidence interval for the true helpfulness proportion, given the observed helpful votes h and total votes n. Reviews with many votes get a tight confidence interval (their lower bound is close to their observed proportion). Reviews with few votes get a wide interval (their lower bound is conservative). The formula:

# Wilson score lower confidence bound for helpful vote ranking
# This is the same formula used by Reddit (prior to their redesign) and Yelp.
import math

def wilson_lower_bound(helpful_votes: int, total_votes: int, confidence: float = 0.95) -> float:
    """
    Compute the lower bound of the Wilson score confidence interval.
    Returns a value in [0, 1] representing the conservative estimate of helpfulness.

    Args:
        helpful_votes: number of users who marked this review as helpful
        total_votes:   total number of helpful/not-helpful votes cast
        confidence:    statistical confidence level (0.95 = 95% CI)

    Returns:
        lower bound of Wilson CI, or 0.0 if no votes
    """
    if total_votes == 0:
        return 0.0

    # z-score for the given confidence level (1.96 for 95% CI)
    z = {0.90: 1.645, 0.95: 1.96, 0.99: 2.576}.get(confidence, 1.96)

    p_hat = helpful_votes / total_votes  # observed proportion

    numerator = (
        p_hat
        + (z ** 2) / (2 * total_votes)
        - z * math.sqrt(
            (p_hat * (1 - p_hat) + (z ** 2) / (4 * total_votes)) / total_votes
        )
    )
    denominator = 1 + (z ** 2) / total_votes

    return numerator / denominator


def compute_helpfulness_score(helpful: int, total: int, verified: bool, age_days: float) -> float:
    """
    Final helpfulness score combining Wilson bound with recency and verified status.
    Amazon also boosts verified purchases because they demonstrate actual product ownership.
    """
    wilson = wilson_lower_bound(helpful, total)

    # Recency decay: after a 30-day grace period the score decays exponentially
    # ln(2) / 365 ~ 0.0019 gives a half-life of roughly 1 year
    recency_factor = math.exp(-0.0019 * max(age_days - 30, 0))

    # Verified purchase boost: 1.15x multiplier
    verified_boost = 1.15 if verified else 1.0

    return wilson * recency_factor * verified_boost

This score is stored in the Elasticsearch document’s helpfulness_score field and in a Redis sorted set per ASIN:

# Maintain per-ASIN sorted set of top-N review IDs by helpfulness score
# Score is the float returned by compute_helpfulness_score()
ZADD helpful:{asin} {score} {review_id}

# Trim to top-1000 to bound memory usage per ASIN
ZREMRANGEBYRANK helpful:{asin} 0 -1001

# Read path: get top-50 review IDs for product page
ZREVRANGE helpful:{asin} 0 49 WITHSCORES

Real World

Reddit adopted the Wilson score lower bound for its “best” comment sort starting in 2009, following a widely-cited post by Evan Miller (“How Not To Sort By Average Rating”). Yelp uses the same formula for surfacing review quality. The question Miller poses is, in effect: given the ratings observed so far, what is the lowest fraction of positive ratings we can be 95% confident the item truly has? The Wilson bound is the rigorous statistical answer to that question.
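
To see how the bound reorders reviews compared with a raw vote count, here is a small self-contained sketch. It restates the Wilson lower bound in compact form (z fixed at 1.96 for roughly 95% confidence) and applies it to a few made-up vote tallies, including the two from the start of this section; the review labels are purely illustrative.

```python
import math


def wilson_lower_bound(helpful: int, total: int, z: float = 1.96) -> float:
    """Lower bound of the Wilson score interval for helpful / total."""
    if total == 0:
        return 0.0
    p_hat = helpful / total
    centre = p_hat + z * z / (2 * total)
    margin = z * math.sqrt((p_hat * (1 - p_hat) + z * z / (4 * total)) / total)
    return (centre - margin) / (1 + z * z / total)


# (helpful_votes, total_votes) per review; labels are made up for illustration
reviews = {
    "near_unanimous": (1000, 1001),
    "mostly_unhelpful": (1000, 10000),
    "single_vote": (1, 1),
    "solid_majority": (90, 100),
}

by_wilson = sorted(
    reviews,
    key=lambda rid: wilson_lower_bound(*reviews[rid]),
    reverse=True,
)

for rid in by_wilson:
    helpful, total = reviews[rid]
    print(f"{rid:18s} {helpful:>5}/{total:<6} bound={wilson_lower_bound(helpful, total):.3f}")
```

The review with a single helpful vote has a perfect observed ratio but a low bound, so it ranks below the 90-out-of-100 review, while the 1,000-out-of-10,000 review drops to the bottom despite tying for the highest raw helpful count.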

Data Model

The data model reflects the dual-write pattern: every accepted review lands in both PostgreSQL (durable source of truth) and Elasticsearch (search/ranking index). Redis holds ephemeral caches that can be rebuilt from Postgres. S3 holds raw event archives for auditing and ML retraining.

-- Core reviews table - partitioned by asin_prefix for range-based sharding
-- PostgreSQL DDL
CREATE TABLE reviews (
    review_id       UUID            NOT NULL DEFAULT gen_random_uuid(),
    asin            VARCHAR(10)     NOT NULL,
    user_id         UUID            NOT NULL,
    rating          SMALLINT        NOT NULL CHECK (rating BETWEEN 1 AND 5),
    title           VARCHAR(256),
    body            TEXT            NOT NULL,
    body_length     INT             GENERATED ALWAYS AS (length(body)) STORED,
    verified_purchase BOOLEAN       NOT NULL DEFAULT false,
    helpful_votes   INT             NOT NULL DEFAULT 0,
    total_votes     INT             NOT NULL DEFAULT 0,
    helpfulness_score FLOAT8        NOT NULL DEFAULT 0.0,
    fingerprint     CHAR(64)        NOT NULL,  -- SHA-256 hex for dedup
    fraud_score     FLOAT4,                    -- NULL means not yet scored
    fraud_action    VARCHAR(16),               -- 'pass', 'flag', 'reject'
    status          VARCHAR(16)     NOT NULL DEFAULT 'active',  -- active/removed/pending
    created_at      TIMESTAMPTZ     NOT NULL DEFAULT now(),
    updated_at      TIMESTAMPTZ     NOT NULL DEFAULT now(),
    PRIMARY KEY (asin, review_id)
) PARTITION BY RANGE (asin);

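-- Range partitions by ASIN first character (26 partitions + overflow)
-- Caveat: many real ASINs start with 'B0' or a digit, so first-character ranges
-- can be heavily skewed; PARTITION BY HASH (asin) is the usual alternative.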
CREATE TABLE reviews_a PARTITION OF reviews FOR VALUES FROM ('A') TO ('B');
CREATE TABLE reviews_b PARTITION OF reviews FOR VALUES FROM ('B') TO ('C');
-- ... one per letter through Z
CREATE TABLE reviews_z PARTITION OF reviews FOR VALUES FROM ('Z') TO ('[');
CREATE TABLE reviews_other PARTITION OF reviews DEFAULT;

-- Indexes per partition (inherited automatically in PG 11+)
CREATE INDEX idx_reviews_asin_created ON reviews (asin, created_at DESC)
    WHERE status = 'active';
CREATE INDEX idx_reviews_asin_helpful ON reviews (asin, helpfulness_score DESC)
    WHERE status = 'active';
CREATE INDEX idx_reviews_user_asin ON reviews (user_id, asin);
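-- Unique so that INSERT ... ON CONFLICT DO NOTHING can drop duplicate fingerprints;
-- a unique index on a partitioned table must include the partition key (asin)
CREATE UNIQUE INDEX idx_reviews_fingerprint ON reviews (asin, fingerprint);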

-- BRIN index for time-range scans on large partitions
CREATE INDEX idx_reviews_created_brin ON reviews USING BRIN (created_at);

-- Product-level aggregate table (maintained by Rating Aggregator)
CREATE TABLE product_ratings (
    asin                VARCHAR(10)     NOT NULL PRIMARY KEY,
    rating_sum          NUMERIC(14,2)   NOT NULL DEFAULT 0,
    rating_count        BIGINT          NOT NULL DEFAULT 0,
    verified_sum        NUMERIC(14,2)   NOT NULL DEFAULT 0,
    verified_count      BIGINT          NOT NULL DEFAULT 0,
    display_avg         NUMERIC(3,2)    GENERATED ALWAYS AS (
        CASE WHEN rating_count = 0 THEN NULL
        ELSE ROUND((10.0 * 3.0 + rating_sum) / (10 + rating_count), 2)
        END
    ) STORED,
    star_dist           JSONB           NOT NULL DEFAULT '{"1":0,"2":0,"3":0,"4":0,"5":0}',
    updated_at          TIMESTAMPTZ     NOT NULL DEFAULT now()
);

-- Fraud signals table - append-only audit log
CREATE TABLE fraud_signals (
    id              BIGSERIAL       PRIMARY KEY,
    review_id       UUID            NOT NULL,
    user_id         UUID            NOT NULL,
    asin            VARCHAR(10)     NOT NULL,
    fraud_score     FLOAT4          NOT NULL,
    features        JSONB           NOT NULL,  -- full feature vector for debugging
    action          VARCHAR(16)     NOT NULL,  -- pass/flag/reject
    model_version   VARCHAR(32)     NOT NULL,
    created_at      TIMESTAMPTZ     NOT NULL DEFAULT now()
);

CREATE INDEX idx_fraud_signals_user ON fraud_signals (user_id, created_at DESC);
CREATE INDEX idx_fraud_signals_asin ON fraud_signals (asin, created_at DESC);

-- Dedup log for analytics (Redis is primary, this is for long-term analysis)
CREATE TABLE dedup_events (
    fingerprint     CHAR(64)        NOT NULL,
    user_id         UUID            NOT NULL,
    asin            VARCHAR(10)     NOT NULL,
    reason          VARCHAR(32)     NOT NULL,  -- exact_duplicate/rate_limited
    created_at      TIMESTAMPTZ     NOT NULL DEFAULT now()
) PARTITION BY RANGE (created_at);

The review indexing strategy for Elasticsearch uses a separate document per review with denormalized product data to avoid joins at query time:

{
  "mappings": {
    "properties": {
      "review_id":         { "type": "keyword" },
      "asin":              { "type": "keyword" },
      "user_id":           { "type": "keyword" },
      "rating":            { "type": "byte" },
      "title":             { "type": "text", "analyzer": "english" },
      "body":              { "type": "text", "analyzer": "english" },
      "verified_purchase": { "type": "boolean" },
      "helpful_votes":     { "type": "integer" },
      "total_votes":       { "type": "integer" },
      "helpfulness_score": { "type": "float" },
      "fraud_action":      { "type": "keyword" },
      "status":            { "type": "keyword" },
      "created_at":        { "type": "date" }
    }
  },
  "settings": {
    "number_of_shards":   6,
    "number_of_replicas": 2,
    "refresh_interval":   "5s",
    "analysis": {
      "analyzer": {
        "review_analyzer": {
          "type": "custom",
          "tokenizer": "standard",
          "filter": ["lowercase", "stop", "porter_stem"]
        }
      }
    }
  }
}

The refresh_interval of 5s means a new review will be searchable within 5 seconds of being indexed - matching our SLA for aggregate visibility. For high-frequency helpfulness score updates (every helpful vote), we batch updates via the Bulk API rather than issuing individual _update calls.
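
One way to batch those score updates is to coalesce them per review and emit a single Bulk API request body. The sketch below only builds the newline-delimited JSON payload and does not talk to a cluster. The index name `reviews`, the use of review_id as the document `_id`, and the function name are assumptions for illustration.

```python
import json
from typing import Iterable


def build_bulk_body(index: str, updates: Iterable[tuple[str, float]]) -> str:
    """Build an Elasticsearch Bulk API body of partial `update` actions.

    `updates` is a stream of (review_id, helpfulness_score) pairs in arrival
    order. Only the latest score per review is kept, so a burst of votes on
    one review costs a single update.
    """
    latest: dict[str, float] = {}
    for review_id, score in updates:
        latest[review_id] = score  # later events overwrite earlier ones

    lines = []
    for review_id, score in latest.items():
        # Action line, then the partial document to merge into the review.
        lines.append(json.dumps({"update": {"_index": index, "_id": review_id}}))
        lines.append(json.dumps({"doc": {"helpfulness_score": score}}))

    # The Bulk API expects every line, including the last, to end with a newline.
    return "\n".join(lines) + "\n" if lines else ""


body = build_bulk_body("reviews", [("r1", 0.50), ("r2", 0.70), ("r1", 0.60)])
print(body, end="")
```

Flushing such a body on a short timer or once it reaches a size limit trades a little freshness for far fewer indexing requests, which fits the 5-second refresh interval anyway.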

Review Submission Data Flow - End to End

Key Insight

The star distribution in the product_ratings table is stored as JSONB (a dictionary mapping star value to count) rather than five separate integer columns. Either layout lets the histogram on product pages be read from a single row without any GROUP BY aggregation on the reviews table; JSONB simply keeps the whole distribution in one column. The JSONB field is updated in a single statement via: UPDATE product_ratings SET star_dist = jsonb_set(star_dist, ARRAY[$rating::text], ((star_dist->>$rating::text)::int + 1)::text::jsonb) WHERE asin = $asin.

Key Algorithms and Protocols

Incremental Weighted Average Without Race Conditions

The most critical algorithm in the system is the incremental average update, which has to stay correct under concurrent writers without any application-level locks. The challenge is that two Kafka consumers might try to increment the same ASIN’s aggregate concurrently. In Postgres, the UPDATE ... SET sum = sum + $1 is handled atomically by row-level locking - only one writer can hold the lock at a time. In Redis, HINCRBYFLOAT is also atomic as a single-command operation. But no transaction spans the two stores, so we need a way for both updates to be applied exactly once even if one of them fails partway.

We achieve this by treating Redis as the write-ahead cache and Postgres as the durable store, with an idempotency mechanism in case one fails:

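# Dual-write for rating aggregation (Redis fast path, Postgres durable path)
# The two stores cannot share a transaction, so each step carries its own
# idempotency marker and a retried message completes whichever step is missing.
import redis
import psycopg2

IDEMPOTENCY_TTL_SECONDS = 604800  # 7 days, to cover delayed retries


def aggregate_rating_update(
    r: redis.Redis,
    conn: psycopg2.extensions.connection,
    asin: str,
    rating: int,
    is_verified: bool,
    review_id: str,
) -> None:
    """
    Applies one review's rating to Redis, then to Postgres.
    Not atomic across stores: per-store idempotency keys make retries safe,
    and the hourly reconciliation job repairs any drift that remains.
    """
    redis_done_key = f"agg_done:redis:{review_id}"
    pg_done_key = f"agg_done:pg:{review_id}"

    # Step 1: Update Redis (fast path, will be used by read path immediately)
    # Skipped on retry if an earlier attempt already applied it.
    if not r.exists(redis_done_key):
        # MULTI/EXEC: the increments and the marker are applied together.
        # (In Redis Cluster, all keys in one transaction must share a hash slot.)
        pipe = r.pipeline(transaction=True)
        pipe.hincrbyfloat(f"rating:{asin}", "sum", float(rating))
        pipe.hincrby(f"rating:{asin}", "count", 1)
        if is_verified:
            pipe.hincrbyfloat(f"rating:{asin}", "verified_sum", float(rating))
            pipe.hincrby(f"rating:{asin}", "verified_count", 1)
        # Update star distribution
        pipe.hincrby(f"rating:{asin}", f"star_{rating}", 1)
        pipe.set(redis_done_key, "1", ex=IDEMPOTENCY_TTL_SECONDS)
        pipe.execute()

    # Step 2: Update Postgres (durable path)
    # Checked separately so a retry after a Postgres failure still runs this step.
    if r.exists(pg_done_key):
        return
    try:
        with conn.cursor() as cur:
            cur.execute("""
                INSERT INTO product_ratings (asin, rating_sum, rating_count, verified_sum, verified_count)
                VALUES (%s, %s, 1, %s, %s)
                ON CONFLICT (asin) DO UPDATE
                SET
                    rating_sum     = product_ratings.rating_sum + EXCLUDED.rating_sum,
                    rating_count   = product_ratings.rating_count + 1,
                    verified_sum   = product_ratings.verified_sum + EXCLUDED.verified_sum,
                    verified_count = product_ratings.verified_count + EXCLUDED.verified_count,
                    updated_at     = now()
            """, (
                asin,
                float(rating),
                float(rating) if is_verified else 0.0,
                1 if is_verified else 0,
            ))
        conn.commit()
    except Exception:
        conn.rollback()  # keep the connection usable; the caller retries the message
        raise

    # Marked only after the commit succeeds. A crash between the commit and this
    # SET can still double-count in Postgres on retry; closing that gap needs a
    # marker written inside the same Postgres transaction.
    r.set(pg_done_key, "1", ex=IDEMPOTENCY_TTL_SECONDS)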

Abuse Detection via Velocity Windows

Abuse detection uses a sliding window counter in Redis to detect unusual patterns. Unlike a fixed window (which resets abruptly and can be gamed by timing), a sliding window counts events in a rolling time range:

# Sliding window rate tracking for abuse detection signals
import time
import redis

def record_review_velocity(r: redis.Redis, asin: str, user_id: str) -> None:
    """
    Updates velocity tracking for both the product and the user.
    Uses a Redis sorted set with timestamp as score for sliding window.
    """
    now = int(time.time() * 1000)  # millisecond precision
    pipe = r.pipeline(transaction=False)

    # Product velocity: track all reviewers on this ASIN in a 1-hour window
    prod_vel_key = f"prod_velocity:{asin}"
    pipe.zadd(prod_vel_key, {f"{user_id}:{now}": now})
    pipe.zremrangebyscore(prod_vel_key, 0, now - 3_600_000)  # remove > 1h ago
    pipe.expire(prod_vel_key, 7200)

    # User velocity: track all ASINs this user reviewed, retained for 7 days
    # (the longest window that get_velocity_signals reads)
    user_vel_key = f"user_velocity:{user_id}"
    pipe.zadd(user_vel_key, {f"{asin}:{now}": now})
    pipe.zremrangebyscore(user_vel_key, 0, now - 604_800_000)  # remove > 7d ago
    pipe.expire(user_vel_key, 691200)  # 8 days, one day beyond the window

    pipe.execute()


def get_velocity_signals(r: redis.Redis, asin: str, user_id: str) -> dict:
    """
    Reads current velocity signals for fraud feature extraction.
    All reads are O(log N) ZCOUNT operations.
    """
    now = int(time.time() * 1000)

    pipe = r.pipeline(transaction=False)
    pipe.zcount(f"prod_velocity:{asin}", now - 3_600_000, now)
    pipe.zcount(f"user_velocity:{user_id}", now - 86_400_000, now)
    pipe.zcount(f"user_velocity:{user_id}", now - 604_800_000, now)
    results = pipe.execute()

    return {
        "product_reviews_1h": results[0],
        "user_reviews_24h":   results[1],
        "user_reviews_7d":    results[2],
    }
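To make the fixed-versus-sliding distinction concrete, here is a small in-memory sketch that stands in for the Redis sorted set. The class and function names are invented for this illustration, and the timestamps are synthetic: two bursts of 40 reviews placed on either side of an hour boundary.

```python
from collections import deque
from typing import Deque, Iterable

HOUR_MS = 3_600_000


class SlidingWindowCounter:
    """In-memory stand-in for the sorted-set window (timestamps in ms, added in order)."""

    def __init__(self, window_ms: int) -> None:
        self.window_ms = window_ms
        self.events: Deque[int] = deque()

    def record(self, ts_ms: int) -> None:
        self.events.append(ts_ms)

    def count(self, now_ms: int) -> int:
        cutoff = now_ms - self.window_ms
        # Drop events older than the window, mirroring ZREMRANGEBYSCORE.
        while self.events and self.events[0] < cutoff:
            self.events.popleft()
        return len(self.events)


def fixed_window_count(timestamps: Iterable[int], now_ms: int, window_ms: int) -> int:
    """Counts only events in the current aligned bucket, which resets at each boundary."""
    bucket_start = (now_ms // window_ms) * window_ms
    return sum(1 for t in timestamps if bucket_start <= t <= now_ms)


# 40 reviews late in hour 0, then 40 more right after the boundary into hour 1.
burst = [3_000_000 + i * 1000 for i in range(40)]
burst += [HOUR_MS + i * 1000 for i in range(40)]

sliding = SlidingWindowCounter(HOUR_MS)
for ts in burst:
    sliding.record(ts)

now = 3_700_000
fixed_seen = fixed_window_count(burst, now, HOUR_MS)
sliding_seen = sliding.count(now)
```

In this scenario the fixed bucket only sees the 40 reviews after the boundary, while the sliding window sees all 80 that arrived within the last hour. That gap is what a timing-aware review farm exploits.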

Scaling and Performance

Kafka Partitioning and Redis Cluster Sharding

Capacity estimation - building the back-of-envelope model that drives our sizing decisions:

Given:
  - Peak write throughput: 500 reviews/second
  - Average review size: 800 bytes (title + body + metadata)
  - Read traffic: 50,000 requests/second for product pages
  - Retention: 3 years active + 7 years archive

Storage (PostgreSQL, reviews table):
  Naive bound: 500 req/s * 86400 s/day * 365 days/year * 3 years = ~47 billion reviews.
  That assumes the peak rate is sustained all year, which it is not. Size from average growth instead.
  Amazon has ~600 million total reviews (cumulative, all time).
  Average growth: ~150 million new reviews/year = ~4.7/second average.
  Peak is 500/s during events - about 100x the average.
  3 years of growth at 150M/year = 450M new reviews.
  450M reviews * 1200 bytes per stored row (assumed: the 800-byte review plus per-row metadata) = ~540 GB raw review data.
  With indexes and overhead: ~1.5 TB.

Storage (Elasticsearch reviews index):
  450M documents * 1500 bytes = ~675 GB.
  3x replication = ~2 TB.

Redis working set:
  rating:{asin}: ~50 bytes per ASIN, 400M ASINs = 20 GB (too large - cache top 10M active)
  dedup:{fp}: 32 bytes per fingerprint, 90-day window, 150M reviews/year * 90/365 = ~37M keys
  37M keys * 50 bytes overhead = ~1.85 GB.
  helpful:{asin}: 1000 review IDs per ASIN, 10M active ASINs, 8 bytes per ID = 80 GB.
  Total Redis working set: ~100 GB across 6-shard cluster.

Bandwidth (Kafka):
  500 reviews/s * 1500 bytes (with metadata) = 750 KB/s write to Kafka.
  3x replication = 2.25 MB/s. Trivial for Kafka.

Compute (Fraud scoring):
  500 reviews/s, each taking 50ms to score = 25 concurrent scorer threads needed.
  With 8-core instances running 4 scorer threads per core: 1 instance handles ~32 concurrent.
  Deploy 3 instances for safety margin.

API Gateway:
  Write path: 500/s * 250ms avg latency = 125 concurrent requests. 2 medium instances.
  Read path: 50,000/s * 5ms avg latency = 250 concurrent requests. 4 medium instances.
  CDN absorbs ~80% of read traffic for popular ASINs, real origin load: ~10,000/s.
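The arithmetic above can be re-derived in a few lines, which makes it easy to re-run when an input changes. This is only a sketch: every constant is copied from the estimate above, the concurrency figures use Little's law (in-flight requests = arrival rate x latency), and the variable and function names are chosen for this example.

```python
import math

GB = 1e9


def in_flight(rate_per_s: float, latency_s: float) -> float:
    """Little's law: concurrent requests = arrival rate * time in system."""
    return rate_per_s * latency_s


# Storage
reviews_3y = 150e6 * 3                       # 450M reviews over 3 years
postgres_raw_gb = reviews_3y * 1200 / GB     # ~540 GB before indexes
es_primary_gb = reviews_3y * 1500 / GB       # ~675 GB
es_total_gb = es_primary_gb * 3              # primary + 2 replicas, ~2 TB

# Redis
dedup_keys = 150e6 * 90 / 365                # ~37M fingerprints in a 90-day window
dedup_gb = dedup_keys * 50 / GB              # ~1.85 GB
helpful_gb = 1000 * 10e6 * 8 / GB            # ~80 GB of review IDs

# Kafka
kafka_kb_per_s = 500 * 1500 / 1e3            # ~750 KB/s before replication

# Compute and gateway
scorer_threads = in_flight(500, 0.050)                   # ~25 threads
scorer_instances = math.ceil(scorer_threads / (8 * 4))   # 8 cores * 4 threads each
write_in_flight = in_flight(500, 0.250)                  # ~125 requests
read_in_flight = in_flight(50_000, 0.005)                # ~250 requests
origin_reads_per_s = 50_000 * 20 / 100                   # 20% of reads miss the CDN

print(f"Postgres raw: {postgres_raw_gb:.0f} GB, ES total: {es_total_gb:.0f} GB")
print(f"Scorer threads: {scorer_threads:.0f}, minimum instances: {scorer_instances}")
```

The minimum instance count comes out at one; the three instances in the estimate above are headroom for failures and deploys, not a throughput requirement.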

The dominant bottleneck at scale is the Redis cluster handling the helpful:{asin} sorted sets for the top 10 million active products. A product with 100,000 reviews that receives 100 helpful votes per second generates 100 ZADD operations per second against a single Redis key. Each Redis node executes commands on a single thread and a key lives on exactly one node, so all of those writes, plus every product-page read of the same sorted set, land on one shard; a handful of hot ASINs on the same shard can saturate it. The mitigation is to split the sorted set across multiple keys by hash: helpful:{asin}:{shard_id} where shard_id = hash(review_id) % 16. Reads must merge the top results from all 16 sub-keys, but writes are distributed.
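A minimal sketch of the sharded layout, with the Redis calls left out so it runs standalone. It assumes `zlib.crc32` as the stable hash (Python's built-in `hash()` is salted per process, so it cannot be used for key routing), and it assumes each shard is read with a top-N query that returns (review_id, score) pairs in descending score order. The function names are illustrative.

```python
import heapq
import itertools
import zlib
from typing import List, Tuple

NUM_SHARDS = 16
Scored = Tuple[str, float]  # (review_id, helpfulness score)


def shard_key(asin: str, review_id: str) -> str:
    """Route a review to one of NUM_SHARDS sorted-set keys for its ASIN."""
    shard_id = zlib.crc32(review_id.encode("utf-8")) % NUM_SHARDS
    return f"helpful:{asin}:{shard_id}"


def merge_top_n(per_shard: List[List[Scored]], n: int) -> List[Scored]:
    """Merge per-shard top-n lists (each sorted by score, descending) into a global top-n."""
    # heapq.merge needs ascending keys, so negate the score.
    merged = heapq.merge(*per_shard, key=lambda item: -item[1])
    return list(itertools.islice(merged, n))


shards = [
    [("R1", 0.9), ("R4", 0.2)],
    [("R2", 0.8)],
    [("R3", 0.5), ("R5", 0.1)],
]
top3 = merge_top_n(shards, 3)
```

Fetching the top n from every shard is sufficient because any review in the global top n must also be in the top n of its own shard. One caveat for Redis Cluster: literal braces in a key name act as a hash tag, so if the ASIN were wrapped in real `{}` characters all 16 sub-keys would map to the same slot and the sharding would have no effect.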

For Kafka partitioning, we use asin as the partition key (after hashing). This ensures all reviews for a given product flow through the same consumer instance, which simplifies the product-level velocity tracking - a consumer only needs to check its local state rather than coordinate with other consumers.

Real World

Amazon’s actual Elastic MapReduce pipelines run overnight to recompute fraud scores for the entire review corpus when the ML model is updated. This “batch re-scoring” ensures that historically accepted reviews get re-evaluated with the latest fraud signals - a review farm that was clever enough to evade an older model may be caught by a retrained one. Stripe uses the same pattern for fraud: real-time scoring on new events, plus periodic batch re-evaluation of historical decisions as the model improves.

Failure Modes and Recovery

| Failure | Detection | Impact | Recovery |
|---|---|---|---|
| Redis cluster shard failure | Redis Sentinel alerts, SETNX timeouts | Dedup checks fail; reviews may double-accept | Failover to replica (auto within 30s); backfill dedup set from Postgres fingerprint column on recovery |
| Kafka broker outage | Consumer lag metric exceeds 10,000 msgs | Review submissions queue in API Gateway memory (bounded); no data loss if within 30s buffer | Kafka auto-elects new leader for affected partitions; consumer resumes from last committed offset |
| Fraud model service down | Health check misses; CPU drops to baseline | All reviews pass fraud gate automatically (fail-open) | Restart service; queue will replay unscored events; batch re-scoring job corrects false-pass decisions |
| PostgreSQL primary failure | Replication lag drops to 0; health check fails | Writes to product_ratings fail; Redis still serves reads | Promote replica (RDS Multi-AZ auto-promotes in ~60s); aggregate updates replay from Kafka backlog |
| Elasticsearch index corruption | Query error rate spikes; slow log entries | Search and sorted review display unavailable | Re-index from PostgreSQL (estimated 4 hours for 450M documents at 30,000 docs/s Bulk API) |
| Rating aggregate drift (Redis vs Postgres) | Hourly reconciliation job detects delta > 0.01 | Display rating slightly wrong | Reconciliation job overwrites Redis from Postgres for affected ASINs; root cause is typically a Redis key eviction under memory pressure |
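The drift check in the last row can be sketched as a pure comparison over two snapshots. How the snapshots are loaded from Redis and Postgres is out of scope here, and the function names, the (sum, count) tuple shape, and the choice to skip uncached ASINs are assumptions made for this example.

```python
from typing import Dict, List, Tuple

Aggregate = Tuple[float, int]  # (rating_sum, rating_count)
DRIFT_THRESHOLD = 0.01         # stars, matching the reconciliation delta above


def average(agg: Aggregate) -> float:
    rating_sum, rating_count = agg
    return rating_sum / rating_count if rating_count else 0.0


def find_drifted_asins(
    redis_aggs: Dict[str, Aggregate],
    pg_aggs: Dict[str, Aggregate],
    threshold: float = DRIFT_THRESHOLD,
) -> List[str]:
    """Return ASINs whose cached average differs from Postgres by more than threshold."""
    drifted = []
    for asin, pg_agg in pg_aggs.items():
        cached = redis_aggs.get(asin)
        if cached is None:
            continue  # not cached: nothing to overwrite (assumes reads fall back to Postgres)
        if abs(average(cached) - average(pg_agg)) > threshold:
            drifted.append(asin)
    return sorted(drifted)


pg_snapshot = {"A1": (450.0, 100), "A2": (90.0, 20), "A3": (10.0, 2)}
# A2 was evicted and then re-created by a single new increment, so it restarted near zero.
redis_snapshot = {"A1": (450.0, 100), "A2": (5.0, 1)}
to_repair = find_drifted_asins(redis_snapshot, pg_snapshot)
```

Here only A2 is flagged: its cached average of 5.0 is based on one review, while Postgres holds 4.5 across twenty.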
Watch Out

The most common operational mistake with incremental aggregates is forgetting to handle soft-deletes. If a moderator removes a fraudulent 5-star review after the aggregator has already counted it, you must subtract its contribution from product_ratings.rating_sum AND rating_count. Systems that only append never subtract, so after 6 months of moderation activity the displayed average can drift by 0.2-0.5 stars for affected products. Always implement a removal path that mirrors the addition path exactly.
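One way to keep the two paths symmetric is to route both through a single function that takes a sign, so a field added to the increment path cannot be forgotten on the decrement path. The in-memory class below is a sketch of that idea under assumed field names taken from the product_ratings columns; a real implementation would apply the same signed deltas to Redis and Postgres.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class RatingAggregate:
    rating_sum: float = 0.0
    rating_count: int = 0
    verified_sum: float = 0.0
    verified_count: int = 0
    star_dist: Dict[int, int] = field(
        default_factory=lambda: {star: 0 for star in range(1, 6)}
    )

    def _apply(self, rating: int, is_verified: bool, sign: int) -> None:
        """Single code path for add (+1) and remove (-1), so they cannot diverge."""
        self.rating_sum += sign * rating
        self.rating_count += sign
        if is_verified:
            self.verified_sum += sign * rating
            self.verified_count += sign
        self.star_dist[rating] += sign

    def add_review(self, rating: int, is_verified: bool) -> None:
        self._apply(rating, is_verified, +1)

    def remove_review(self, rating: int, is_verified: bool) -> None:
        self._apply(rating, is_verified, -1)

    @property
    def average(self) -> float:
        return self.rating_sum / self.rating_count if self.rating_count else 0.0


agg = RatingAggregate()
agg.add_review(5, True)
agg.add_review(4, True)
agg.add_review(5, False)     # later judged fraudulent
agg.remove_review(5, False)  # moderator removal reverses every field it touched
```

The removal needs the review's original rating and verified flag, so those values have to be read from the stored review record rather than from the moderation request.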

Comparison of Approaches

| Approach | Write Latency | Read Latency | Fraud Detection | Scalability | Best Fit |
|---|---|---|---|---|---|
| Synchronous SQL pipeline (all in one transaction) | 100-500ms | 10-50ms | Simple rules only | Low (DB bottleneck) | Internal tools with low volume |
| Async Kafka pipeline (this design) | 50-200ms (ACK before score) | 5-20ms (Redis cache) | Full ML scoring | High (partition scaling) | High-throughput consumer marketplace |
| Lambda-per-event (serverless) | 200-1000ms (cold starts) | 10-50ms | ML scoring viable | Medium (concurrency limits) | Spiky, low-baseline workloads |
| Batch aggregation (nightly MapReduce) | Immediate write, stale display | 5ms (precomputed) | Full ML + graph | Very high | Analytics-only, no real-time requirement |
| CQRS with event sourcing | 50-150ms | 5-15ms | Full ML scoring | High | Audit-heavy systems needing temporal queries |

The async Kafka pipeline is the right choice here because the latency contract allows it - customers do not expect their review to appear instantly (they accept “your review is being processed”), and the fraud detection quality available with async ML scoring is far superior to what is achievable synchronously. Lambda-per-event looks attractive initially, but AWS Lambda rate-limits how quickly a function's concurrency can grow and adds cold-start latency for every new execution environment, which makes it a risky fit for Prime Day spikes where traffic can 10x in 30 seconds. The batch approach violates the 5-second visibility SLA entirely.

Key Takeaways

  • Deduplication fingerprinting should operate on normalized content rather than raw text, and use Redis SETNX for O(1) dedup checks that do not require reading existing data.
  • Fraud signal features are most powerful when combined: account age alone is weak, but account age plus rating variance plus product velocity together create a feature space that is hard for farms to game simultaneously.
  • Weighted average calculation should be maintained incrementally via (sum, count) pairs rather than recomputed via SELECT AVG(), reducing write cost from O(n) to O(1) per review.
  • Helpful vote ranking using the Wilson score lower bound produces a statistically defensible ordering that resists manipulation and naturally handles the cold-start problem for new reviews.
  • Review pipeline stages should be independently scalable - the fraud detection consumers can scale to 32 instances without affecting the dedup service or the read path.
  • Abuse detection requires sliding window counters (not fixed windows) to accurately capture burst behavior without gap exploitation at window boundaries.
  • Review indexing in Elasticsearch should use a 5-second refresh interval to balance real-time visibility against indexing overhead; each refresh creates a new segment, so sub-second refresh under heavy write load produces many small segments and drives up merge cost.
  • Soft-delete handling must mirror the add path exactly: any aggregated quantity that was incremented on review creation must be decremented on review removal, or your averages will drift over time.

The counter-intuitive lesson in this design is that the hardest problem is not fraud detection - it is keeping the aggregate correct under all possible failure and race conditions. The fraud ML model is sophisticated but bounded: it either accepts or rejects a review, and you can always batch re-evaluate. But a displayed average that drifts by 0.3 stars for a top-selling product, because rating_sum and rating_count no longer match the underlying reviews, can shift purchase decisions for millions of customers and is extraordinarily difficult to detect, diagnose, and correct after the fact.

Frequently Asked Questions

Q: Why not use a single Redis sorted set per ASIN for helpful vote ranking instead of a separate Redis key plus Elasticsearch?

A: Redis sorted sets handle ranking beautifully for top-50 access patterns, and we do use them for the product page hot path. But Elasticsearch is essential for two use cases: full-text search within a product’s reviews (“show me reviews mentioning battery life”) and faceted filtering (star rating, verified purchase, date range). Redis has no text search capability. The sorted set in Redis is a cache of the Elasticsearch ordering - it avoids hitting Elasticsearch for every product page load while Elasticsearch handles the rich query use cases.

Q: Why not store the fraud score synchronously and reject inline rather than in a Kafka consumer?

A: Feature extraction requires multiple Redis lookups (account history, product velocity, historical signals) and an ML model inference call, all of which add 30-200ms. Adding this to the synchronous request path would push p99 write latency to 400-600ms, breaking the sub-250ms SLA. The tradeoff is that a fraudulent review is live for up to 500ms before being removed - acceptable for a review system (unlike, say, a financial transaction where milliseconds matter). The fail-open behavior during model outages is the bigger concern; it’s mitigated by the batch re-scoring job.

Q: Why not use Kafka Streams or Flink instead of a custom consumer for the aggregation?

A: Kafka Streams would work well for the fraud detection and rating aggregation stages - it has built-in windowed aggregation, exactly-once semantics for Kafka-to-Kafka processing, and changelog topics for state recovery. We avoided it here to keep the operational complexity manageable: although Kafka Streams is a library that runs inside the application rather than on the brokers, it brings local state stores, changelog and repartition topics, and rebalance behavior that all have to be sized and monitored, and its exactly-once guarantee does not extend to external writes into Redis and Postgres. The custom consumer with Redis + Postgres achieves the same outcome with more obvious failure modes. At hyperscale (10,000 reviews/second), migrating to Flink’s stateful streaming would be the right call.

Q: Why not use PostgreSQL full-text search (tsvector) instead of Elasticsearch for review search?

A: PostgreSQL full-text search works well at lower scale but becomes a bottleneck when search traffic is 50,000 requests/second and the corpus is 450 million documents. The @@ tsvector operator requires an index scan that still touches millions of rows for common search terms. Elasticsearch’s inverted index with per-shard distribution handles this natively and allows horizontal scaling via adding shards. PostgreSQL’s tsvector index is a good solution up to tens of millions of rows and a few hundred queries/second - beyond that, the dedicated search engine wins.

Q: How do you handle the star distribution histogram update atomically?

A: We use PostgreSQL’s jsonb_set function to atomically update the star count inside the JSONB column in a single SQL statement without a read-modify-write cycle: UPDATE product_ratings SET star_dist = jsonb_set(star_dist, ARRAY[$1::text], to_jsonb((star_dist->>$1::text)::int + 1)) WHERE asin = $2. This is a single row write protected by row-level locking. The Redis equivalent maintains five separate HINCRBY fields (star_1 through star_5) that are assembled into the histogram by the read path.
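On the Redis side, assembling the histogram in the read path might look like the sketch below. It takes the hash as a plain dict of strings, which is what a client configured to decode responses would return from HGETALL on rating:{asin}; the function name, the output shape, and rounding to one decimal place are assumptions for this example.

```python
from typing import Dict, Optional


def build_histogram(rating_hash: Dict[str, str]) -> Dict[str, object]:
    """Assemble the product-page rating summary from the flat hash fields."""
    # Stars with no reviews have no field yet, so default them to 0.
    stars = {star: int(rating_hash.get(f"star_{star}", 0)) for star in range(1, 6)}
    total = int(rating_hash.get("count", 0))
    rating_sum = float(rating_hash.get("sum", 0.0))
    average: Optional[float] = round(rating_sum / total, 1) if total else None
    return {"average": average, "total": total, "stars": stars}


# Example hash for a product with two 5-star reviews and one 3-star review.
summary = build_histogram({"sum": "13", "count": "3", "star_5": "2", "star_3": "1"})
```

Returning None for the average when the count is zero lets the caller render "no ratings yet" instead of a misleading 0.0.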

Q: Why not use eventual consistency for the dedup check rather than synchronous Redis?

A: Eventual consistency for deduplication is the wrong tradeoff here. If we accept a duplicate while the dedup state propagates, we will show customers a duplicate review on the product page - a visible quality defect. The Redis SETNX check adds under 5ms to the write path and provides strong single-key consistency. The only case where this fails is if the Redis cluster is completely unavailable, at which point we fail-open (accept submissions without dedup checking) and rely on the downstream idempotent Postgres insert to catch duplicates. A 5-minute Redis outage at average load (about 5 submissions/second) lets roughly 1,500 submissions through without the Redis check, and any duplicates among them that also slip past Postgres are a manageable moderation workload. At the 500/s peak the same outage would expose about 150,000 submissions, which is why the Postgres gate matters.
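The fail-open behavior can be sketched without a live Redis by passing the set-if-absent operation in as a callable. The normalization rule (lowercase, collapsed whitespace), the fingerprint inputs, the exception type, and the three decision labels are all assumptions for this illustration; in practice the callable would wrap a Redis SET with the NX option and translate connection errors.

```python
import hashlib
import re
from typing import Callable


class DedupStoreUnavailable(Exception):
    """Raised by the store wrapper when Redis cannot be reached (hypothetical)."""


def fingerprint(user_id: str, asin: str, text: str) -> str:
    """Hash normalized content so retries with cosmetic differences still collide."""
    normalized = re.sub(r"\s+", " ", text.strip().lower())
    return hashlib.sha256(f"{user_id}|{asin}|{normalized}".encode("utf-8")).hexdigest()


def dedup_decision(set_if_absent: Callable[[str], bool], fp: str) -> str:
    """Return 'accept', 'duplicate', or 'accept_unchecked' when the store is down."""
    try:
        is_first = set_if_absent(f"dedup:{fp}")
    except DedupStoreUnavailable:
        return "accept_unchecked"  # fail-open: the Postgres constraint is the last gate
    return "accept" if is_first else "duplicate"


# In-memory fakes standing in for a healthy and an unavailable Redis.
seen = set()


def fake_store(key: str) -> bool:
    if key in seen:
        return False
    seen.add(key)
    return True


def down_store(key: str) -> bool:
    raise DedupStoreUnavailable()


fp_first = fingerprint("u1", "B0", "Great  product!\n")
fp_retry = fingerprint("u1", "B0", "great product!")
```

Tagging unchecked submissions separately from normal accepts makes it possible to re-run the dedup check on exactly that set once Redis is back.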

Interview Questions

Q: Walk me through how you would design the deduplication system to handle the case where the same customer submits a review from both a mobile app and a desktop browser simultaneously, with unreliable network causing each to retry three times.

Expected depth: Discuss idempotency keys, the race condition between concurrent SETNX operations across multiple API servers, how the Postgres ON CONFLICT DO NOTHING on the fingerprint column acts as the final dedup gate, and why optimistic dedup (accept first, deduplicate later) is preferable to pessimistic locking for latency-sensitive write paths. Mention that the worst-case outcome is two Kafka messages for the same review, which the consumer handles via the idempotency key pattern.

Q: How would you update the fraud model for all 450 million historical reviews when the ML team deploys a new version?

Expected depth: The candidate should describe a batch re-scoring job using Spark or Flink that reads from S3 (raw event archive), re-extracts features from the feature store (using historical snapshots), runs the new model, and writes decisions back to the fraud_signals table and Postgres reviews.fraud_score. Key challenges: the feature store may not have historical snapshots of account-level signals at review time, requiring temporal joins. Discuss the tradeoff between re-scoring all 450M (4-8 hour job) vs. only recent reviews (faster, misses historical fakes). A good answer mentions shadow scoring the new model before deploying it.

Q: The product team wants the review average on product pages to update within 1 second instead of 5 seconds. How do you achieve this without changing the database architecture?

Expected depth: The current 5-second lag comes from Elasticsearch’s refresh_interval = 5s plus Kafka consumer processing time. For the average star rating (not search), the fix is to update the Redis cache synchronously when the Kafka consumer processes the review (which takes under 500ms end-to-end). Product pages should read the average directly from Redis HGET rating:{asin} sum / HGET rating:{asin} count rather than from Elasticsearch. The candidate should distinguish between the display average (Redis, near-real-time) and search index freshness (Elasticsearch, 5s interval is fine for search).

Q: A seller approaches Amazon with evidence that a competitor has been flooding their product with 1-star reviews from fake accounts over the past 6 months. How would you retroactively identify and remove those reviews, and how would you re-compute the product’s corrected average?

Expected depth: Describe the investigation query: find all 1-star reviews on the ASIN in the time window, join with fraud_signals to identify those with fraud_score above a threshold, then look for coordinated account behavior (same IP ranges, same account creation batch dates) using the features JSONB column. Removal requires: soft-deleting the review records, subtracting their contributions from product_ratings in a single transaction, deleting their entries from Redis sorted sets, and triggering a bulk delete in Elasticsearch by review_id. Mention the audit trail - every deleted review should write a row to a moderation_actions table with the reason and moderator ID.

Q: How do you prevent the Redis sorted set for helpful vote ranking from becoming unbounded in size for a product that accumulates 10 million reviews over 10 years?

Expected depth: The ZREMRANGEBYRANK trim (keeping top-1000) is the first answer - product pages only show 50, so caching 1,000 gives buffer for filtered views. The candidate should go further: discuss why we need the full ranked list (some customers sort by “most critical” or filter by star rating), which means the sorted set cannot be arbitrarily truncated. The real solution is tiered storage: keep the top-10,000 in Redis sorted set, store the full ranking in DynamoDB keyed by (asin, sort_key), and fall back to DynamoDB for deeper pagination. This caps Redis memory while preserving full ranking for power users.
