Build Amazon's Review Aggregation Pipeline
System Design Deep Dive
Review Aggregation Pipeline
Deduplicating millions of submissions, filtering fraud at signal level, and surfacing trustworthy rankings in real time.
Think of Amazon’s review system as a municipal water treatment plant. Raw water - reviews - flows in from thousands of sources simultaneously. Some of it is perfectly clean. Some carries sediment you can filter mechanically - duplicate submissions that arrive twice because a mobile app retried on a flaky connection. Some carries chemical contaminants - fake reviews planted by sellers or review farms - that require a completely different detection mechanism. And the plant must deliver clean, drinkable water to 300 million customers in real time, not hours later. The treatment pipeline does not operate on water in batch mode once per day. It processes continuously, taps multiple filtration stages in sequence, and maintains rolling quality measurements across every product in a catalogue of 400 million items.
The numbers make this hard in very specific ways. Amazon processes roughly 2.5 million new reviews per day across its global marketplace - that is about 30 submissions per second at average load, spiking to over 500 per second during Prime Day or holiday shopping events. Each review carries not just text and a star rating but a behavioral context: when the user purchased the item, how many reviews that account has written in the last 30 days, whether the purchase was verified, and dozens of other signals. Storing that raw event is the easy part. The hard part is doing six things simultaneously: detecting duplicates before they inflate counts, scoring fraud risk within the same request window, updating a product’s rolling average atomically, re-ranking the “most helpful” list, keeping the search index fresh, and serving all of this to product detail pages that see 10,000 reads per second for a bestselling ASIN.
A naive approach - write review to Postgres, recompute the average via SELECT AVG(rating), and sort by helpful_votes DESC - breaks at scale for three distinct reasons. First, a full-table scan over 50 million reviews for a popular product just to recompute one average kills your database during every write. Second, duplicate detection via a UNIQUE constraint on (user_id, product_id) misses the case where a review farm submits 500 slightly different reviews from 500 different compromised accounts all rating the same ASIN. Third, a simple ORDER BY helpful_votes DESC ranking is trivially gameable: a single viral review from a week ago will always outrank a newer one that is objectively more useful. You need a confidence-adjusted score, not a raw vote count.
We need to solve for throughput isolation - writes must not block reads, and fraud scoring must not block writes - plus incrementally maintained aggregates instead of recomputed ones, plus a statistically robust ranking function, all simultaneously.
Requirements and Constraints
Functional Requirements
- Accept review submissions (star rating 1-5, free text up to 5,000 chars, optional media) and return an acknowledgement within 200ms
- Deduplicate submissions: reject exact duplicates within 90 days and near-duplicates from the same account on the same product within 24 hours
- Score each review for fraud using behavioral signals; reject high-confidence fraud, queue borderline cases for human review
- Maintain per-product rolling average star rating that updates within 5 seconds of a new verified review landing
- Rank visible reviews per product by “helpfulness,” defined as a Wilson score lower confidence bound on the helpful-vote ratio
- Surface the top-50 reviews per product on the product detail page (sorted by helpfulness, then recency)
- Allow full-text search of reviews per product with faceting by star rating, verified purchase, and recency
- Support soft-deletion of reviews that are later flagged by moderators, with all aggregates updated accordingly
Non-Functional Requirements
- Write throughput: 500 submissions/second sustained, 2,000/second burst (4x the sustained rate, sized for Prime Day spikes)
- Read throughput: 50,000 requests/second for product review pages (read-heavy ratio 100:1)
- Review aggregate latency: average star rating visible to 99% of users within 5 seconds of a new review passing fraud check
- API p99 write latency: under 250ms (synchronous dedup check + Kafka publish)
- API p99 read latency: under 50ms for cached product review summaries
- Fraud detection latency: under 500ms per review in the async scoring pipeline
- Durability: zero review data loss; all raw submissions retained in S3 for 7 years (regulatory)
- Availability: 99.99% uptime on read path; writes can degrade gracefully to async-only mode
- Storage: approximately 5 TB for 3 years of review text + metadata; 200 GB for Redis working set
Constraints and Assumptions
- Reviews are not editable after submission (simplifies aggregate maintenance)
- Helpful votes are a separate event stream, not part of the review submission pipeline
- We do not build the ML training pipeline here - the fraud model is pre-trained and served via a feature store
- Geographic distribution (per-country review sets) is out of scope; we design for a single region
- Comment/reply threads on reviews are out of scope
High-Level Architecture
The system separates into five major components with distinct scaling characteristics. Understanding which component dominates which bottleneck is the key to sizing the system correctly.
The Review API Gateway handles inbound submissions, enforces rate limits (10 reviews/user/day), authenticates the request via the customer identity service, and performs the synchronous deduplication check. It returns a 202 Accepted to the customer immediately after publishing to Kafka - the rest of the pipeline is asynchronous.
The Dedup Service runs inside the API gateway request path. It computes a fingerprint from (asin, user_id, normalized_text) and checks Redis for that fingerprint using SETNX. If the key already exists, the submission is rejected with a 409 Conflict. This is the only part of the pipeline that blocks the customer’s HTTP request.
The Fraud Detection Service consumes the review-submissions Kafka topic, extracts behavioral signals, scores via a pre-loaded XGBoost model, and routes: clean reviews pass to the rating aggregator, high-confidence fraud is dropped with a moderation record, and borderline cases go to a human review queue backed by DynamoDB.
The Rating Aggregator maintains (weighted_sum, count) per ASIN using Redis HINCRBYFLOAT and HINCRBY, computing the live average as weighted_sum / count. It also writes the updated aggregate to PostgreSQL for durability. This component handles the hot-path throughput problem by avoiding any read-modify-write cycle.
The Review Index is an Elasticsearch cluster that maintains a document per review with analyzed text, structured facets, and a helpfulness_score field that drives the sort. Elasticsearch’s update_by_query is too slow for real-time updates; instead, each vote event triggers a targeted _update on the specific review document.
The entire write path never blocks on a database read. Dedup uses Redis SETNX (a pure write), fraud scoring is async, and rating aggregation uses HINCRBYFLOAT (incremental, no read needed). This is why the system can sustain 500 writes/second without read amplification.
The Deduplication Stage
The deduplication job is to answer a single question before the customer’s HTTP connection closes: “have we seen this review before?” Like a postal sorter checking return addresses against a bounce list, it must do this check in under 10 milliseconds for the write latency budget to work out.
Deduplication fingerprinting works at two levels. The first level is exact deduplication: we compute SHA-256(asin + ":" + user_id + ":" + normalize(text)) where normalize strips punctuation, lowercases, and collapses whitespace. This 32-byte hash is stored in Redis as a key with a 90-day TTL using SET dedup:{hash} 1 EX 7776000 NX. If SET returns nil (key existed), we reject. If it returns OK, we proceed. The entire check is a single round-trip to Redis at under 1ms.
The second level is a per-user, per-product rate limit, which catches the same account resubmitting a reworded review that the exact fingerprint would miss. We claim a key with SET dedup_user:{user_id}:{asin} 1 EX 86400 NX. If a user already submitted any review for this product in the last 24 hours, we reject the second one. The window is fixed from the first accepted submission rather than sliding. It is separate from the 90-day exact fingerprint because the user might legitimately want to post a new opinion later: we limit them to one review per product per day but allow another once the 24-hour key expires. Slightly varied reviews posted from different accounts (the review-farm case) pass both checks by design and are left to the fraud detection stage.
# Deduplication logic - two-level fingerprint check
import hashlib
import json
import re

import redis


def normalize_text(text: str) -> str:
    # Strip punctuation, lowercase, collapse whitespace
    text = text.lower()
    text = re.sub(r'[^\w\s]', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text


def check_dedup(r: redis.Redis, asin: str, user_id: str, text: str) -> tuple[bool, str]:
    """
    Returns (is_duplicate, reason).
    Uses two-level dedup: exact fingerprint + per-user-per-product window.
    """
    # Level 1: exact content fingerprint (90-day window)
    normalized = normalize_text(text)
    fp_input = f"{asin}:{user_id}:{normalized}"
    fingerprint = hashlib.sha256(fp_input.encode()).hexdigest()
    exact_key = f"dedup:{fingerprint}"

    # Level 2: per-user-per-product daily rate limit
    user_key = f"dedup_user:{user_id}:{asin}"

    # Send both SET ... NX commands in one round-trip (no MULTI/EXEC)
    pipe = r.pipeline(transaction=False)
    pipe.set(exact_key, "1", ex=7_776_000, nx=True)  # 90 days
    pipe.set(user_key, "1", ex=86_400, nx=True)      # 24 hours
    results = pipe.execute()
    # results[0] = None if exact dup, True if new
    # results[1] = None if user already reviewed today, True if new

    if results[0] is None:
        # Exact duplicate - we must clean up the user key we may have just set
        if results[1] is True:
            r.delete(user_key)
        return True, "exact_duplicate"

    if results[1] is None:
        # Same user, same product, within 24 hours
        # Roll back the fingerprint key we just set
        r.delete(exact_key)
        return True, "rate_limited"

    return False, ""


def submit_review(r: redis.Redis, kafka_producer, review: dict) -> dict:
    is_dup, reason = check_dedup(
        r, review["asin"], review["user_id"], review["text"]
    )
    if is_dup:
        return {"status": "rejected", "reason": reason}

    # Kafka payloads are bytes, so serialize the review dict before publishing
    kafka_producer.produce(
        topic="review-submissions",
        key=review["asin"].encode(),
        value=json.dumps(review).encode("utf-8"),
    )
    return {"status": "accepted"}
Notice that we pipeline both Redis SET operations in a single round-trip using pipeline(transaction=False). We do not need atomicity between the two keys - they serve different purposes and it is acceptable to roll back one if the other reveals a duplicate. The rollback adds one extra round-trip in the rejection case, which is rare relative to accepted submissions.
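What happens if two identical reviews arrive within the same millisecond at different API servers? Against a single Redis primary, SET ... NX is atomic, so only one of the two requests receives OK for the fingerprint key and the other is rejected. Because the two SETs are not wrapped in a transaction, the commands from the two requests can also interleave so that each request claims one key, both roll back, and both are rejected; the client’s retry then succeeds. Duplicates can still slip through in rarer situations, such as a Redis failover that loses a recently written key before it was replicated. In that case the Kafka consumer downstream encounters two messages with the same fingerprint. We handle this by making the Postgres insert idempotent using ON CONFLICT DO NOTHING on the fingerprint column, which requires a unique index covering that column. The probability is extremely low under normal load, and the consequence is a harmless duplicate that the constraint catches.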
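The normalization step is what makes the exact fingerprint tolerant of retries that differ only in case, punctuation, or spacing. The sketch below isolates that step so it can be checked without Redis; the ASIN and user ID are made-up sample values, and the helper name `fingerprint` is ours rather than part of the pipeline above.

```python
import hashlib
import re


def normalize_text(text: str) -> str:
    # Lowercase, strip punctuation, collapse runs of whitespace
    text = text.lower()
    text = re.sub(r"[^\w\s]", "", text)
    return re.sub(r"\s+", " ", text).strip()


def fingerprint(asin: str, user_id: str, text: str) -> str:
    # Same recipe as the dedup stage: SHA-256 over asin:user_id:normalized_text
    payload = f"{asin}:{user_id}:{normalize_text(text)}"
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Hypothetical sample values for illustration only
a = fingerprint("B000TEST01", "user-1", "Great product!!  Works well.")
b = fingerprint("B000TEST01", "user-1", "great product works well")
c = fingerprint("B000TEST01", "user-1", "Great product, works very well.")

print(a == b)  # cosmetic differences collapse to one fingerprint
print(a == c)  # a genuinely reworded review gets a different fingerprint
```

The third case is the reason the exact fingerprint alone is not enough: any real change in wording produces a new hash, so reworded resubmissions have to be caught by the per-user window or by fraud scoring.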
The Fraud Detection Stage
Fraud detection is the component most engineers underestimate. The instinct is to write a few if-then rules: “reject reviews from accounts under 7 days old” or “flag if the user reviewed 10 products in 24 hours.” Rules like these are necessary but not sufficient. A sophisticated review farm will operate with aged accounts, post at moderate velocity, and vary their text just enough to evade exact dedup. What we need is a feature-rich signal-based approach that uses behavioral patterns, not just individual thresholds.
Fraud signal features are extracted per review event and combined into a feature vector that feeds a pre-trained gradient boosted tree model. The features fall into three categories.
Account-level features capture the long-term behavioral profile of the submitting user: account_age_days, lifetime_review_count, verified_purchase_ratio (what fraction of their reviews were verified purchases), avg_rating_given (chronic 1-star or 5-star givers are suspicious), and rating_variance (a farm may rate the same product 5 stars 50 times in a row with near-zero variance).
Session-level features capture burst behavior in short time windows: reviews_last_24h, reviews_last_7d, unique_asins_last_24h, avg_text_length_7d, and whether the submission arrived via a known datacenter IP range (often bot traffic).
Product-level features capture unusual review patterns on the target ASIN: review_velocity_last_hour (number of new reviews for this product in the last hour, compared to the 30-day average), new_account_ratio_last_24h (fraction of recent reviewers with accounts under 30 days old), and rating_spike_delta (how much the average would shift if this review were accepted).
# Fraud signal feature extraction
from dataclasses import dataclass
import time

import redis


@dataclass
class FraudFeatures:
    account_age_days: float
    lifetime_review_count: int
    verified_purchase_ratio: float
    avg_rating_given: float
    rating_variance: float
    reviews_last_24h: int
    reviews_last_7d: int
    unique_asins_24h: int
    avg_text_length_7d: float
    is_datacenter_ip: bool
    product_velocity_1h: float  # reviews/hour vs 30d baseline
    product_new_acct_ratio: float
    rating_spike_delta: float


def extract_features(
    r: redis.Redis,
    user_id: str,
    asin: str,
    rating: int,
    text: str,
    is_verified: bool,
    account_created_ts: int,
) -> FraudFeatures:
    now = int(time.time())
    now_ms = now * 1000
    account_age_days = (now - account_created_ts) / 86400.0

    # Pull pre-computed user signals from Redis hash (updated by background job)
    user_key = f"fraud:{user_id}"
    u = r.hgetall(user_key)
    lifetime = int(u.get(b"lifetime", 0))
    vp_count = int(u.get(b"vp_count", 0))
    sum_rating = float(u.get(b"sum_rating", 0.0))
    sum_sq_rating = float(u.get(b"sum_sq_rating", 0.0))
    rev_24h = int(u.get(b"rev_24h", 0))
    rev_7d = int(u.get(b"rev_7d", 0))
    uniq_24h = int(u.get(b"uniq_24h", 0))
    avg_len_7d = float(u.get(b"avg_len_7d", 200.0))

    vp_ratio = (vp_count / lifetime) if lifetime > 0 else 0.0
    avg_rating = (sum_rating / lifetime) if lifetime > 0 else 3.0
    # Variance = E[X^2] - (E[X])^2, clamped at 0 to absorb float rounding
    variance = (
        max((sum_sq_rating / lifetime) - avg_rating ** 2, 0.0)
        if lifetime > 0 else 1.0
    )

    # Product-level signals from Redis sorted set.
    # record_review_velocity() scores members in milliseconds, so query in milliseconds.
    prod_key = f"prod_velocity:{asin}"
    velocity_1h = r.zcount(prod_key, now_ms - 3_600_000, now_ms)
    # Baseline: use 30d average velocity stored as float
    baseline_key = f"prod_baseline:{asin}"
    baseline = float(r.get(baseline_key) or 1.0)
    velocity_ratio = velocity_1h / max(baseline, 0.1)

    # Rating spike: how much would avg change with this new rating?
    rating_cache_key = f"rating:{asin}"
    rc = r.hgetall(rating_cache_key)
    current_sum = float(rc.get(b"sum", 0))
    current_count = int(rc.get(b"count", 0))
    if current_count > 0:
        current_avg = current_sum / current_count
        new_avg = (current_sum + rating) / (current_count + 1)
        spike_delta = abs(new_avg - current_avg)
    else:
        # First review for this ASIN: there is no existing average to shift
        spike_delta = 0.0

    # New-account ratio: check fraction of recent reviewers with age < 30d
    # (the writer for these keys is not shown; scores are assumed to be epoch seconds)
    new_acct_key = f"prod_new_acct:{asin}"
    total_recent = r.zcount(new_acct_key, now - 86400, now)
    new_recent = float(r.get(f"prod_new_acct_count:{asin}") or 0)
    new_acct_ratio = (new_recent / total_recent) if total_recent > 0 else 0.0

    return FraudFeatures(
        account_age_days=account_age_days,
        lifetime_review_count=lifetime,
        verified_purchase_ratio=vp_ratio,
        avg_rating_given=avg_rating,
        rating_variance=variance,
        reviews_last_24h=rev_24h,
        reviews_last_7d=rev_7d,
        unique_asins_24h=uniq_24h,
        avg_text_length_7d=avg_len_7d,
        is_datacenter_ip=False,  # injected from request metadata
        product_velocity_1h=velocity_ratio,
        product_new_acct_ratio=new_acct_ratio,
        rating_spike_delta=spike_delta,
    )
The XGBoost model outputs a fraud probability between 0 and 1. We apply three routing thresholds: below 0.30 means the review passes cleanly; between 0.30 and 0.70 it goes to a human moderation queue; above 0.70 it is automatically rejected and a moderation record is written. The 0.70 threshold is tuned for high precision - we would rather let a marginal fake review through than incorrectly suppress a genuine customer’s voice.
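The three-way routing can be written as a small pure function, which makes the thresholds easy to unit test and to tune. This is a minimal sketch: the `FraudAction` enum and `route_review` name are illustrative, and sending the exact boundary values 0.30 and 0.70 to the human queue is our assumption, since the text only specifies "below" and "above".

```python
from enum import Enum


class FraudAction(str, Enum):
    PASS = "pass"
    FLAG = "flag"      # queue for human moderation
    REJECT = "reject"


PASS_BELOW = 0.30
REJECT_ABOVE = 0.70


def route_review(fraud_probability: float) -> FraudAction:
    """Map a model score in [0, 1] to a routing decision."""
    if not 0.0 <= fraud_probability <= 1.0:
        raise ValueError(f"score out of range: {fraud_probability}")
    if fraud_probability < PASS_BELOW:
        return FraudAction.PASS
    if fraud_probability > REJECT_ABOVE:
        return FraudAction.REJECT
    # Everything in between, boundaries included, goes to a human
    return FraudAction.FLAG


for score in (0.05, 0.30, 0.55, 0.70, 0.93):
    print(score, route_review(score).value)
```

Keeping the thresholds as named constants means the precision/recall trade-off can be adjusted without touching the routing logic.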
Amazon’s production fraud detection is reported to use a similar ML + rules hybrid. A valuable addition is graph-based features: for example, if a user shares 5 or more verified purchases with a known bad actor, that co-purchase graph edge becomes a strong fraud signal. Facebook’s integrity work on coordinated inauthentic behavior rests on the same observation: coordinated activity is much easier to detect as a graph anomaly than as isolated events.
The Rating Aggregation Stage
Weighted average calculation is the heart of the system’s performance story. Imagine you are a bank teller keeping a running account balance. You do not re-count all the bills in the vault every time a customer deposits a dollar - you just add the deposit to the current balance. The Rating Aggregator applies this same principle: it maintains a (weighted_sum, count) pair per ASIN and updates it incrementally on every accepted review, never scanning historical records.
The star average displayed on Amazon is not a raw arithmetic mean. Our design uses a Bayesian-smoothed estimate that biases toward 3.0 when review counts are low (to prevent a product with one 5-star review from showing 5.0) and converges toward the true mean as reviews accumulate. The formula is:
bayesian_avg = (C * m + sum_ratings) / (C + count)
Where C is a prior weight (typically 10, representing 10 “phantom” reviews at the mean rating m = 3.0). This is equivalent to the posterior mean rating under a Dirichlet prior over the five star values whose total concentration is C and whose expected rating is m.
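A quick worked example shows how strongly the prior pulls at low counts and how little it matters at high counts. The function below is a direct transcription of the formula with C = 10 and m = 3.0; the sample products are invented for illustration.

```python
def bayesian_avg(sum_ratings: float, count: int,
                 prior_weight: float = 10.0, prior_mean: float = 3.0) -> float:
    """(C * m + sum_ratings) / (C + count)"""
    return (prior_weight * prior_mean + sum_ratings) / (prior_weight + count)


# One 5-star review: the raw mean is 5.0, the smoothed value stays near 3
one_review = bayesian_avg(sum_ratings=5, count=1)          # 35 / 11

# 1,000 reviews averaging 4.6: the prior barely moves the result
many_reviews = bayesian_avg(sum_ratings=4600, count=1000)  # 4630 / 1010

print(round(one_review, 2), round(many_reviews, 2))
```

With zero reviews the formula returns the prior mean itself, which is why the schema in this section stores NULL for that case instead of displaying 3.0 for an unreviewed product.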
-- Rating aggregation table - source of truth for product averages
-- PostgreSQL schema
CREATE TABLE product_ratings (
asin VARCHAR(10) NOT NULL PRIMARY KEY,
rating_sum NUMERIC(14,2) NOT NULL DEFAULT 0,
rating_count BIGINT NOT NULL DEFAULT 0,
verified_sum NUMERIC(14,2) NOT NULL DEFAULT 0,
verified_count BIGINT NOT NULL DEFAULT 0,
display_avg NUMERIC(3,2) GENERATED ALWAYS AS (
CASE
WHEN rating_count = 0 THEN NULL
ELSE ROUND((10 * 3.0 + rating_sum) / (10 + rating_count), 2)
END
) STORED,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Atomic increment on new review - no read-modify-write
UPDATE product_ratings
SET
rating_sum = rating_sum + $1,
rating_count = rating_count + 1,
verified_sum = verified_sum + CASE WHEN $2 THEN $1 ELSE 0 END,
verified_count= verified_count + CASE WHEN $2 THEN 1 ELSE 0 END,
updated_at = now()
WHERE asin = $3;
-- Reversal for soft-deleted reviews (subtract instead of add)
UPDATE product_ratings
SET
rating_sum = rating_sum - $1,
rating_count = rating_count - 1,
verified_sum = verified_sum - CASE WHEN $2 THEN $1 ELSE 0 END,
verified_count= verified_count - CASE WHEN $2 THEN 1 ELSE 0 END,
updated_at = now()
WHERE asin = $3;
In Redis the same values are maintained as a hash for sub-millisecond cache access:
# Redis commands for atomic rating update - no read needed
HINCRBYFLOAT rating:{asin} sum 4.0
HINCRBY rating:{asin} count 1
HINCRBYFLOAT rating:{asin} verified_sum 4.0
HINCRBY rating:{asin} verified_count 1
The Redis hash is the live cache. The PostgreSQL row is the durable source of truth. Every 60 seconds, a reconciliation job reads from Postgres and overwrites the Redis hash for any ASIN where the two disagree by more than 0.01. This handles the case where a Redis restart loses in-memory data and needs to be repopulated from Postgres.
The PostgreSQL UPDATE for rating increments is always a primary key lookup (WHERE asin = $3) followed by arithmetic. It never scans the reviews table. This means the aggregation cost is O(1) per review, not O(n) where n is the number of existing reviews. A product with 50 million reviews costs exactly the same to update as one with 10 reviews.
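The same O(1) property holds for removals. The in-memory sketch below mirrors the add and subtract updates and checks that reversing a soft-deleted review gives the same mean as recomputing from the remaining ratings. The `RunningAggregate` class is illustrative only, not part of the pipeline.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RunningAggregate:
    rating_sum: float = 0.0
    rating_count: int = 0

    def add(self, rating: int) -> None:
        # Mirrors: rating_sum = rating_sum + $1, rating_count = rating_count + 1
        self.rating_sum += rating
        self.rating_count += 1

    def remove(self, rating: int) -> None:
        # Mirrors the soft-delete reversal: subtract instead of add
        if self.rating_count == 0:
            raise ValueError("no reviews to remove")
        self.rating_sum -= rating
        self.rating_count -= 1

    def mean(self) -> Optional[float]:
        return self.rating_sum / self.rating_count if self.rating_count else None


agg = RunningAggregate()
for rating in [5, 4, 4, 1, 5]:
    agg.add(rating)

agg.remove(1)  # a moderator soft-deletes the 1-star review

remaining = [5, 4, 4, 5]
print(agg.mean(), sum(remaining) / len(remaining))
```

Each operation touches two numbers regardless of how many reviews the product has, which is the whole point of maintaining the aggregate incrementally.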
The Helpfulness Ranking Stage
Helpful vote ranking is a problem that looks simple and turns out to be a statistics problem in disguise. The naive sort - ORDER BY helpful_votes DESC - has a fundamental flaw: a review with 1,000 helpful votes and 1,001 total votes is ranked identically to one with 1,000 helpful votes and 10,000 total votes, even though the latter is provably less helpful (10% agreement vs. 99.9% agreement). The vote count alone tells you nothing about the underlying helpfulness signal without the denominator.
The Wilson score interval solves this. It computes the lower bound of a 95% confidence interval for the true helpfulness proportion, given the observed helpful votes h and total votes n. Reviews with many votes get a tight confidence interval (their lower bound is close to their observed proportion). Reviews with few votes get a wide interval (their lower bound is conservative). The formula:
# Wilson score lower confidence bound for helpful vote ranking
# Reddit adopted this formula for its "best" comment sort.
import math


def wilson_lower_bound(helpful_votes: int, total_votes: int, confidence: float = 0.95) -> float:
    """
    Compute the lower bound of the Wilson score confidence interval.
    Returns a value in [0, 1] representing the conservative estimate of helpfulness.
    Args:
        helpful_votes: number of users who marked this review as helpful
        total_votes: total number of helpful/not-helpful votes cast
        confidence: statistical confidence level (0.95 = 95% CI)
    Returns:
        lower bound of Wilson CI, or 0.0 if no votes
    """
    if total_votes == 0:
        return 0.0
    # z-score for the given confidence level (1.96 for 95% CI);
    # any level not in the table falls back to 1.96
    z = {0.90: 1.645, 0.95: 1.96, 0.99: 2.576}.get(confidence, 1.96)
    p_hat = helpful_votes / total_votes  # observed proportion
    numerator = (
        p_hat
        + (z ** 2) / (2 * total_votes)
        - z * math.sqrt(
            (p_hat * (1 - p_hat) + (z ** 2) / (4 * total_votes)) / total_votes
        )
    )
    denominator = 1 + (z ** 2) / total_votes
    return numerator / denominator


def compute_helpfulness_score(helpful: int, total: int, verified: bool, age_days: float) -> float:
    """
    Final helpfulness score combining Wilson bound with recency and verified status.
    Amazon also boosts verified purchases because they demonstrate actual product ownership.
    """
    wilson = wilson_lower_bound(helpful, total)
    # Recency decay: no decay for the first 30 days, then the score halves
    # roughly every year (ln(2) / 365 ~ 0.0019 gives a half-life of ~365 days)
    recency_factor = math.exp(-0.0019 * max(age_days - 30, 0))
    # Verified purchase boost: 1.15x multiplier
    verified_boost = 1.15 if verified else 1.0
    return wilson * recency_factor * verified_boost
This score is stored in the Elasticsearch document’s helpfulness_score field and in a Redis sorted set per ASIN:
# Maintain per-ASIN sorted set of top-N review IDs by helpfulness score
# Score is the float returned by compute_helpfulness_score()
ZADD helpful:{asin} {score} {review_id}
# Trim to top-1000 to bound memory usage per ASIN
ZREMRANGEBYRANK helpful:{asin} 0 -1001
# Read path: get top-50 review IDs for product page
ZREVRANGE helpful:{asin} 0 49 WITHSCORES
Reddit adopted the Wilson score lower bound for its “best” comment sort in 2009, following Evan Miller’s widely cited article “How Not To Sort By Average Rating.” The question Miller poses is, in effect: given the votes seen so far, what fraction of positive votes can we be 95% confident the item truly deserves? The Wilson lower bound is the rigorous statistical answer to that question.
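To see the ranking behave as described, the sketch below scores three hypothetical reviews: the two from the earlier example (1,000 helpful out of 1,001 and out of 10,000) plus a new review with 3 helpful votes out of 3. It uses a compact re-implementation of the bound with z fixed at 1.96; the review labels are invented.

```python
import math


def wilson_lower_bound(helpful: int, total: int, z: float = 1.96) -> float:
    if total == 0:
        return 0.0
    p = helpful / total
    centre = p + z * z / (2 * total)
    margin = z * math.sqrt((p * (1 - p) + z * z / (4 * total)) / total)
    return (centre - margin) / (1 + z * z / total)


# (helpful_votes, total_votes) per review; labels are illustrative
reviews = {
    "A": (1000, 1001),   # near-unanimous, many votes
    "B": (1000, 10000),  # same helpful count, mostly unhelpful
    "C": (3, 3),         # unanimous but very few votes
}

ranked = sorted(reviews, key=lambda k: wilson_lower_bound(*reviews[k]), reverse=True)
for key in ranked:
    print(key, round(wilson_lower_bound(*reviews[key]), 3))
```

A raw `helpful_votes` sort would tie A and B at 1,000 and put C last. The Wilson bound separates A from B and places the small unanimous sample C between them: credited for its perfect ratio, discounted for its thin evidence.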
Data Model
The data model reflects the dual-write pattern: every accepted review lands in both PostgreSQL (durable source of truth) and Elasticsearch (search/ranking index). Redis holds ephemeral caches that can be rebuilt from Postgres. S3 holds raw event archives for auditing and ML retraining.
-- Core reviews table - partitioned by asin_prefix for range-based sharding
-- PostgreSQL DDL
CREATE TABLE reviews (
review_id UUID NOT NULL DEFAULT gen_random_uuid(),
asin VARCHAR(10) NOT NULL,
user_id UUID NOT NULL,
rating SMALLINT NOT NULL CHECK (rating BETWEEN 1 AND 5),
title VARCHAR(256),
body TEXT NOT NULL,
body_length INT GENERATED ALWAYS AS (length(body)) STORED,
verified_purchase BOOLEAN NOT NULL DEFAULT false,
helpful_votes INT NOT NULL DEFAULT 0,
total_votes INT NOT NULL DEFAULT 0,
helpfulness_score FLOAT8 NOT NULL DEFAULT 0.0,
fingerprint CHAR(64) NOT NULL, -- SHA-256 hex for dedup
fraud_score FLOAT4, -- NULL means not yet scored
fraud_action VARCHAR(16), -- 'pass', 'flag', 'reject'
status VARCHAR(16) NOT NULL DEFAULT 'active', -- active/removed/pending
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (asin, review_id)
) PARTITION BY RANGE (asin);
-- Range partitions by ASIN first character (26 partitions + overflow)
CREATE TABLE reviews_a PARTITION OF reviews FOR VALUES FROM ('A') TO ('B');
CREATE TABLE reviews_b PARTITION OF reviews FOR VALUES FROM ('B') TO ('C');
-- ... one per letter through Z
CREATE TABLE reviews_z PARTITION OF reviews FOR VALUES FROM ('Z') TO ('[');
CREATE TABLE reviews_other PARTITION OF reviews DEFAULT;
-- Indexes per partition (inherited automatically in PG 11+)
CREATE INDEX idx_reviews_asin_created ON reviews (asin, created_at DESC)
WHERE status = 'active';
CREATE INDEX idx_reviews_asin_helpful ON reviews (asin, helpfulness_score DESC)
WHERE status = 'active';
CREATE INDEX idx_reviews_user_asin ON reviews (user_id, asin);
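-- Unique so that INSERT ... ON CONFLICT DO NOTHING can drop replayed fingerprints;
-- a unique index on a partitioned table must include the partition key (asin)
CREATE UNIQUE INDEX idx_reviews_fingerprint ON reviews (asin, fingerprint);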
-- BRIN index for time-range scans on large partitions
CREATE INDEX idx_reviews_created_brin ON reviews USING BRIN (created_at);
-- Product-level aggregate table (maintained by Rating Aggregator)
CREATE TABLE product_ratings (
asin VARCHAR(10) NOT NULL PRIMARY KEY,
rating_sum NUMERIC(14,2) NOT NULL DEFAULT 0,
rating_count BIGINT NOT NULL DEFAULT 0,
verified_sum NUMERIC(14,2) NOT NULL DEFAULT 0,
verified_count BIGINT NOT NULL DEFAULT 0,
display_avg NUMERIC(3,2) GENERATED ALWAYS AS (
CASE WHEN rating_count = 0 THEN NULL
ELSE ROUND((10.0 * 3.0 + rating_sum) / (10 + rating_count), 2)
END
) STORED,
star_dist JSONB NOT NULL DEFAULT '{"1":0,"2":0,"3":0,"4":0,"5":0}',
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Fraud signals table - append-only audit log
CREATE TABLE fraud_signals (
id BIGSERIAL PRIMARY KEY,
review_id UUID NOT NULL,
user_id UUID NOT NULL,
asin VARCHAR(10) NOT NULL,
fraud_score FLOAT4 NOT NULL,
features JSONB NOT NULL, -- full feature vector for debugging
action VARCHAR(16) NOT NULL, -- pass/flag/reject
model_version VARCHAR(32) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_fraud_signals_user ON fraud_signals (user_id, created_at DESC);
CREATE INDEX idx_fraud_signals_asin ON fraud_signals (asin, created_at DESC);
-- Dedup log for analytics (Redis is primary, this is for long-term analysis)
CREATE TABLE dedup_events (
fingerprint CHAR(64) NOT NULL,
user_id UUID NOT NULL,
asin VARCHAR(10) NOT NULL,
reason VARCHAR(32) NOT NULL, -- exact_duplicate/rate_limited
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
) PARTITION BY RANGE (created_at);
The review indexing strategy for Elasticsearch uses a separate document per review with denormalized product data to avoid joins at query time:
{
"mappings": {
"properties": {
"review_id": { "type": "keyword" },
"asin": { "type": "keyword" },
"user_id": { "type": "keyword" },
"rating": { "type": "byte" },
"title": { "type": "text", "analyzer": "review_analyzer" },
"body": { "type": "text", "analyzer": "review_analyzer" },
"verified_purchase": { "type": "boolean" },
"helpful_votes": { "type": "integer" },
"total_votes": { "type": "integer" },
"helpfulness_score": { "type": "float" },
"fraud_action": { "type": "keyword" },
"status": { "type": "keyword" },
"created_at": { "type": "date" }
}
},
"settings": {
"number_of_shards": 6,
"number_of_replicas": 2,
"refresh_interval": "5s",
"analysis": {
"analyzer": {
"review_analyzer": {
"type": "custom",
"tokenizer": "standard",
"filter": ["lowercase", "stop", "porter_stem"]
}
}
}
}
}
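The refresh_interval of 5s means a new review becomes searchable within 5 seconds of being indexed - matching our SLA for aggregate visibility. Helpfulness score updates arrive with every helpful vote, so sending one HTTP request per vote would be wasteful. The targeted per-document partial updates are instead grouped and sent through the Bulk API, which keeps them document-specific (unlike update_by_query) while amortizing the request overhead.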
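As a rough sketch of that batching, the snippet below coalesces pending score changes per review and renders them as a newline-delimited Bulk API body made of `update` actions with partial `doc` payloads. The index name `reviews`, the use of `review_id` as the document `_id`, and the helper name are assumptions for illustration; the resulting string would be POSTed to the `_bulk` endpoint by whatever client the service uses.

```python
import json
from typing import Mapping


def build_bulk_body(index: str, latest_scores: Mapping[str, float]) -> str:
    """Render one update action + partial doc per review as NDJSON."""
    lines = []
    for review_id, score in latest_scores.items():
        lines.append(json.dumps({"update": {"_index": index, "_id": review_id}}))
        lines.append(json.dumps({"doc": {"helpfulness_score": score}}))
    # The Bulk API expects every line, including the last, to end with a newline
    return "".join(line + "\n" for line in lines)


# Coalesce vote-driven score changes: only the latest score per review is sent
pending: dict[str, float] = {}
for review_id, score in [("r1", 0.62), ("r2", 0.40), ("r1", 0.65)]:
    pending[review_id] = score

body = build_bulk_body("reviews", pending)
print(body, end="")
```

Coalescing before flushing matters for viral reviews: a review that collects hundreds of votes in one flush interval produces a single update instead of hundreds.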
The star distribution in the product_ratings table is stored as JSONB (a dictionary mapping star value to count) rather than five separate integer columns. This allows the histogram display on product pages to be read from a single row without any GROUP BY aggregation on the reviews table. The JSONB field is updated atomically via: UPDATE product_ratings SET star_dist = jsonb_set(star_dist, ARRAY[$rating::text], ((star_dist->>$rating::text)::int + 1)::text::jsonb) WHERE asin = $asin.
Key Algorithms and Protocols
Incremental Weighted Average Without Race Conditions
The most critical algorithm in the system is the incremental average update, which never reads before it writes. The challenge is that two Kafka consumers might try to increment the same ASIN’s aggregate concurrently. In Postgres, UPDATE ... SET sum = sum + $1 is serialized by row-level locking - only one writer can hold the lock at a time. In Redis, HINCRBYFLOAT is also atomic as a single-command operation. But the two stores cannot share a transaction, so a failure between the two writes must neither lose a review nor count it twice.
We achieve this by treating Redis as the write-ahead cache and Postgres as the durable store, with an idempotency mechanism in case one fails:
# Dual-write for rating aggregation
# Keeps Redis and Postgres consistent across retries
import redis
import psycopg2


def aggregate_rating_update(
    r: redis.Redis,
    conn: psycopg2.extensions.connection,
    asin: str,
    rating: int,
    is_verified: bool,
    review_id: str,
) -> None:
    """
    Applies one accepted review to Redis, then to Postgres.
    The two writes are not atomic across stores. A per-review progress marker
    in Redis lets a retry resume at the step that failed instead of
    double-counting or skipping the durable write.
    """
    idempotency_key = f"agg_done:{review_id}"
    state = r.get(idempotency_key)
    if state == b"done":
        return  # already applied to both stores

    # Step 1: Update Redis (fast path), unless an earlier attempt already did
    if state is None:
        pipe = r.pipeline(transaction=True)
        pipe.hincrbyfloat(f"rating:{asin}", "sum", float(rating))
        pipe.hincrby(f"rating:{asin}", "count", 1)
        if is_verified:
            pipe.hincrbyfloat(f"rating:{asin}", "verified_sum", float(rating))
            pipe.hincrby(f"rating:{asin}", "verified_count", 1)
        # Update star distribution
        pipe.hincrby(f"rating:{asin}", f"star_{rating}", 1)
        # Record progress in the same MULTI/EXEC (TTL = 7 days for delayed retries)
        pipe.set(idempotency_key, "redis", ex=604800)
        pipe.execute()

    # Step 2: Update Postgres (durable path)
    try:
        with conn.cursor() as cur:
            cur.execute("""
                INSERT INTO product_ratings (asin, rating_sum, rating_count, verified_sum, verified_count)
                VALUES (%s, %s, 1, %s, %s)
                ON CONFLICT (asin) DO UPDATE
                SET
                    rating_sum = product_ratings.rating_sum + EXCLUDED.rating_sum,
                    rating_count = product_ratings.rating_count + 1,
                    verified_sum = product_ratings.verified_sum + EXCLUDED.verified_sum,
                    verified_count = product_ratings.verified_count + EXCLUDED.verified_count,
                    updated_at = now()
            """, (
                asin,
                float(rating),
                float(rating) if is_verified else 0.0,
                1 if is_verified else 0,
            ))
        conn.commit()
    except Exception:
        conn.rollback()
        raise  # marker stays at "redis", so the retry skips Step 1

    # Step 3: Mark the review as fully applied.
    # A crash between the commit and this line would let a retry repeat Step 2;
    # closing that window needs an idempotency record inside Postgres itself.
    r.set(idempotency_key, "done", ex=604800)
Abuse Detection via Velocity Windows
Abuse detection uses a sliding window counter in Redis to detect unusual patterns. Unlike a fixed window (which resets abruptly and can be gamed by timing), a sliding window counts events in a rolling time range:
# Sliding window rate tracking for abuse detection signals
import time
import redis

def record_review_velocity(r: redis.Redis, asin: str, user_id: str) -> None:
    """
    Updates velocity tracking for both the product and the user.
    Uses a Redis sorted set with timestamp as score for sliding window.
    """
    now = int(time.time() * 1000)  # millisecond precision
    pipe = r.pipeline(transaction=False)
    # Product velocity: track all reviewers on this ASIN in a 1-hour window
    prod_vel_key = f"prod_velocity:{asin}"
    pipe.zadd(prod_vel_key, {f"{user_id}:{now}": now})
    pipe.zremrangebyscore(prod_vel_key, 0, now - 3_600_000)  # remove > 1h ago
    pipe.expire(prod_vel_key, 7200)
    # User velocity: keep 7 days of history, because get_velocity_signals
    # reads both a 24-hour and a 7-day count from this set
    user_vel_key = f"user_velocity:{user_id}"
    pipe.zadd(user_vel_key, {f"{asin}:{now}": now})
    pipe.zremrangebyscore(user_vel_key, 0, now - 604_800_000)  # remove > 7d ago
    pipe.expire(user_vel_key, 691200)  # 8 days
    pipe.execute()

def get_velocity_signals(r: redis.Redis, asin: str, user_id: str) -> dict:
    """
    Reads current velocity signals for fraud feature extraction.
    All reads are O(log N) ZCOUNT operations.
    """
    now = int(time.time() * 1000)
    pipe = r.pipeline(transaction=False)
    pipe.zcount(f"prod_velocity:{asin}", now - 3_600_000, now)
    pipe.zcount(f"user_velocity:{user_id}", now - 86_400_000, now)
    pipe.zcount(f"user_velocity:{user_id}", now - 604_800_000, now)
    results = pipe.execute()
    return {
        "product_reviews_1h": results[0],
        "user_reviews_24h": results[1],
        "user_reviews_7d": results[2],
    }
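To make the fixed-versus-sliding distinction concrete, here is a minimal in-memory sketch with no Redis involved. The function names and the burst data are hypothetical, chosen only for illustration. It counts the same burst of reviews both ways when the burst straddles an hour boundary.

```python
from bisect import bisect_left, bisect_right

WINDOW_MS = 3_600_000  # 1 hour, matching the product velocity window


def fixed_window_count(timestamps_ms, now_ms, window_ms=WINDOW_MS):
    """Count events in the fixed bucket that contains now_ms."""
    bucket_start = (now_ms // window_ms) * window_ms
    return sum(1 for t in timestamps_ms if bucket_start <= t <= now_ms)


def sliding_window_count(sorted_timestamps_ms, now_ms, window_ms=WINDOW_MS):
    """Count events in [now_ms - window_ms, now_ms], like an inclusive ZCOUNT."""
    lo = bisect_left(sorted_timestamps_ms, now_ms - window_ms)
    hi = bisect_right(sorted_timestamps_ms, now_ms)
    return hi - lo


# Hypothetical burst: 20 reviews, one per second, starting 10 seconds
# before an hour boundary and ending 9 seconds after it.
boundary = 10 * WINDOW_MS
burst = [boundary - 10_000 + i * 1_000 for i in range(20)]
now = burst[-1]

fixed = fixed_window_count(burst, now)
sliding = sliding_window_count(burst, now)
print(f"fixed window sees {fixed}, sliding window sees {sliding}")
```

The fixed bucket only sees the half of the burst that landed after the boundary, so a threshold of, say, 15 reviews per hour would not fire. The sliding count sees the whole burst.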
Scaling and Performance
Capacity estimation - building the back-of-envelope model that drives our sizing decisions:
Given:
- Peak write throughput: 500 reviews/second
- Average review size: 800 bytes (title + body + metadata)
- Read traffic: 50,000 requests/second for product pages
- Retention: 3 years active + 7 years archive
Storage (PostgreSQL, reviews table):
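Naive estimate: 500 req/s * 86400 s/day * 365 days/year * 3 years = ~47 billion reviews.
That applies the peak rate to every second of three years, so it overstates volume by roughly 100x. Recalibrating from the average rate: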
Amazon has ~600 million total reviews (cumulative, all time).
Average growth: ~150 million new reviews/year = ~4.7/second average.
Peak is 500/s during events - about 100x the average.
3 years of growth at 150M/year = 450M new reviews.
450M reviews * 1200 bytes per stored row (the 800-byte review plus an assumed ~400 bytes of row and column overhead) = ~540 GB raw review data.
With indexes and overhead: ~1.5 TB.
Storage (Elasticsearch reviews index):
450M documents * 1500 bytes = ~675 GB.
3x replication = ~2 TB.
Redis working set:
rating:{asin}: ~50 bytes per ASIN, 400M ASINs = 20 GB (too large - cache top 10M active)
dedup:{fp}: 32 bytes per fingerprint, 90-day window, 150M reviews/year * 90/365 = ~37M keys
37M keys * 50 bytes overhead = ~1.85 GB.
helpful:{asin}: 1000 review IDs per ASIN, 10M active ASINs, 8 bytes per ID = 80 GB.
Total Redis working set: ~100 GB across 6-shard cluster.
Bandwidth (Kafka):
500 reviews/s * 1500 bytes (with metadata) = 750 KB/s write to Kafka.
3x replication = 2.25 MB/s. Trivial for Kafka.
Compute (Fraud scoring):
500 reviews/s, each taking 50ms to score = 25 concurrent scorer threads needed.
With 8-core instances running 4 scorer threads per core: 1 instance handles ~32 concurrent requests.
Deploy 3 instances for safety margin.
API Gateway:
Write path: 500/s * 250ms avg latency = 125 concurrent requests. 2 medium instances.
Read path: 50,000/s * 5ms avg latency = 250 concurrent requests. 4 medium instances.
CDN absorbs ~80% of read traffic for popular ASINs, real origin load: ~10,000/s.
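The arithmetic above is easy to get wrong by a factor of ten, so it can help to keep it as a small script that is rerun whenever an input changes. The sketch below reproduces the figures from this estimate. The variable names are illustrative, and the concurrency numbers use Little's law (in-flight requests = arrival rate x latency).

```python
SECONDS_PER_YEAR = 365 * 86_400
GB = 1e9

# Inputs copied from the estimate above
NEW_REVIEWS_PER_YEAR = 150_000_000
RETENTION_YEARS = 3
PEAK_WRITES_PER_S = 500
PEAK_READS_PER_S = 50_000


def concurrent_requests(rate_per_s: int, latency_ms: int) -> float:
    """Little's law: in-flight requests = arrival rate * time in system."""
    return rate_per_s * latency_ms / 1000


retained = NEW_REVIEWS_PER_YEAR * RETENTION_YEARS  # 450M reviews
dedup_keys = NEW_REVIEWS_PER_YEAR * 90 / 365       # 90-day fingerprint window

estimate = {
    "avg_writes_per_s": NEW_REVIEWS_PER_YEAR / SECONDS_PER_YEAR,
    "postgres_raw_gb": retained * 1_200 / GB,
    "es_primary_gb": retained * 1_500 / GB,
    "dedup_gb": dedup_keys * 50 / GB,
    "helpful_gb": 10_000_000 * 1_000 * 8 / GB,
    "kafka_kb_per_s": PEAK_WRITES_PER_S * 1_500 / 1e3,
    "fraud_scorer_threads": concurrent_requests(PEAK_WRITES_PER_S, 50),
    "write_in_flight": concurrent_requests(PEAK_WRITES_PER_S, 250),
    "read_in_flight": concurrent_requests(PEAK_READS_PER_S, 5),
}

for name, value in estimate.items():
    print(f"{name:>22}: {value:,.2f}")
```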
The dominant bottleneck at scale is the Redis cluster handling the helpful:{asin} sorted sets for the top 10 million active products. A product with 100,000 reviews that receives 100 helpful votes per second generates 100 ZADD operations per second against a single Redis key. Each key lives on exactly one shard, and a Redis shard executes commands on a single thread, so a sufficiently hot ASIN concentrates its entire write load on one shard no matter how large the cluster is. The mitigation is to split the sorted set across multiple keys by hashing the review ID: helpful:{asin}:{shard_id} where shard_id = hash(review_id) % 16. Reads must merge results from all 16 sub-keys, but writes are distributed.
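A minimal sketch of the sub-key scheme, kept free of any Redis client so it runs on its own. The helper names are hypothetical. It assumes the braces in helpful:{asin}:{shard_id} are template placeholders: literal braces in a key name would act as a Redis Cluster hash tag and pin every sub-key to the same slot, defeating the split. It also assumes each sub-key is queried for its own top-k (for example with ZREVRANGE ... WITHSCORES) before merging.

```python
import heapq
import zlib

NUM_SUBKEYS = 16  # matches the 16-way split described above


def helpful_subkey(asin: str, review_id: str, num_subkeys: int = NUM_SUBKEYS) -> str:
    """Pick the sub-key that holds a review's helpfulness score.

    zlib.crc32 is used because Python's built-in hash() is salted per
    process for strings, so it would not give a stable assignment.
    """
    shard_id = zlib.crc32(review_id.encode("utf-8")) % num_subkeys
    return f"helpful:{asin}:{shard_id}"


def merge_top_k(per_subkey_results, k):
    """Merge per-sub-key (review_id, score) lists into a global top-k.

    Each input list must already be that sub-key's own top-k, so the
    global top-k is guaranteed to be contained in their union.
    """
    all_entries = (entry for result in per_subkey_results for entry in result)
    return heapq.nlargest(k, all_entries, key=lambda entry: entry[1])


# Usage with hand-written results standing in for three sub-key reads
key = helpful_subkey("B00EXAMPLE", "review-123")
merged = merge_top_k(
    [[("a", 0.9), ("b", 0.2)], [("c", 0.7)], []],
    k=2,
)
print(key, merged)
```

The cost of this design is read amplification: every ranked read fans out to 16 sub-keys, which is one reason to apply it only to ASINs that are actually hot.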
For Kafka partitioning, we use asin as the partition key (after hashing). This ensures all reviews for a given product flow through the same consumer instance, which simplifies the product-level velocity tracking - a consumer only needs to check its local state rather than coordinate with other consumers.
Amazon’s actual Elastic MapReduce pipelines run overnight to recompute fraud scores for the entire review corpus when the ML model is updated. This “batch re-scoring” ensures that historically accepted reviews get re-evaluated with the latest fraud signals - a review farm that was clever enough to evade an older model may be caught by a retrained one. Stripe uses the same pattern for fraud: real-time scoring on new events, plus periodic batch re-evaluation of historical decisions as the model improves.
Failure Modes and Recovery
| Failure | Detection | Impact | Recovery |
|---|---|---|---|
| Redis cluster shard failure | Redis Sentinel alerts, SETNX timeouts | Dedup checks fail; reviews may double-accept | Failover to replica (auto within 30s); backfill dedup set from Postgres fingerprint column on recovery |
| Kafka broker outage | Consumer lag metric exceeds 10,000 msgs | Review submissions queue in API Gateway memory (bounded); no data loss if within 30s buffer | Kafka auto-elects new leader for affected partitions; consumer resumes from last committed offset |
| Fraud model service down | Health check misses; CPU drops to baseline | All reviews pass fraud gate automatically (fail-open) | Restart service; queue will replay unscored events; batch re-scoring job corrects false-pass decisions |
| PostgreSQL primary failure | Replication lag drops to 0; health check fails | Writes to product_ratings fail; Redis still serves reads | Promote replica (RDS Multi-AZ auto-promotes in ~60s); aggregate updates replay from Kafka backlog |
| Elasticsearch index corruption | Query error rate spikes; slow log entries | Search and sorted review display unavailable | Re-index from PostgreSQL (estimated 4 hours for 450M documents at 30,000 docs/s Bulk API) |
| Rating aggregate drift (Redis vs Postgres) | Hourly reconciliation job detects delta > 0.01 | Display rating slightly wrong | Reconciliation job overwrites Redis from Postgres for affected ASINs; root cause is typically a Redis key eviction under memory pressure |
The most common operational mistake with incremental aggregates is forgetting to handle soft-deletes. If a moderator removes a fraudulent 5-star review after the aggregator has already counted it, you must subtract its contribution from product_ratings.rating_sum AND rating_count. Systems that only append never subtract, so after 6 months of moderation activity the displayed average can drift by 0.2-0.5 stars for affected products. Always implement a removal path that mirrors the addition path exactly.
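One way to guarantee that the removal path mirrors the addition path is to route both through a single function that takes a sign. The sketch below shows the idea on an in-memory aggregate. The RatingAggregate class is hypothetical and stands in for the product_ratings row and the Redis hash, which would need the same treatment (negative increments in Redis, subtraction in the SQL update).

```python
from dataclasses import dataclass, field


@dataclass
class RatingAggregate:
    rating_sum: float = 0.0
    rating_count: int = 0
    verified_sum: float = 0.0
    verified_count: int = 0
    star_counts: dict = field(default_factory=lambda: {s: 0 for s in range(1, 6)})

    def _apply(self, rating: int, is_verified: bool, sign: int) -> None:
        # Every field touched on add is touched here, so add and remove
        # cannot drift apart when a new field is introduced.
        self.rating_sum += sign * float(rating)
        self.rating_count += sign
        if is_verified:
            self.verified_sum += sign * float(rating)
            self.verified_count += sign
        self.star_counts[rating] += sign

    def add_review(self, rating: int, is_verified: bool) -> None:
        self._apply(rating, is_verified, +1)

    def remove_review(self, rating: int, is_verified: bool) -> None:
        self._apply(rating, is_verified, -1)

    def average(self):
        return self.rating_sum / self.rating_count if self.rating_count else None


agg = RatingAggregate()
agg.add_review(5, is_verified=True)
agg.add_review(4, is_verified=False)
agg.add_review(5, is_verified=True)     # later judged fraudulent
agg.remove_review(5, is_verified=True)  # moderator removal
print(agg.average(), agg.star_counts)
```

The removal needs the original rating and verified flag, so those values have to be read from the stored review record rather than from the moderation request.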
Comparison of Approaches
| Approach | Write Latency | Read Latency | Fraud Detection | Scalability | Best Fit |
|---|---|---|---|---|---|
| Synchronous SQL pipeline (all in one transaction) | 100-500ms | 10-50ms | Simple rules only | Low (DB bottleneck) | Internal tools with low volume |
| Async Kafka pipeline (this design) | 50-200ms (ACK before score) | 5-20ms (Redis cache) | Full ML scoring | High (partition scaling) | High-throughput consumer marketplace |
| Lambda-per-event (serverless) | 200-1000ms (cold starts) | 10-50ms | ML scoring viable | Medium (concurrency limits) | Spiky, low-baseline workloads |
| Batch aggregation (nightly MapReduce) | Immediate write, stale display | 5ms (precomputed) | Full ML + graph | Very high | Analytics-only, no real-time requirement |
| CQRS with event sourcing | 50-150ms | 5-15ms | Full ML scoring | High | Audit-heavy systems needing temporal queries |
The async Kafka pipeline is the right choice here because the latency contract allows it - customers do not expect their review to appear instantly (they accept “your review is being processed”), and the fraud detection quality available with async ML scoring is far superior to what is achievable synchronously. Lambda-per-event looks attractive initially, but Lambda's concurrency quotas and rate-limited scale-out beyond the initial burst make it a risky fit for Prime Day spikes where traffic can 10x in 30 seconds. The batch approach violates the 5-second visibility SLA entirely.
Key Takeaways
- Deduplication fingerprinting should operate on normalized content rather than raw text, and use Redis SETNX for O(1) dedup checks that do not require reading existing data.
- Fraud signal features are most powerful when combined: account age alone is weak, but account age plus rating variance plus product velocity together create a feature space that is hard for farms to game simultaneously.
- Weighted average calculation should be maintained incrementally via (sum, count) pairs rather than recomputed via SELECT AVG(), reducing write cost from O(n) to O(1) per review.
- Helpful vote ranking using the Wilson score lower bound produces a statistically defensible ordering that resists manipulation and naturally handles the cold-start problem for new reviews.
- Review pipeline stages should be independently scalable - the fraud detection consumers can scale to 32 instances without affecting the dedup service or the read path.
- Abuse detection requires sliding window counters (not fixed windows) to accurately capture burst behavior without gap exploitation at window boundaries.
- Review indexing in Elasticsearch should use a 5-second refresh interval to balance real-time visibility against indexing overhead: every refresh writes a new segment, so much shorter intervals produce many small segments and extra merge work under heavy write load.
- Soft-delete handling must mirror the add path exactly: any aggregated quantity that was incremented on review creation must be decremented on review removal, or your averages will drift over time.
The counter-intuitive lesson in this design is that the hardest problem is not fraud detection - it is keeping the aggregate correct under all possible failure and race conditions. The fraud ML model is sophisticated but bounded: it either accepts or rejects a review, and you can always batch re-evaluate. But a displayed average that drifts by 0.3 stars for a top-selling product, because rating_sum or rating_count has silently gone wrong, can shift purchase decisions for millions of customers and is extraordinarily difficult to detect, diagnose, and correct after the fact.
Frequently Asked Questions
Q: Why not use a single Redis sorted set per ASIN for helpful vote ranking instead of a separate Redis key plus Elasticsearch?
A: Redis sorted sets handle ranking beautifully for top-50 access patterns, and we do use them for the product page hot path. But Elasticsearch is essential for two use cases: full-text search within a product’s reviews (“show me reviews mentioning battery life”) and faceted filtering (star rating, verified purchase, date range). Core Redis has no full-text search capability. The sorted set in Redis is a cache of the Elasticsearch ordering - it avoids hitting Elasticsearch for every product page load while Elasticsearch handles the rich query use cases.
Q: Why not store the fraud score synchronously and reject inline rather than in a Kafka consumer?
A: Feature extraction requires multiple Redis lookups (account history, product velocity, historical signals) and an ML model inference call, all of which add 30-200ms. Adding this to the synchronous request path would push p99 write latency to 400-600ms, breaking the sub-250ms SLA. The tradeoff is that a fraudulent review is live for up to 500ms before being removed - acceptable for a review system (unlike, say, a financial transaction where milliseconds matter). The fail-open behavior during model outages is the bigger concern; it’s mitigated by the batch re-scoring job.
Q: Why not use Kafka Streams or Flink instead of a custom consumer for the aggregation?
A: Kafka Streams would work well for the fraud detection and rating aggregation stages - it has built-in windowed aggregation, exactly-once semantics, and changelog topics for state recovery. We avoided it here to keep the operational complexity manageable: Kafka Streams is a library that runs inside the application, but it brings local state stores, changelog and repartition topics, and rebalance behavior that the team must learn to operate and tune. The custom consumer with Redis + Postgres achieves the same outcome with more obvious failure modes. At hyperscale (10,000 reviews/second), migrating to Flink’s stateful streaming would be the right call.
Q: Why not use PostgreSQL full-text search (tsvector) instead of Elasticsearch for review search?
A: PostgreSQL full-text search works well at lower scale but becomes a bottleneck when search traffic is 50,000 requests/second and the corpus is 450 million documents. The @@ tsvector operator requires an index scan that still touches millions of rows for common search terms. Elasticsearch’s inverted index with per-shard distribution handles this natively and allows horizontal scaling via adding shards. PostgreSQL’s tsvector index is a good solution up to tens of millions of rows and a few hundred queries/second - beyond that, the dedicated search engine wins.
Q: How do you handle the star distribution histogram update atomically?
A: We use PostgreSQL’s jsonb_set function to atomically update the star count inside the JSONB column in a single SQL statement without a read-modify-write cycle: UPDATE product_ratings SET star_dist = jsonb_set(star_dist, ARRAY[$1::text], to_jsonb(COALESCE((star_dist->>$1::text)::int, 0) + 1)) WHERE asin = $2. The COALESCE matters because jsonb_set returns NULL when any argument is NULL, so a star key that is missing from the document would otherwise null out the whole column. This is a single row write protected by row-level locking. The Redis equivalent maintains five separate HINCRBY fields (star_1 through star_5) that are assembled into the histogram by the read path.
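On the Redis side, the read path has to turn the flat hash fields into display values. A minimal sketch of that assembly step follows, assuming the hash is fetched with HGETALL on a client configured with decode_responses=True so that field names and values arrive as strings. The function name build_rating_summary is hypothetical.

```python
def build_rating_summary(fields: dict) -> dict:
    """Turn the rating:{asin} hash fields into a histogram and an average.

    Missing star fields are treated as zero, because HINCRBY only creates
    a field the first time that star value is seen.
    """
    def as_int(name: str) -> int:
        return int(fields.get(name, 0))

    histogram = {star: as_int(f"star_{star}") for star in range(1, 6)}
    count = as_int("count")
    total = float(fields.get("sum", 0.0))
    # Guard against division by zero for products with no reviews yet
    average = round(total / count, 2) if count else None
    return {"histogram": histogram, "count": count, "average": average}


# Usage with a hand-written dict standing in for an HGETALL result
summary = build_rating_summary(
    {"sum": "13", "count": "3", "star_5": "2", "star_3": "1"}
)
print(summary)
```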
Q: Why not use eventual consistency for the dedup check rather than synchronous Redis?
A: Eventual consistency for deduplication is the wrong tradeoff here. If we accept a duplicate while the dedup state propagates, we will show customers a duplicate review on the product page - a visible quality defect. The Redis SETNX check adds under 5ms to the write path and provides strong single-key consistency. The only case where this fails is if the Redis cluster is completely unavailable, at which point we fail-open (accept submissions without dedup checking) and rely on the downstream idempotent Postgres insert to catch duplicates. At the ~5 submissions/second average rate, a 5-minute Redis outage lets roughly 1,500 submissions through without a dedup check, so even if every one were a duplicate the moderation workload stays manageable. The same outage during a 500/second peak would expose about 150,000 submissions and lean much harder on the Postgres gate.
Interview Questions
Q: Walk me through how you would design the deduplication system to handle the case where the same customer submits a review from both a mobile app and a desktop browser simultaneously, with unreliable network causing each to retry three times.
Expected depth: Discuss idempotency keys, the race condition between concurrent SETNX operations across multiple API servers, how the Postgres ON CONFLICT DO NOTHING on the fingerprint column acts as the final dedup gate, and why optimistic dedup (accept first, deduplicate later) is preferable to pessimistic locking for latency-sensitive write paths. Mention that the worst-case outcome is two Kafka messages for the same review, which the consumer handles via the idempotency key pattern.
Q: How would you update the fraud model for all 450 million historical reviews when the ML team deploys a new version?
Expected depth: The candidate should describe a batch re-scoring job using Spark or Flink that reads from S3 (raw event archive), re-extracts features from the feature store (using historical snapshots), runs the new model, and writes decisions back to the fraud_signals table and Postgres reviews.fraud_score. Key challenges: the feature store may not have historical snapshots of account-level signals at review time, requiring temporal joins. Discuss the tradeoff between re-scoring all 450M (4-8 hour job) vs. only recent reviews (faster, misses historical fakes). A good answer mentions shadow scoring the new model before deploying it.
Q: The product team wants the review average on product pages to update within 1 second instead of 5 seconds. How do you achieve this without changing the database architecture?
Expected depth: The current 5-second lag comes from Elasticsearch’s refresh_interval = 5s plus Kafka consumer processing time. For the average star rating (not search), the fix is to update the Redis cache synchronously when the Kafka consumer processes the review (which takes under 500ms end-to-end). Product pages should read the average directly from Redis HGET rating:{asin} sum / HGET rating:{asin} count rather than from Elasticsearch. The candidate should distinguish between the display average (Redis, near-real-time) and search index freshness (Elasticsearch, 5s interval is fine for search).
Q: A seller approaches Amazon with evidence that a competitor has been flooding their product with 1-star reviews from fake accounts over the past 6 months. How would you retroactively identify and remove those reviews, and how would you re-compute the product’s corrected average?
Expected depth: Describe the investigation query: find all 1-star reviews on the ASIN in the time window, join with fraud_signals to identify those with fraud_score above a threshold, then look for coordinated account behavior (same IP ranges, same account creation batch dates) using the features JSONB column. Removal requires: soft-deleting the review records, subtracting their contributions from product_ratings in a single transaction, deleting their entries from Redis sorted sets, and triggering a bulk delete in Elasticsearch by review_id. Mention the audit trail - every deleted review should write a row to a moderation_actions table with the reason and moderator ID.
Q: How do you prevent the Redis sorted set for helpful vote ranking from becoming unbounded in size for a product that accumulates 10 million reviews over 10 years?
Expected depth: The ZREMRANGEBYRANK trim (keeping top-1000) is the first answer - product pages only show 50, so caching 1,000 gives buffer for filtered views. The candidate should go further: discuss why we need the full ranked list (some customers sort by “most critical” or filter by star rating), which means the sorted set cannot be arbitrarily truncated. The real solution is tiered storage: keep the top-10,000 in Redis sorted set, store the full ranking in DynamoDB keyed by (asin, sort_key), and fall back to DynamoDB for deeper pagination. This caps Redis memory while preserving full ranking for power users.