Replaying Events Safely From a Dead-Letter Queue
A dead-letter queue is only half a recovery story. Capturing failed webhooks is straightforward; the dangerous part is putting them back into flight. Replay touches a live system, so a careless drain can double-charge customers, deliver events out of order, or knock over the very endpoint that just recovered. This guide, part of Dead-Letter Queue Architecture, covers a replay procedure that preserves ordering, stays idempotent, throttles itself, and isolates poison-pill messages. It assumes you already have a DLQ in place; if not, start with building a dead-letter queue for failed webhooks and return here for the drain.
The core principle: replay is a controlled, observable, repeatable operation, not a one-click “retry all” button. Treat each replay run like a deployment — snapshot the input, throttle the rollout, and have a defined failure path.
Prerequisites
- A DLQ you can read from non-destructively (peek/copy before delete). On SQS use a dedicated redrive consumer; on Redis Streams use a consumer group with explicit XACK.
- An idempotency key on every message, ideally the same key the original delivery carried. Replay relies on the consumer’s dedup store; see how to design idempotent webhook consumers.
- An ordering key (e.g. account ID, aggregate ID) and a recorded original sequence/timestamp per message.
- A retry/attempt counter persisted with each DLQ message so you can detect poison pills.
- Confirmation the downstream cause is fixed. Replaying into a still-broken endpoint just refills the DLQ.
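As a rough illustration of what the prerequisites imply for the message shape, here is a minimal sketch of an envelope. The class name `DlqMessage`, the `payload` field, and the `missing_fields` helper are assumptions made for this example, not part of any queue's API.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class DlqMessage:
    """Hypothetical envelope; field names are illustrative, not a standard."""
    idempotency_key: str              # same key the original delivery carried
    ordering_key: str                 # e.g. account ID or aggregate ID
    sequence: int                     # original sequence number or timestamp
    payload: dict[str, Any]           # assumed name for the webhook body
    replay_attempts: int = 0          # persisted so poison pills can be detected
    last_error: Optional[str] = None


REQUIRED = ("idempotency_key", "ordering_key", "sequence")


def missing_fields(raw: dict) -> list[str]:
    """Return the prerequisite fields that a raw DLQ record lacks."""
    return [name for name in REQUIRED if raw.get(name) is None]
```

Running `missing_fields` over a sample of the DLQ before a drain is one cheap way to find out whether older messages predate the ordering or idempotency metadata.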
Step 1: Snapshot the batch before draining
Never replay directly off the live DLQ. Copy the messages you intend to replay into an immutable snapshot (a table row, an S3 object, a Redis list with a run ID). This makes a replay run repeatable, auditable, and safe to abort midway:
import json, time, uuid

def snapshot_replay_batch(dlq, store, limit=1000) -> str:
    run_id = f"replay-{uuid.uuid4().hex[:12]}"
    messages = dlq.peek(limit=limit)  # non-destructive read
    store.put(run_id, {
        "created_at": time.time(),
        "count": len(messages),
        "status": "pending",
        "messages": [json.loads(m.body) for m in messages],
    })
    return run_id
If a replay goes wrong, you have the exact input to investigate, and you can re-run the same run_id rather than re-scraping a DLQ whose contents have shifted.
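To make the "immutable snapshot" idea concrete, the following is a minimal in-memory sketch of a store with put-once semantics. `InMemorySnapshotStore` and its `get` method are hypothetical stand-ins for whatever table or object store backs the `store` argument; a real implementation would persist to durable storage.

```python
import copy


class InMemorySnapshotStore:
    """Hypothetical stand-in for the snapshot store; each run_id is written once."""

    def __init__(self):
        self._runs = {}

    def put(self, run_id: str, snapshot: dict) -> None:
        # Refuse overwrites so a run's input can never change after capture.
        if run_id in self._runs:
            raise KeyError(f"snapshot {run_id} already exists")
        self._runs[run_id] = copy.deepcopy(snapshot)

    def get(self, run_id: str) -> dict:
        # Hand back a copy so a replay run cannot mutate the stored input.
        return copy.deepcopy(self._runs[run_id])
```

Because `get` returns a copy, a replayer can annotate messages with attempt counts during a run without altering the recorded input.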
Step 2: Restore ordering by partition key
Webhooks for the same entity must be replayed in their original order, or a stale created event replayed late can clobber the state written by a newer updated event. Group the snapshot by ordering key, sort each group by original sequence, and replay groups independently (different entities can go in parallel; messages within one entity must be serial). This mirrors the discipline in implementing strict ordering for financial webhooks:
from collections import defaultdict

def order_batch(messages: list[dict]) -> dict[str, list[dict]]:
    groups: dict[str, list[dict]] = defaultdict(list)
    for m in messages:
        groups[m["ordering_key"]].append(m)
    for key in groups:
        groups[key].sort(key=lambda m: m["sequence"])  # original order
    return groups
Replay each group’s messages strictly in sequence; do not advance to the next message in a group until the previous one succeeds or is quarantined.
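One way to get parallelism across entities while keeping each entity serial is to hand every group to a single worker. The sketch below assumes a caller-supplied `replay_one` callable that returns a status string such as "delivered", "quarantined", or "failed"; the function name `replay_groups` and the `max_workers` default are illustrative.

```python
from concurrent.futures import ThreadPoolExecutor


def replay_groups(groups, replay_one, max_workers=4):
    """groups maps ordering_key -> messages already sorted by sequence."""

    def drain(messages):
        results = []
        for m in messages:
            status = replay_one(m)
            results.append(status)
            if status == "failed":
                break  # do not advance past an unresolved message
        return results

    # One task per group: messages inside a group never run concurrently.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {key: pool.submit(drain, msgs) for key, msgs in groups.items()}
        return {key: fut.result() for key, fut in futures.items()}
```

Anything shared between workers, such as a rate limiter, would need to be thread-safe under this design.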
Step 3: Enforce idempotency on every replayed message
A replayed event will often have already been partially processed before it failed. The consumer’s idempotency check is the safety net, but the replayer should also short-circuit messages whose idempotency key is already recorded as applied, to avoid wasted round trips:
import redis

class IdempotencyGate:
    def __init__(self, client: redis.Redis, ttl: int = 7 * 24 * 3600):
        self.r = client
        self.ttl = ttl

    def claim(self, idem_key: str) -> bool:
        # True if this is the first time we deliver this key during replay.
        return bool(self.r.set(f"replay:idem:{idem_key}", "1", nx=True, ex=self.ttl))
Always send the idempotency key in the replayed request header so the consumer makes the final, authoritative dedup decision. The replayer’s gate is an optimization; the consumer’s check is the guarantee.
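A minimal sketch of attaching the key to the outgoing request, using only the standard library. The header name `Idempotency-Key` and the `payload` field are assumptions; use whatever header the original delivery path and the consumer already agree on.

```python
import json
import urllib.request


def build_replay_request(url: str, msg: dict) -> urllib.request.Request:
    """Build (but do not send) a POST that carries the original idempotency key."""
    body = json.dumps(msg["payload"]).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            # Assumed header name; must match what the consumer dedups on.
            "Idempotency-Key": msg["idempotency_key"],
        },
    )
```

Note that `urllib.request.Request` normalizes the capitalization of stored header names, which is harmless because HTTP header names are case-insensitive.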
Step 4: Rate-limit the drain
A DLQ can hold hours of accumulated traffic. Replaying it at full speed is a self-inflicted load test against an endpoint that just came back. Throttle with a token bucket so the drain rate stays well under the endpoint’s healthy capacity, and coordinate with backpressure on webhook consumers:
import time

class TokenBucket:
    def __init__(self, rate_per_sec: float, capacity: float):
        self.rate = rate_per_sec
        self.capacity = capacity
        self.tokens = capacity
        self.updated = time.monotonic()

    def take(self) -> None:
        while True:
            now = time.monotonic()
            self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
            self.updated = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            time.sleep((1 - self.tokens) / self.rate)
Call bucket.take() before each delivery. Start conservative (e.g. 10–20% of normal throughput) and ramp only after the endpoint’s error rate stays flat.
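The ramp itself can be expressed as a small pure function. This is a sketch under assumed parameters: the 1.25x step, the halving on regression, and the 1% tolerance are illustrative numbers, not recommendations from any particular system.

```python
def next_rate(current: float, error_rate: float, baseline_error_rate: float,
              ceiling: float, step: float = 1.25, tolerance: float = 0.01) -> float:
    """Return the drain rate (messages/sec) for the next observation window."""
    if error_rate > baseline_error_rate + tolerance:
        # Errors rose above baseline: back off hard, but keep a trickle going.
        return max(1.0, current / 2)
    # Errors stayed flat: ramp gently, never past the configured ceiling.
    return min(ceiling, current * step)
```

After each observation window the replayer could assign the result to the bucket's `rate` attribute, for example `bucket.rate = next_rate(bucket.rate, observed, baseline, ceiling)`.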
Step 5: Quarantine poison pills
Some messages will never succeed — malformed payloads, references to deleted resources, schema the consumer can no longer parse. Without a cap they loop forever and block their ordering group. Bound replay attempts and divert exhausted messages to a separate quarantine queue for human review, never back to the DLQ:
def replay_message(msg, deliver, gate, bucket, quarantine, max_attempts=3) -> str:
    if not gate.claim(msg["idempotency_key"]):
        return "skipped_duplicate"
    attempts = msg.get("replay_attempts", 0)
    if attempts >= max_attempts:
        quarantine.put(msg, reason="max_replay_attempts")
        return "quarantined"
    bucket.take()  # throttle before every delivery
    try:
        deliver(msg)
        return "delivered"
    except Exception as exc:
        msg["replay_attempts"] = attempts + 1
        msg["last_error"] = str(exc)
        if msg["replay_attempts"] >= max_attempts:
            quarantine.put(msg, reason="replay_failed")
            return "quarantined"
        # Release the claim so a later retry is not skipped as a duplicate
        # (key format matches IdempotencyGate.claim).
        gate.r.delete(f"replay:idem:{msg['idempotency_key']}")
        return "failed"
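For experimenting locally, the quarantine queue can be approximated with an in-memory object that exposes the same `put(msg, reason=...)` call used above. `InMemoryQuarantine` and `by_reason` are hypothetical names for this sketch; a production quarantine would be a durable queue or table with its own alerting.

```python
import time
from collections import Counter


class InMemoryQuarantine:
    """Hypothetical quarantine sink; keeps messages for human review."""

    def __init__(self):
        self.items = []

    def put(self, msg: dict, reason: str) -> None:
        # Store a shallow copy plus the reason so reviewers can triage by cause.
        self.items.append({
            "message": dict(msg),
            "reason": reason,
            "quarantined_at": time.time(),
        })

    def by_reason(self) -> Counter:
        # Summarize why messages were quarantined.
        return Counter(item["reason"] for item in self.items)
```

A sudden spike under a single reason is often a hint that the downstream fix is incomplete rather than that the messages themselves are bad.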
Verification
Unit-test the dangerous invariants: ordering preserved, duplicates suppressed, poison pills quarantined.
import fakeredis

class InMemoryQueue:
    """Minimal test double for the quarantine queue."""
    def __init__(self):
        self.items = []

    def put(self, msg, reason):
        self.items.append((msg, reason))

def test_replay_dedups_and_orders():
    delivered = []
    gate = IdempotencyGate(fakeredis.FakeStrictRedis())
    bucket = TokenBucket(rate_per_sec=1000, capacity=1000)
    q = InMemoryQueue()
    msgs = [
        {"ordering_key": "acct-1", "sequence": 2, "idempotency_key": "b"},
        {"ordering_key": "acct-1", "sequence": 1, "idempotency_key": "a"},
        {"ordering_key": "acct-1", "sequence": 1, "idempotency_key": "a"},  # dup
    ]
    for group in order_batch(msgs).values():
        for m in group:
            replay_message(m, lambda x: delivered.append(x["idempotency_key"]),
                           gate, bucket, q)
    assert delivered == ["a", "b"]  # ordered, duplicate suppressed
    assert q.items == []  # nothing was quarantined
For a live drain, watch the depth and error rate together:
# SQS: confirm DLQ depth falls and the target queue is not flooded.
aws sqs get-queue-attributes --queue-url "$DLQ_URL" \
    --attribute-names ApproximateNumberOfMessages
# Tail consumer logs for duplicate-idempotency-key hits (should trend to zero).
kubectl logs -l app=webhook-consumer --since=5m | grep -c 'duplicate_idempotency_key'
A healthy replay shows DLQ depth decreasing monotonically, the quarantine queue growing only slowly, and no rise in downstream 5xx rates.
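Those three signals can be folded into a single check that a drain loop evaluates between batches. The sketch below is illustrative: the function name, the 5% quarantine ratio, and the 1% error tolerance are assumptions, and the inputs are whatever samples your metrics system provides for the current run.

```python
def drain_is_healthy(dlq_depths, quarantine_sizes, error_rates_5xx,
                     baseline_5xx, max_quarantine_ratio=0.05, tolerance=0.01):
    """Each argument is a list of samples taken in order during one run."""
    # DLQ depth should never increase between consecutive samples.
    depth_falling = all(b <= a for a, b in zip(dlq_depths, dlq_depths[1:]))
    # Quarantine growth should be a small fraction of what was drained.
    drained = dlq_depths[0] - dlq_depths[-1]
    quarantined = quarantine_sizes[-1] - quarantine_sizes[0]
    quarantine_slow = quarantined <= max_quarantine_ratio * max(drained, 1)
    # Downstream 5xx rate should stay near its pre-replay baseline.
    errors_flat = max(error_rates_5xx) <= baseline_5xx + tolerance
    return depth_falling and quarantine_slow and errors_flat
```

A run that fails this check is a candidate for pausing rather than for raising the drain rate.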
Failure modes and gotchas
- Replaying into a still-broken endpoint. The most common mistake: draining before the root cause is fixed re-fills the DLQ and burns retry budget. Gate replay on a green health check, and abort the run if the live error rate climbs.
- Lost ordering from naive parallelism. Replaying the whole snapshot with a thread pool reorders same-entity events. Parallelize across ordering keys, never within one. A worker pool keyed by ordering_key is the safe pattern.
- Idempotency window expired. If the consumer’s dedup TTL is shorter than the time the message sat in the DLQ, the key has aged out and replay double-applies. Set the dedup TTL longer than your maximum DLQ retention, or persist applied keys durably.
- Poison pills blocking a group. A message that always throws will stall every later event for that entity if you retry it indefinitely. The attempt cap plus quarantine in Step 5 is what keeps one bad message from freezing an account’s stream.
- Non-repeatable drains. Reading and deleting in one step means a crash mid-run loses messages with no record. Snapshot first (Step 1), delete from the DLQ only after a message reaches a terminal state (delivered or quarantined).
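The delete-after-terminal-state rule from the last item can be sketched as a small settle step that runs after each replay attempt. The `dlq.delete(receipt)` call and the `settle` name are hypothetical; map them to your queue's acknowledgement primitive. Whether a skipped duplicate also counts as terminal is a policy decision this sketch leaves out.

```python
# Terminal states as described above: only these justify deleting from the DLQ.
TERMINAL = {"delivered", "quarantined"}


def settle(dlq, receipt, status: str) -> bool:
    """Delete from the DLQ only when the message reached a terminal state."""
    if status in TERMINAL:
        dlq.delete(receipt)  # hypothetical acknowledgement call
        return True
    # Any other status leaves the message in the DLQ for a later run.
    return False
```

With this split, a crash between delivery and `settle` leaves the message in the DLQ, and the consumer's idempotency check absorbs the resulting redelivery.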