Skip to content

Commit 09bfd6e

Browse files
committed
feat(redrive): DLQ redrive tooling — safe replay (ADR-0026)
Java mirror of babelqueue-go Redrive: Redrive.redrive(transport, dlq, opts) drains a DLQ and re-publishes each message (reset: strip dead_letter, attempts->0, preserve job/trace_id/data/meta) to its dead_letter.original_queue or to opts.toQueue. Options: all()/toQueue/max/dryRun/select. The codec-only core has no transport, so redrive works over a minimal Transport seam the caller implements (like the otel Sender). It drains first and then processes; it acks a message only after a successful re-publish; it restores undecodable bodies (which decode to a null-job envelope) and restores a message when its re-publish fails. No new dependency; envelope frozen. The Replay-Bypass header is documented as phase two.
1 parent 8eddb59 commit 09bfd6e

2 files changed

Lines changed: 379 additions & 0 deletions

File tree

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
package com.babelqueue;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.function.Predicate;
6+
7+
/**
8+
* DLQ redrive tooling — safe replay off the dead-letter queue (ADR-0026).
9+
*
10+
* <p>The operator-side counterpart to the runtime's dead-letter routing: it reads dead-lettered
11+
* messages off a DLQ and re-publishes each to its source queue (its {@code dead_letter
12+
* .original_queue}) or to a chosen queue, {@linkplain #reset(Envelope) reset for reprocessing}
13+
* — the {@code dead_letter} block removed and {@code attempts} reset to 0, while {@code job},
14+
* {@code trace_id}, {@code data} and {@code meta} are preserved verbatim.
15+
*
16+
* <p>The Java core is codec-only (transports are separate artifacts), so {@link #redrive} works
17+
* over a minimal {@link Transport} the caller implements over their broker — the same shape as
18+
* the {@code otel} module's {@code Sender}. The wire envelope stays frozen (GR-1) and no
19+
* dependency is added.
20+
*
21+
* <p>Safety in v1 is {@code dryRun} + sandbox routing ({@code toQueue}) + {@code select}. The
22+
* <b>Replay-Bypass</b> guard (a {@code bq-replay-bypass} transport header surfaced to handlers
23+
* so a replay can skip external side-effects) is a documented phase two — it touches the
24+
* runtime and every transport, like ADR-0025's {@code traceparent} follow-up.
25+
*/
26+
public final class Redrive {
27+
28+
private Redrive() {
29+
}
30+
31+
/** A reserved DLQ message: its raw body plus a transport-specific handle used to ack it. */
32+
public record Reserved(String body, Object handle) {
33+
}
34+
35+
/** The minimal transport surface {@link #redrive} needs, implemented over any broker. */
36+
public interface Transport {
37+
38+
/** Reserve the next message from {@code queue}, or {@code null} when it is empty. */
39+
Reserved pop(String queue) throws Exception;
40+
41+
/** Publish an already-encoded {@code body} to {@code queue}. */
42+
void publish(String queue, String body) throws Exception;
43+
44+
/** Acknowledge (remove) a previously reserved message. */
45+
void ack(Reserved message) throws Exception;
46+
}
47+
48+
/**
49+
* Options for a {@link #redrive} run; immutable, built with the fluent withers from
50+
* {@link #all()}.
51+
*
52+
* @param toQueue overrides the target queue (sandbox/redirect); when blank, each message
53+
* goes back to its own {@code dead_letter.original_queue}
54+
* @param max caps how many messages are pulled from the DLQ (0 = all available)
55+
* @param dryRun inspect and report the plan, restoring every message unchanged
56+
* @param select picks which messages to redrive (unselected are restored unchanged)
57+
*/
58+
public record Options(String toQueue, int max, boolean dryRun, Predicate<Envelope> select) {
59+
60+
/** Redrive every message back to its source queue. */
61+
public static Options all() {
62+
return new Options(null, 0, false, null);
63+
}
64+
65+
public Options toQueue(String queue) {
66+
return new Options(queue, max, dryRun, select);
67+
}
68+
69+
public Options max(int limit) {
70+
return new Options(toQueue, limit, dryRun, select);
71+
}
72+
73+
public Options dryRun(boolean enabled) {
74+
return new Options(toQueue, max, enabled, select);
75+
}
76+
77+
public Options select(Predicate<Envelope> predicate) {
78+
return new Options(toQueue, max, dryRun, predicate);
79+
}
80+
}
81+
82+
/** What happened to one message during a {@link #redrive} run. */
83+
public record Item(
84+
String messageId,
85+
String traceId,
86+
String urn,
87+
String reason,
88+
String from,
89+
String to,
90+
boolean redriven
91+
) {
92+
}
93+
94+
/** Summary of a {@link #redrive} run. */
95+
public record Result(int redriven, int skipped, List<Item> items) {
96+
97+
public Result {
98+
items = List.copyOf(items);
99+
}
100+
}
101+
102+
/**
103+
* Returns a copy of {@code env} reset for reprocessing: the {@code dead_letter} block is
104+
* removed and {@code attempts} reset to 0; {@code job}, {@code trace_id}, {@code data} and
105+
* {@code meta} are preserved verbatim.
106+
*/
107+
public static Envelope reset(Envelope env) {
108+
return new Envelope(env.job(), env.traceId(), env.data(), env.meta(), 0, null);
109+
}
110+
111+
/**
112+
* Drains dead-lettered messages off {@code dlq} and re-publishes each (reset) to its source
113+
* queue or {@code opts.toQueue}. Messages are drained first and then processed, so restored
114+
* messages (skipped, dry-run, or undecodable) are not re-encountered; a DLQ message is
115+
* acknowledged only after a successful re-publish, and an undecodable body is restored
116+
* rather than dropped.
117+
*
118+
* @return a {@link Result} with redriven/skipped counts and a per-message breakdown
119+
* @throws Exception if the transport fails; on a publish failure the message is restored to
120+
* the DLQ before the error is re-thrown
121+
*/
122+
public static Result redrive(Transport transport, String dlq, Options opts) throws Exception {
123+
List<Reserved> batch = new ArrayList<>();
124+
while (opts.max() == 0 || batch.size() < opts.max()) {
125+
Reserved msg = transport.pop(dlq);
126+
if (msg == null) {
127+
break;
128+
}
129+
batch.add(msg);
130+
}
131+
132+
int redriven = 0;
133+
int skipped = 0;
134+
List<Item> items = new ArrayList<>(batch.size());
135+
136+
for (Reserved msg : batch) {
137+
// decode() never throws — a malformed/non-object body yields an empty envelope
138+
// (a null job), which is not redrivable; restore it rather than drop it.
139+
Envelope env = EnvelopeCodec.decode(msg.body());
140+
if (env.job() == null || env.job().isBlank()) {
141+
transport.publish(dlq, msg.body());
142+
transport.ack(msg);
143+
skipped++;
144+
items.add(new Item(null, null, null, null, dlq, null, false));
145+
continue;
146+
}
147+
148+
String reason = env.deadLetter() == null ? null : env.deadLetter().reason();
149+
String messageId = env.meta() == null ? null : env.meta().id();
150+
151+
if (opts.select() != null && !opts.select().test(env)) {
152+
transport.publish(dlq, msg.body());
153+
transport.ack(msg);
154+
skipped++;
155+
items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, null, false));
156+
continue;
157+
}
158+
159+
String target = opts.toQueue() != null && !opts.toQueue().isBlank()
160+
? opts.toQueue()
161+
: sourceQueueOf(env);
162+
163+
if (opts.dryRun()) {
164+
transport.publish(dlq, msg.body());
165+
transport.ack(msg);
166+
skipped++;
167+
items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, target, false));
168+
continue;
169+
}
170+
171+
try {
172+
transport.publish(target, EnvelopeCodec.encode(reset(env)));
173+
} catch (Exception publishFailure) {
174+
transport.publish(dlq, msg.body());
175+
transport.ack(msg);
176+
throw publishFailure;
177+
}
178+
transport.ack(msg);
179+
redriven++;
180+
items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, target, true));
181+
}
182+
183+
return new Result(redriven, skipped, items);
184+
}
185+
186+
private static String sourceQueueOf(Envelope env) {
187+
DeadLetter dl = env.deadLetter();
188+
if (dl != null && dl.originalQueue() != null && !dl.originalQueue().isBlank()) {
189+
return dl.originalQueue();
190+
}
191+
return env.meta() == null ? null : env.meta().queue();
192+
}
193+
}
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
package com.babelqueue;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertFalse;
5+
import static org.junit.jupiter.api.Assertions.assertNotNull;
6+
import static org.junit.jupiter.api.Assertions.assertNull;
7+
import static org.junit.jupiter.api.Assertions.assertThrows;
8+
9+
import java.util.ArrayDeque;
10+
import java.util.ArrayList;
11+
import java.util.Deque;
12+
import java.util.HashMap;
13+
import java.util.List;
14+
import java.util.Map;
15+
import org.junit.jupiter.api.Test;
16+
17+
class RedriveTest {
18+
19+
/** In-memory transport for tests; optionally refuses to publish to one queue. */
20+
private static final class MemoryTransport implements Redrive.Transport {
21+
private final Map<String, Deque<String>> queues = new HashMap<>();
22+
private final String failQueue;
23+
24+
MemoryTransport() {
25+
this(null);
26+
}
27+
28+
MemoryTransport(String failQueue) {
29+
this.failQueue = failQueue;
30+
}
31+
32+
@Override
33+
public Redrive.Reserved pop(String queue) {
34+
Deque<String> dq = queues.get(queue);
35+
if (dq == null || dq.isEmpty()) {
36+
return null;
37+
}
38+
return new Redrive.Reserved(dq.pollFirst(), null);
39+
}
40+
41+
@Override
42+
public void publish(String queue, String body) {
43+
if (queue.equals(failQueue)) {
44+
throw new IllegalStateException("publish refused");
45+
}
46+
queues.computeIfAbsent(queue, k -> new ArrayDeque<>()).addLast(body);
47+
}
48+
49+
@Override
50+
public void ack(Redrive.Reserved message) {
51+
// pop already removed it
52+
}
53+
54+
int size(String queue) {
55+
Deque<String> dq = queues.get(queue);
56+
return dq == null ? 0 : dq.size();
57+
}
58+
}
59+
60+
private static Envelope deadLettered(MemoryTransport t, String dlq, String urn, String originalQueue) {
61+
Envelope env = EnvelopeCodec.make(urn, Map.of("order_id", 1), originalQueue, null);
62+
Envelope dl = DeadLetters.annotate(env, "failed", originalQueue);
63+
t.publish(dlq, EnvelopeCodec.encode(dl));
64+
return dl;
65+
}
66+
67+
private static List<Envelope> drain(MemoryTransport t, String queue) {
68+
List<Envelope> out = new ArrayList<>();
69+
Redrive.Reserved m;
70+
while ((m = t.pop(queue)) != null) {
71+
out.add(EnvelopeCodec.decode(m.body()));
72+
}
73+
return out;
74+
}
75+
76+
@Test
77+
void redrivesToSourceAndResets() throws Exception {
78+
MemoryTransport t = new MemoryTransport();
79+
Envelope orig = deadLettered(t, "orders.dlq", "urn:babel:orders:created", "orders");
80+
81+
Redrive.Result res = Redrive.redrive(t, "orders.dlq", Redrive.Options.all());
82+
83+
assertEquals(1, res.redriven());
84+
assertEquals(0, res.skipped());
85+
List<Envelope> got = drain(t, "orders");
86+
assertEquals(1, got.size());
87+
assertNull(got.get(0).deadLetter(), "dead_letter must be stripped");
88+
assertEquals(0, got.get(0).attempts(), "attempts must reset");
89+
assertEquals(orig.traceId(), got.get(0).traceId(), "trace_id must be preserved");
90+
assertEquals("urn:babel:orders:created", got.get(0).job());
91+
assertEquals(0, t.size("orders.dlq"));
92+
}
93+
94+
@Test
95+
void redrivesToSandbox() throws Exception {
96+
MemoryTransport t = new MemoryTransport();
97+
deadLettered(t, "orders.dlq", "urn:babel:orders:created", "orders");
98+
99+
Redrive.Result res = Redrive.redrive(t, "orders.dlq", Redrive.Options.all().toQueue("sandbox"));
100+
101+
assertEquals(1, res.redriven());
102+
assertEquals(0, t.size("orders"));
103+
assertEquals(1, t.size("sandbox"));
104+
}
105+
106+
@Test
107+
void dryRunReportsPlanAndChangesNothing() throws Exception {
108+
MemoryTransport t = new MemoryTransport();
109+
deadLettered(t, "orders.dlq", "urn:babel:orders:created", "orders");
110+
111+
Redrive.Result res = Redrive.redrive(t, "orders.dlq", Redrive.Options.all().dryRun(true));
112+
113+
assertEquals(0, res.redriven());
114+
assertEquals(1, res.skipped());
115+
assertEquals("orders", res.items().get(0).to());
116+
assertFalse(res.items().get(0).redriven());
117+
assertEquals(0, t.size("orders"));
118+
assertEquals(1, t.size("orders.dlq"));
119+
assertNotNull(drain(t, "orders.dlq").get(0).deadLetter(), "DLQ message left unchanged");
120+
}
121+
122+
@Test
123+
void selectRedrivesOnlyMatching() throws Exception {
124+
MemoryTransport t = new MemoryTransport();
125+
deadLettered(t, "dlq", "urn:babel:orders:created", "orders");
126+
deadLettered(t, "dlq", "urn:babel:emails:welcome", "emails");
127+
128+
Redrive.Result res = Redrive.redrive(t, "dlq",
129+
Redrive.Options.all().select(e -> "urn:babel:orders:created".equals(e.job())));
130+
131+
assertEquals(1, res.redriven());
132+
assertEquals(1, res.skipped());
133+
assertEquals(1, t.size("orders"));
134+
assertEquals(0, t.size("emails"));
135+
assertEquals(1, t.size("dlq"), "unselected message is restored to the DLQ");
136+
}
137+
138+
@Test
139+
void maxCapsHowManyArePulled() throws Exception {
140+
MemoryTransport t = new MemoryTransport();
141+
for (int i = 0; i < 3; i++) {
142+
deadLettered(t, "dlq", "urn:babel:orders:created", "orders");
143+
}
144+
145+
Redrive.Result res = Redrive.redrive(t, "dlq", Redrive.Options.all().max(2));
146+
147+
assertEquals(2, res.redriven());
148+
assertEquals(1, t.size("dlq"));
149+
}
150+
151+
@Test
152+
void publishFailureRestoresToDlq() {
153+
MemoryTransport t = new MemoryTransport("orders");
154+
deadLettered(t, "dlq", "urn:babel:orders:created", "orders");
155+
156+
assertThrows(IllegalStateException.class,
157+
() -> Redrive.redrive(t, "dlq", Redrive.Options.all()));
158+
159+
assertEquals(1, t.size("dlq"), "a message must be restored when its re-publish fails");
160+
assertEquals(0, t.size("orders"));
161+
}
162+
163+
@Test
164+
void undecodableBodyIsRestored() throws Exception {
165+
MemoryTransport t = new MemoryTransport();
166+
t.publish("dlq", "not-json{{{");
167+
168+
Redrive.Result res = Redrive.redrive(t, "dlq", Redrive.Options.all());
169+
170+
assertEquals(0, res.redriven());
171+
assertEquals(1, res.skipped());
172+
assertEquals("not-json{{{", t.pop("dlq").body(), "undecodable body must be restored");
173+
}
174+
175+
@Test
176+
void noDeadLetterFallsBackToMetaQueue() throws Exception {
177+
MemoryTransport t = new MemoryTransport();
178+
Envelope env = EnvelopeCodec.make("urn:babel:orders:created", Map.of(), "orders", null);
179+
t.publish("dlq", EnvelopeCodec.encode(env));
180+
181+
Redrive.Result res = Redrive.redrive(t, "dlq", Redrive.Options.all());
182+
183+
assertEquals(1, res.redriven());
184+
assertEquals(1, t.size("orders"));
185+
}
186+
}

0 commit comments

Comments
 (0)