-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathReplay.java
More file actions
94 lines (85 loc) · 4.04 KB
/
Copy pathReplay.java
File metadata and controls
94 lines (85 loc) · 4.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package com.babelqueue;
import java.util.Map;
/**
* Replay-Bypass — a side-effect guard for DLQ replay (ADR-0027).
*
* <p>A deliberate replay off the dead-letter queue ({@link Redrive}) re-runs the handler, and
* the handler's external side-effects re-fire: a second charge, a duplicate email.
* {@code Idempotent.wrap} stops an <i>accidental</i> duplicate; it does not stop the
* <i>intended</i> reprocess from re-firing effects that already happened. This closes that gap.
*
* <p>The marker that says "this is a replay, skip the external effects" rides <b>out of band</b>
* as the {@link #HEADER_REPLAY_BYPASS} transport header — never in the frozen envelope (GR-1).
* {@link Redrive#redrive} stamps it when its options set {@code bypass} and the transport is a
* {@link Redrive.HeaderPublisher}; a consume adapter, having reserved the message with its
* headers, surfaces the flag for the duration of the handler via {@link #process}:
*
* <pre>{@code
* Redrive.Reserved msg = transport.pop(queue);
* Envelope env = EnvelopeCodec.decode(msg.body());
* Replay.process(msg.headers(), () -> handler.handle(env)); // handler can now query isReplay()
* }</pre>
*
* <p>A handler wraps its external, non-idempotent side in {@link #bypassExternalEffects} so a
* replay re-runs the idempotent core but skips effects that already fired. The Java core is
* codec-only, so this is the core/runtime API + in-memory testing; a concrete broker transport
* carries the header once it implements {@link Redrive.HeaderPublisher} (a follow-up).
*/
public final class Replay {
/**
* The out-of-band transport header {@link Redrive} stamps (with {@code bypass}) on a replayed
* message, and that a consume adapter surfaces to the handler via {@link #process}.
*/
public static final String HEADER_REPLAY_BYPASS = "bq-replay-bypass";
private static final ThreadLocal<Boolean> REPLAY = ThreadLocal.withInitial(() -> Boolean.FALSE);
private Replay() {
}
/** An action that may throw — the external side of a handler. */
@FunctionalInterface
public interface Effect {
void run() throws Exception;
}
/**
* Reports whether the message currently being handled was redriven with the replay-bypass
* marker — i.e. a deliberate replay whose external side-effects should be skipped. Meaningful
* only inside a {@link #process} scope the consumer established from the message's headers.
*
* @return whether the current handling is a bypassed replay
*/
public static boolean isReplay() {
return REPLAY.get();
}
/**
* Runs {@code body} with the replay flag derived from a reserved message's transport headers
* (the presence of {@link #HEADER_REPLAY_BYPASS}), restoring the prior flag afterwards. A
* consume adapter wraps each handler invocation in this.
*
* @param headers the reserved message's out-of-band headers (may be null/empty)
* @param body the handler invocation to run within the replay scope
* @throws Exception whatever {@code body} throws
*/
public static void process(Map<String, String> headers, Effect body) throws Exception {
boolean replay = headers != null && headers.containsKey(HEADER_REPLAY_BYPASS);
boolean previous = REPLAY.get();
REPLAY.set(replay);
try {
body.run();
} finally {
REPLAY.set(previous);
}
}
/**
* Runs {@code effect} unless the current message is a {@linkplain #isReplay replay}, in which
* case it is skipped. Wrap the external, non-idempotent side of a handler — sending an email,
* charging a card, calling a third party — so a replay re-runs the idempotent core but does
* not re-fire effects that already happened.
*
* @param effect the external side-effect to run only when this is not a replay
* @throws Exception whatever {@code effect} throws
*/
public static void bypassExternalEffects(Effect effect) throws Exception {
if (!isReplay()) {
effect.run();
}
}
}