Replaying Events Safely From a Dead-Letter Queue

A dead-letter queue is only half a recovery story. Capturing failed webhooks is straightforward; the dangerous part is putting them back into flight. Replay touches a live system, so a careless drain can double-charge customers, deliver events out of order, or knock over the very endpoint that just recovered. This guide, part of Dead-Letter Queue Architecture, covers a replay procedure that preserves ordering, stays idempotent, throttles itself, and isolates poison-pill messages. It assumes you already have a DLQ in place; if not, start with building a dead-letter queue for failed webhooks and return here for the drain.

The core principle: replay is a controlled, observable, repeatable operation, not a one-click “retry all” button. Treat each replay run like a deployment — snapshot the input, throttle the rollout, and have a defined failure path.

Figure: DLQ replay pipeline. Messages flow from the DLQ through a snapshot, an idempotency gate and a rate limiter to the consumer, while repeated failures divert to a quarantine queue.
The replay path: snapshot the batch, gate each message on idempotency and rate, deliver to the consumer, and divert repeat failures to quarantine.

Prerequisites

Step 1: Snapshot the batch before draining

Never replay directly off the live DLQ. Copy the messages you intend to replay into an immutable snapshot (a table row, an S3 object, a Redis list with a run ID). This makes a replay run repeatable, auditable, and safe to abort midway:

import json, time, uuid

def snapshot_replay_batch(dlq, store, limit=1000) -> str:
    run_id = f"replay-{uuid.uuid4().hex[:12]}"
    messages = dlq.peek(limit=limit)          # non-destructive read
    store.put(run_id, {
        "created_at": time.time(),
        "count": len(messages),
        "status": "pending",
        "messages": [json.loads(m.body) for m in messages],
    })
    return run_id

If a replay goes wrong, you have the exact input to investigate, and you can re-run the same run_id rather than re-scraping a DLQ whose contents have shifted.
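As a rough sketch of what "immutable" can mean here, the store below is a hypothetical in-memory stand-in (InMemorySnapshotStore and load_replay_batch are illustrative names, not part of any library). It rejects a second write to the same run_id and hands out deep copies, so bookkeeping fields added during a run, such as replay_attempts, never leak back into the stored input:

```python
import copy


class InMemorySnapshotStore:
    """Hypothetical write-once store; a real one might be a table or an S3 prefix."""

    def __init__(self) -> None:
        self._runs: dict[str, dict] = {}

    def put(self, run_id: str, record: dict) -> None:
        # Refuse to overwrite: a snapshot is the fixed input of one replay run.
        if run_id in self._runs:
            raise ValueError(f"snapshot {run_id!r} already exists")
        self._runs[run_id] = copy.deepcopy(record)

    def get(self, run_id: str) -> dict:
        # Return a copy so callers cannot mutate the stored snapshot.
        return copy.deepcopy(self._runs[run_id])


def load_replay_batch(store: InMemorySnapshotStore, run_id: str) -> list[dict]:
    """Fetch the messages of an existing run so it can be replayed again."""
    return store.get(run_id)["messages"]
```

Run status and per-message progress would then live in a separate, mutable record keyed by the same run_id.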

Step 2: Restore ordering by partition key

Webhooks for the same entity must be replayed in their original order, or an updated event can be applied before the created event it depends on, and a stale update can overwrite a newer one. Group the snapshot by ordering key, sort each group by original sequence, and replay groups independently (different entities can go in parallel; messages within one entity must be serial). This mirrors the discipline in implementing strict ordering for financial webhooks:

from collections import defaultdict

def order_batch(messages: list[dict]) -> dict[str, list[dict]]:
    groups: dict[str, list[dict]] = defaultdict(list)
    for m in messages:
        groups[m["ordering_key"]].append(m)
    for key in groups:
        groups[key].sort(key=lambda m: m["sequence"])  # original order
    return groups

Replay each group’s messages strictly in sequence; do not advance to the next message in a group until the previous one succeeds or is quarantined.
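One way to express "serial within a group, parallel across groups" is a thread pool with one task per ordering key. This is a sketch under assumptions: replay_one is any callable that returns an outcome string such as "delivered", "skipped_duplicate", "quarantined", or "failed", and replay_group and replay_all are illustrative names:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def replay_group(messages: list, replay_one: Callable[[object], str]) -> list[str]:
    """Replay one ordering group serially, halting at the first unresolved failure."""
    outcomes = []
    for msg in messages:
        outcome = replay_one(msg)
        outcomes.append(outcome)
        if outcome == "failed":
            # Later messages must not overtake a message that is still failing.
            break
    return outcomes


def replay_all(groups: dict, replay_one: Callable[[object], str],
               max_workers: int = 4) -> dict:
    """Run groups concurrently; each group stays strictly ordered internally."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {key: pool.submit(replay_group, msgs, replay_one)
                   for key, msgs in groups.items()}
        return {key: future.result() for key, future in futures.items()}
```

If groups do run in threads, anything they share, including a rate limiter, needs to be thread-safe or guarded by a lock.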

Step 3: Enforce idempotency on every replayed message

A replayed event will often have already been partially processed before it failed. The consumer’s idempotency check is the safety net, but the replayer should also short-circuit messages whose idempotency key is already recorded as applied, to avoid wasted round trips:

import redis

class IdempotencyGate:
    def __init__(self, client: redis.Redis, ttl: int = 7 * 24 * 3600):
        self.r = client
        self.ttl = ttl

    def claim(self, idem_key: str) -> bool:
        # True if this is the first time we deliver this key during replay.
        return bool(self.r.set(f"replay:idem:{idem_key}", "1", nx=True, ex=self.ttl))

Always send the idempotency key in the replayed request header so the consumer makes the final, authoritative dedup decision. The replayer’s gate is an optimization; the consumer’s check is the guarantee.
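A minimal sketch of attaching the key to the outgoing request, using only the standard library. The header name Idempotency-Key and the payload field on the message are assumptions for illustration; use whatever header your consumer actually reads:

```python
import json
import urllib.request


def build_replay_request(url: str, msg: dict) -> urllib.request.Request:
    """Build (but do not send) the POST for one replayed message."""
    body = json.dumps(msg["payload"]).encode("utf-8")  # "payload" is an assumed field
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            # Assumed header name; the consumer dedups on this value.
            "Idempotency-Key": msg["idempotency_key"],
        },
    )
```

Passing the returned request to urllib.request.urlopen with a timeout would perform the delivery; keeping construction separate makes the header easy to assert on in tests.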

Step 4: Rate-limit the drain

A DLQ can hold hours of accumulated traffic. Replaying it at full speed is a self-inflicted load test against an endpoint that just came back. Throttle with a token bucket so the drain rate stays well under the endpoint’s healthy capacity, and coordinate with backpressure on webhook consumers:

import time

class TokenBucket:
    def __init__(self, rate_per_sec: float, capacity: float):
        self.rate = rate_per_sec
        self.capacity = capacity
        self.tokens = capacity
        self.updated = time.monotonic()

    def take(self) -> None:
        while True:
            now = time.monotonic()
            self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
            self.updated = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            time.sleep((1 - self.tokens) / self.rate)

Call bucket.take() before each delivery. Start conservative (e.g. 10–20% of normal throughput) and ramp only after the endpoint’s error rate stays flat.
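The ramp itself can be a small pure function that is evaluated once per observation window. The sketch below is one possible policy, not a prescribed one: the name next_rate, the 1.25x step, the halving on regression, and the tolerance are all illustrative assumptions to tune for your endpoint:

```python
def next_rate(current: float, error_rate: float, baseline_error_rate: float,
              ceiling: float, step: float = 1.25, tolerance: float = 0.01) -> float:
    """Return the drain rate (messages/sec) to use for the next window."""
    if error_rate > baseline_error_rate + tolerance:
        # Errors rose above the baseline: halve the rate, but never drop below 1/sec.
        return max(current / 2, 1.0)
    # Error rate stayed flat: ramp gently, capped at the configured ceiling.
    return min(current * step, ceiling)
```

Between windows, assign the result to the bucket, for example bucket.rate = next_rate(bucket.rate, observed, baseline, ceiling), where observed and baseline come from your own metrics.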

Step 5: Quarantine poison pills

Some messages will never succeed — malformed payloads, references to deleted resources, schema the consumer can no longer parse. Without a cap they loop forever and block their ordering group. Bound replay attempts and divert exhausted messages to a separate quarantine queue for human review, never back to the DLQ:

def replay_message(msg, deliver, gate, bucket, quarantine, max_attempts=3) -> str:
    if not gate.claim(msg["idempotency_key"]):
        return "skipped_duplicate"
    attempts = msg.get("replay_attempts", 0)
    if attempts >= max_attempts:
        quarantine.put(msg, reason="max_replay_attempts")
        return "quarantined"
    bucket.take()
    try:
        deliver(msg)
        return "delivered"
    except Exception as exc:
        # Release the claim; otherwise the next attempt is skipped as a duplicate
        # and the message is silently lost instead of retried or quarantined.
        gate.r.delete(f"replay:idem:{msg['idempotency_key']}")
        msg["replay_attempts"] = attempts + 1
        msg["last_error"] = str(exc)
        if msg["replay_attempts"] >= max_attempts:
            quarantine.put(msg, reason="replay_failed")
            return "quarantined"
        return "failed"

Verification

Unit-test the dangerous invariants: ordering preserved, duplicates suppressed, poison pills quarantined.

import fakeredis

class InMemoryQueue:
    """Minimal quarantine stand-in for tests."""
    def __init__(self):
        self.items = []

    def put(self, msg, reason):
        self.items.append((msg, reason))

def test_replay_dedups_and_orders():
    delivered = []
    gate = IdempotencyGate(fakeredis.FakeStrictRedis())
    bucket = TokenBucket(rate_per_sec=1000, capacity=1000)
    q = InMemoryQueue()

    msgs = [
        {"ordering_key": "acct-1", "sequence": 2, "idempotency_key": "b"},
        {"ordering_key": "acct-1", "sequence": 1, "idempotency_key": "a"},
        {"ordering_key": "acct-1", "sequence": 1, "idempotency_key": "a"},  # dup
    ]
    for group in order_batch(msgs).values():
        for m in group:
            replay_message(m, lambda x: delivered.append(x["idempotency_key"]),
                           gate, bucket, q)
    assert delivered == ["a", "b"]   # ordered, duplicate suppressed
    assert q.items == []             # nothing failed, so nothing quarantined

For a live drain, watch the depth and error rate together:

# SQS: confirm DLQ depth falls and the target queue is not flooded.
aws sqs get-queue-attributes --queue-url "$DLQ_URL" \
  --attribute-names ApproximateNumberOfMessages

# Tail consumer logs for duplicate-idempotency-key hits (should trend to zero).
kubectl logs -l app=webhook-consumer --since=5m | grep -c 'duplicate_idempotency_key'

A healthy replay shows DLQ depth decreasing monotonically, the quarantine queue growing only slowly, and no rise in downstream 5xx rates.
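Those three signals can be turned into an automated guard that pauses the drain when any of them regresses. The function below is a sketch; its name, the sample lists, and the thresholds are hypothetical and would be fed from your own metrics. Because SQS reports an approximate depth, a production check may want to allow a little upward jitter rather than strict monotonicity:

```python
def drain_looks_healthy(depth_samples: list[int], quarantine_samples: list[int],
                        error_rates: list[float], baseline_error_rate: float,
                        max_quarantine_growth: int = 5,
                        tolerance: float = 0.01) -> bool:
    """Check consecutive samples taken at a fixed interval during a drain."""
    # DLQ depth should never increase from one sample to the next.
    depth_ok = all(later <= earlier
                   for earlier, later in zip(depth_samples, depth_samples[1:]))
    # Quarantine may grow, but only by a few messages per interval.
    quarantine_ok = all(later - earlier <= max_quarantine_growth
                        for earlier, later in zip(quarantine_samples,
                                                  quarantine_samples[1:]))
    # Downstream 5xx rate should stay near its pre-replay baseline.
    errors_ok = all(rate <= baseline_error_rate + tolerance for rate in error_rates)
    return depth_ok and quarantine_ok and errors_ok
```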

Failure modes and gotchas