-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRedrive.java
More file actions
230 lines (198 loc) · 9.25 KB
/
Copy pathRedrive.java
File metadata and controls
230 lines (198 loc) · 9.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
package com.babelqueue;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
/**
* DLQ redrive tooling — safe replay off the dead-letter queue (ADR-0026).
*
* <p>The operator-side counterpart to the runtime's dead-letter routing: it reads dead-lettered
* messages off a DLQ and re-publishes each to its source queue (its {@code dead_letter
* .original_queue}) or to a chosen queue, {@linkplain #reset(Envelope) reset for reprocessing}
* — the {@code dead_letter} block removed and {@code attempts} reset to 0, while {@code job},
* {@code trace_id}, {@code data} and {@code meta} are preserved verbatim.
*
* <p>The Java core is codec-only (transports are separate artifacts), so {@link #redrive} works
* over a minimal {@link Transport} the caller implements over their broker — the same shape as
* the {@code otel} module's {@code Sender}. The wire envelope stays frozen (GR-1) and no
* dependency is added.
*
* <p>Safety is {@code dryRun} + sandbox routing ({@code toQueue}) + {@code select}. The
* {@code bypass} option stamps a {@code bq-replay-bypass} transport header (see {@link Replay})
* so a handler can skip external side-effects on a replay; it takes effect on transports that
* implement {@link HeaderPublisher} (ADR-0027).
*/
public final class Redrive {
private Redrive() {
}
/**
* A reserved DLQ message: its raw body, a transport-specific handle used to ack it, and any
* out-of-band transport headers (e.g. the replay-bypass marker; empty when none).
*/
public record Reserved(String body, Object handle, Map<String, String> headers) {
/** A reserved message with no out-of-band headers. */
public Reserved(String body, Object handle) {
this(body, handle, Map.of());
}
public Reserved {
headers = headers == null ? Map.of() : Map.copyOf(headers);
}
}
/** The minimal transport surface {@link #redrive} needs, implemented over any broker. */
public interface Transport {
/** Reserve the next message from {@code queue}, or {@code null} when it is empty. */
Reserved pop(String queue) throws Exception;
/** Publish an already-encoded {@code body} to {@code queue}. */
void publish(String queue, String body) throws Exception;
/** Acknowledge (remove) a previously reserved message. */
void ack(Reserved message) throws Exception;
}
/**
* An optional {@link Transport} capability: publish a body together with out-of-band transport
* headers (e.g. the replay-bypass marker), for brokers that carry per-message metadata. A
* transport that does not implement it simply does not propagate headers — {@link #redrive}
* falls back to plain {@link Transport#publish} (ADR-0027).
*/
public interface HeaderPublisher {
void publishWithHeaders(String queue, String body, Map<String, String> headers) throws Exception;
}
/**
* Options for a {@link #redrive} run; immutable, built with the fluent withers from
* {@link #all()}.
*
* @param toQueue overrides the target queue (sandbox/redirect); when blank, each message
* goes back to its own {@code dead_letter.original_queue}
* @param max caps how many messages are pulled from the DLQ (0 = all available)
* @param dryRun inspect and report the plan, restoring every message unchanged
* @param select picks which messages to redrive (unselected are restored unchanged)
* @param bypass stamps the bq-replay-bypass header on each redriven message (see
* {@link Replay}); a no-op unless the transport is a {@link HeaderPublisher}
*/
public record Options(String toQueue, int max, boolean dryRun, Predicate<Envelope> select, boolean bypass) {
/** Redrive every message back to its source queue. */
public static Options all() {
return new Options(null, 0, false, null, false);
}
public Options toQueue(String queue) {
return new Options(queue, max, dryRun, select, bypass);
}
public Options max(int limit) {
return new Options(toQueue, limit, dryRun, select, bypass);
}
public Options dryRun(boolean enabled) {
return new Options(toQueue, max, enabled, select, bypass);
}
public Options select(Predicate<Envelope> predicate) {
return new Options(toQueue, max, dryRun, predicate, bypass);
}
public Options bypass(boolean enabled) {
return new Options(toQueue, max, dryRun, select, enabled);
}
}
/** What happened to one message during a {@link #redrive} run. */
public record Item(
String messageId,
String traceId,
String urn,
String reason,
String from,
String to,
boolean redriven,
boolean bypassed
) {
}
/** Summary of a {@link #redrive} run. */
public record Result(int redriven, int skipped, List<Item> items) {
public Result {
items = List.copyOf(items);
}
}
/**
* Returns a copy of {@code env} reset for reprocessing: the {@code dead_letter} block is
* removed and {@code attempts} reset to 0; {@code job}, {@code trace_id}, {@code data} and
* {@code meta} are preserved verbatim.
*/
public static Envelope reset(Envelope env) {
return new Envelope(env.job(), env.traceId(), env.data(), env.meta(), 0, null);
}
/**
* Drains dead-lettered messages off {@code dlq} and re-publishes each (reset) to its source
* queue or {@code opts.toQueue}. Messages are drained first and then processed, so restored
* messages (skipped, dry-run, or undecodable) are not re-encountered; a DLQ message is
* acknowledged only after a successful re-publish, and an undecodable body is restored
* rather than dropped.
*
* @return a {@link Result} with redriven/skipped counts and a per-message breakdown
* @throws Exception if the transport fails; on a publish failure the message is restored to
* the DLQ before the error is re-thrown
*/
public static Result redrive(Transport transport, String dlq, Options opts) throws Exception {
List<Reserved> batch = new ArrayList<>();
while (opts.max() == 0 || batch.size() < opts.max()) {
Reserved msg = transport.pop(dlq);
if (msg == null) {
break;
}
batch.add(msg);
}
int redriven = 0;
int skipped = 0;
List<Item> items = new ArrayList<>(batch.size());
for (Reserved msg : batch) {
// decode() never throws — a malformed/non-object body yields an empty envelope
// (a null job), which is not redrivable; restore it rather than drop it.
Envelope env = EnvelopeCodec.decode(msg.body());
if (env.job() == null || env.job().isBlank()) {
transport.publish(dlq, msg.body());
transport.ack(msg);
skipped++;
items.add(new Item(null, null, null, null, dlq, null, false, false));
continue;
}
String reason = env.deadLetter() == null ? null : env.deadLetter().reason();
String messageId = env.meta() == null ? null : env.meta().id();
if (opts.select() != null && !opts.select().test(env)) {
transport.publish(dlq, msg.body());
transport.ack(msg);
skipped++;
items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, null, false, false));
continue;
}
String target = opts.toQueue() != null && !opts.toQueue().isBlank()
? opts.toQueue()
: sourceQueueOf(env);
if (opts.dryRun()) {
transport.publish(dlq, msg.body());
transport.ack(msg);
skipped++;
items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, target, false, false));
continue;
}
boolean bypassed = false;
try {
String encoded = EnvelopeCodec.encode(reset(env));
if (opts.bypass() && transport instanceof HeaderPublisher hp) {
hp.publishWithHeaders(target, encoded, Map.of(Replay.HEADER_REPLAY_BYPASS, "1"));
bypassed = true;
} else {
transport.publish(target, encoded);
}
} catch (Exception publishFailure) {
transport.publish(dlq, msg.body());
transport.ack(msg);
throw publishFailure;
}
transport.ack(msg);
redriven++;
items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, target, true, bypassed));
}
return new Result(redriven, skipped, items);
}
private static String sourceQueueOf(Envelope env) {
DeadLetter dl = env.deadLetter();
if (dl != null && dl.originalQueue() != null && !dl.originalQueue().isBlank()) {
return dl.originalQueue();
}
return env.meta() == null ? null : env.meta().queue();
}
}