Skip to content

Commit 76d0482

Browse files
first stab at replayer
1 parent f1d3642 commit 76d0482

8 files changed

Lines changed: 579 additions & 12 deletions

File tree

temporal-sdk/src/main/java/io/temporal/common/SimplePlugin.java

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,9 @@ public abstract class SimplePlugin
123123
private final List<BiConsumer<String, Worker>> workerShutdownCallbacks;
124124
private final List<Consumer<WorkerFactory>> workerFactoryStartCallbacks;
125125
private final List<Consumer<WorkerFactory>> workerFactoryShutdownCallbacks;
126+
private final List<Consumer<WorkerOptions.Builder>> replayWorkerCustomizers;
127+
private final List<BiConsumer<String, Worker>> replayWorkerInitializers;
128+
private final List<BiConsumer<Worker, WorkflowExecutionHistory>> replayExecutionCallbacks;
126129
private final List<WorkerInterceptor> workerInterceptors;
127130
private final List<WorkflowClientInterceptor> clientInterceptors;
128131
private final List<ContextPropagator> contextPropagators;
@@ -151,6 +154,9 @@ protected SimplePlugin(@Nonnull String name) {
151154
this.workerShutdownCallbacks = Collections.emptyList();
152155
this.workerFactoryStartCallbacks = Collections.emptyList();
153156
this.workerFactoryShutdownCallbacks = Collections.emptyList();
157+
this.replayWorkerCustomizers = Collections.emptyList();
158+
this.replayWorkerInitializers = Collections.emptyList();
159+
this.replayExecutionCallbacks = Collections.emptyList();
154160
this.workerInterceptors = Collections.emptyList();
155161
this.clientInterceptors = Collections.emptyList();
156162
this.contextPropagators = Collections.emptyList();
@@ -180,6 +186,9 @@ protected SimplePlugin(@Nonnull Builder builder) {
180186
this.workerShutdownCallbacks = new ArrayList<>(builder.workerShutdownCallbacks);
181187
this.workerFactoryStartCallbacks = new ArrayList<>(builder.workerFactoryStartCallbacks);
182188
this.workerFactoryShutdownCallbacks = new ArrayList<>(builder.workerFactoryShutdownCallbacks);
189+
this.replayWorkerCustomizers = new ArrayList<>(builder.replayWorkerCustomizers);
190+
this.replayWorkerInitializers = new ArrayList<>(builder.replayWorkerInitializers);
191+
this.replayExecutionCallbacks = new ArrayList<>(builder.replayExecutionCallbacks);
183192
this.workerInterceptors = new ArrayList<>(builder.workerInterceptors);
184193
this.clientInterceptors = new ArrayList<>(builder.clientInterceptors);
185194
this.contextPropagators = new ArrayList<>(builder.contextPropagators);
@@ -340,6 +349,41 @@ public void shutdownWorkerFactory(WorkerFactory factory, Runnable next) throws E
340349
next.run();
341350
}
342351

352+
@Override
353+
public void configureReplayWorker(
354+
@Nonnull String taskQueue, @Nonnull WorkerOptions.Builder builder) {
355+
if (replayWorkerCustomizers.isEmpty()) {
356+
// Default: delegate to configureWorker
357+
WorkerPlugin.super.configureReplayWorker(taskQueue, builder);
358+
} else {
359+
for (Consumer<WorkerOptions.Builder> customizer : replayWorkerCustomizers) {
360+
customizer.accept(builder);
361+
}
362+
}
363+
}
364+
365+
@Override
366+
public void initializeReplayWorker(@Nonnull String taskQueue, @Nonnull Worker worker) {
367+
if (replayWorkerInitializers.isEmpty()) {
368+
// Default: delegate to initializeWorker
369+
WorkerPlugin.super.initializeReplayWorker(taskQueue, worker);
370+
} else {
371+
for (BiConsumer<String, Worker> initializer : replayWorkerInitializers) {
372+
initializer.accept(taskQueue, worker);
373+
}
374+
}
375+
}
376+
377+
@Override
378+
public void replayWorkflowExecution(
379+
@Nonnull Worker worker, @Nonnull WorkflowExecutionHistory history, @Nonnull Runnable next)
380+
throws Exception {
381+
next.run();
382+
for (BiConsumer<Worker, WorkflowExecutionHistory> callback : replayExecutionCallbacks) {
383+
callback.accept(worker, history);
384+
}
385+
}
386+
343387
@Override
344388
public String toString() {
345389
return getClass().getSimpleName() + "{name='" + name + "'}";
@@ -363,6 +407,10 @@ public static final class Builder {
363407
private final List<BiConsumer<String, Worker>> workerShutdownCallbacks = new ArrayList<>();
364408
private final List<Consumer<WorkerFactory>> workerFactoryStartCallbacks = new ArrayList<>();
365409
private final List<Consumer<WorkerFactory>> workerFactoryShutdownCallbacks = new ArrayList<>();
410+
private final List<Consumer<WorkerOptions.Builder>> replayWorkerCustomizers = new ArrayList<>();
411+
private final List<BiConsumer<String, Worker>> replayWorkerInitializers = new ArrayList<>();
412+
private final List<BiConsumer<Worker, WorkflowExecutionHistory>> replayExecutionCallbacks =
413+
new ArrayList<>();
366414
private final List<WorkerInterceptor> workerInterceptors = new ArrayList<>();
367415
private final List<WorkflowClientInterceptor> clientInterceptors = new ArrayList<>();
368416
private final List<ContextPropagator> contextPropagators = new ArrayList<>();
@@ -561,6 +609,59 @@ public Builder onWorkerFactoryShutdown(@Nonnull Consumer<WorkerFactory> callback
561609
return this;
562610
}
563611

612+
// ==================== Replay Methods ====================
613+
614+
/**
615+
* Adds a customizer for {@link WorkerOptions} that is applied only when creating replay
616+
* workers. If no replay-specific customizers are set, the regular worker customizers are used.
617+
*
618+
* <p>Use this when replay workers need different configuration than normal workers.
619+
*
620+
* @param customizer a consumer that modifies the options builder
621+
* @return this builder for chaining
622+
*/
623+
public Builder customizeReplayWorker(@Nonnull Consumer<WorkerOptions.Builder> customizer) {
624+
replayWorkerCustomizers.add(Objects.requireNonNull(customizer));
625+
return this;
626+
}
627+
628+
/**
629+
* Adds an initializer that is called after a replay worker is created. If no replay-specific
630+
* initializers are set, the regular worker initializers are used.
631+
*
632+
* <p>Use this when replay workers need different initialization than normal workers.
633+
*
634+
* @param initializer a consumer that receives the task queue name and worker
635+
* @return this builder for chaining
636+
*/
637+
public Builder initializeReplayWorker(@Nonnull BiConsumer<String, Worker> initializer) {
638+
replayWorkerInitializers.add(Objects.requireNonNull(initializer));
639+
return this;
640+
}
641+
642+
/**
643+
* Adds a callback that is invoked after a workflow execution is replayed. This can be used for
644+
* logging, metrics, or other observability around replay operations.
645+
*
646+
* <p>Example:
647+
*
648+
* <pre>{@code
649+
* SimplePlugin.newBuilder("my-plugin")
650+
* .onReplayWorkflowExecution((worker, history) -> {
651+
* logger.info("Replayed workflow: {}", history.getWorkflowExecution().getWorkflowId());
652+
* })
653+
* .build();
654+
* }</pre>
655+
*
656+
* @param callback a consumer that receives the worker and history after replay completes
657+
* @return this builder for chaining
658+
*/
659+
public Builder onReplayWorkflowExecution(
660+
@Nonnull BiConsumer<Worker, WorkflowExecutionHistory> callback) {
661+
replayExecutionCallbacks.add(Objects.requireNonNull(callback));
662+
return this;
663+
}
664+
564665
/**
565666
* Adds worker interceptors. Interceptors are appended to any existing interceptors in the
566667
* configuration.

temporal-sdk/src/main/java/io/temporal/worker/Worker.java

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public final class Worker {
4242
private static final Logger log = LoggerFactory.getLogger(Worker.class);
4343
private final WorkerOptions options;
4444
private final String taskQueue;
45+
private final List<WorkerPlugin> plugins;
4546
final SyncWorkflowWorker workflowWorker;
4647
final SyncActivityWorker activityWorker;
4748
final SyncNexusWorker nexusWorker;
@@ -67,9 +68,11 @@ public final class Worker {
6768
@Nonnull WorkflowExecutorCache cache,
6869
boolean useStickyTaskQueue,
6970
WorkflowThreadExecutor workflowThreadExecutor,
70-
List<ContextPropagator> contextPropagators) {
71+
List<ContextPropagator> contextPropagators,
72+
@Nonnull List<WorkerPlugin> plugins) {
7173

7274
Objects.requireNonNull(client, "client should not be null");
75+
this.plugins = Objects.requireNonNull(plugins, "plugins should not be null");
7376
Preconditions.checkArgument(
7477
!Strings.isNullOrEmpty(taskQueue), "taskQueue should not be an empty string");
7578
this.taskQueue = taskQueue;
@@ -469,12 +472,57 @@ public String toString() {
469472
@SuppressWarnings("deprecation")
470473
public void replayWorkflowExecution(io.temporal.internal.common.WorkflowExecutionHistory history)
471474
throws Exception {
472-
workflowWorker.queryWorkflowExecution(
473-
history,
474-
WorkflowClient.QUERY_TYPE_REPLAY_ONLY,
475-
String.class,
476-
String.class,
477-
new Object[] {});
475+
// Convert to public history type for plugin API
476+
WorkflowExecutionHistory publicHistory =
477+
new WorkflowExecutionHistory(
478+
history.getHistory(), history.getWorkflowExecution().getWorkflowId());
479+
480+
// Build plugin chain in reverse order (first plugin wraps all others)
481+
// Wrap checked exception in RuntimeException for Runnable compatibility
482+
Runnable chain =
483+
() -> {
484+
try {
485+
workflowWorker.queryWorkflowExecution(
486+
history,
487+
WorkflowClient.QUERY_TYPE_REPLAY_ONLY,
488+
String.class,
489+
String.class,
490+
new Object[] {});
491+
} catch (Exception e) {
492+
throw new ReplayException(e);
493+
}
494+
};
495+
496+
for (int i = plugins.size() - 1; i >= 0; i--) {
497+
WorkerPlugin plugin = plugins.get(i);
498+
Runnable next = chain;
499+
chain =
500+
() -> {
501+
try {
502+
plugin.replayWorkflowExecution(this, publicHistory, next);
503+
} catch (Exception e) {
504+
throw new ReplayException(e);
505+
}
506+
};
507+
}
508+
509+
try {
510+
chain.run();
511+
} catch (ReplayException e) {
512+
throw e.getCause();
513+
}
514+
}
515+
516+
/** Internal exception to wrap checked exceptions during replay. */
517+
private static class ReplayException extends RuntimeException {
518+
ReplayException(Exception cause) {
519+
super(cause);
520+
}
521+
522+
@Override
523+
public synchronized Exception getCause() {
524+
return (Exception) super.getCause();
525+
}
478526
}
479527

480528
/**

temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import io.temporal.api.workflowservice.v1.DescribeNamespaceRequest;
88
import io.temporal.client.WorkflowClient;
99
import io.temporal.client.WorkflowClientOptions;
10+
import io.temporal.common.Experimental;
1011
import io.temporal.common.converter.DataConverter;
1112
import io.temporal.internal.client.WorkflowClientInternal;
1213
import io.temporal.internal.sync.WorkflowThreadExecutor;
@@ -175,7 +176,8 @@ public synchronized Worker newWorker(String taskQueue, WorkerOptions options) {
175176
cache,
176177
true,
177178
workflowThreadExecutor,
178-
workflowClient.getOptions().getContextPropagators());
179+
workflowClient.getOptions().getContextPropagators(),
180+
plugins);
179181
workers.put(taskQueue, worker);
180182

181183
// Go through the plugins to call plugin initializeWorker hooks (e.g. register workflows,
@@ -194,6 +196,71 @@ public synchronized Worker newWorker(String taskQueue, WorkerOptions options) {
194196
}
195197
}
196198

199+
/**
200+
* Creates a worker specifically for replay operations. This method should be used when replaying
201+
* workflow histories to ensure plugins receive the replay-specific configuration callbacks.
202+
*
203+
* <p>Unlike {@link #newWorker(String, WorkerOptions)}, this method:
204+
*
205+
* <ul>
206+
* <li>Calls {@link WorkerPlugin#configureReplayWorker} instead of {@link
207+
* WorkerPlugin#configureWorker}
208+
* <li>Calls {@link WorkerPlugin#initializeReplayWorker} instead of {@link
209+
* WorkerPlugin#initializeWorker}
210+
* </ul>
211+
*
212+
* <p>This allows plugins to apply different configuration for replay scenarios (e.g., disabling
213+
* certain interceptors that shouldn't run during replay).
214+
*
215+
* @param taskQueue task queue name for the replay worker
216+
* @param options Options for configuring the replay worker
217+
* @return Worker configured for replay
218+
*/
219+
@Experimental
220+
public synchronized Worker newReplayWorker(String taskQueue, WorkerOptions options) {
221+
Preconditions.checkArgument(
222+
!Strings.isNullOrEmpty(taskQueue), "taskQueue should not be an empty string");
223+
Preconditions.checkState(
224+
state == State.Initial,
225+
String.format(statusErrorMessage, "create new worker", state.name(), State.Initial.name()));
226+
227+
// Apply replay-specific plugin configuration to worker options (forward order)
228+
options = applyReplayWorkerPluginConfiguration(taskQueue, options, this.plugins);
229+
230+
// Only one worker can exist for a task queue
231+
Worker existingWorker = workers.get(taskQueue);
232+
if (existingWorker == null) {
233+
Worker worker =
234+
new Worker(
235+
workflowClient,
236+
taskQueue,
237+
factoryOptions,
238+
options,
239+
metricsScope,
240+
runLocks,
241+
cache,
242+
true,
243+
workflowThreadExecutor,
244+
workflowClient.getOptions().getContextPropagators(),
245+
plugins);
246+
workers.put(taskQueue, worker);
247+
248+
// Go through the plugins to call plugin initializeReplayWorker hooks (e.g. register
249+
// workflows)
250+
for (WorkerPlugin plugin : plugins) {
251+
plugin.initializeReplayWorker(taskQueue, worker);
252+
}
253+
254+
return worker;
255+
} else {
256+
log.warn(
257+
"Only one worker can be registered for a task queue, "
258+
+ "subsequent calls to WorkerFactory#newReplayWorker with the same task queue are ignored and "
259+
+ "initially created worker is returned");
260+
return existingWorker;
261+
}
262+
}
263+
197264
/**
198265
* @param taskQueue task queue name to lookup an existing worker for
199266
* @return a worker created previously through {@link #newWorker(String)} for the given task
@@ -598,6 +665,25 @@ private static WorkerOptions applyWorkerPluginConfiguration(
598665
return builder.build();
599666
}
600667

668+
/**
669+
* Applies replay-specific plugin configuration to worker options. Plugins are called in forward
670+
* (registration) order.
671+
*/
672+
private static WorkerOptions applyReplayWorkerPluginConfiguration(
673+
String taskQueue, WorkerOptions options, List<WorkerPlugin> plugins) {
674+
if (plugins == null || plugins.isEmpty()) {
675+
return options;
676+
}
677+
678+
WorkerOptions.Builder builder =
679+
options == null ? WorkerOptions.newBuilder() : WorkerOptions.newBuilder(options);
680+
681+
for (WorkerPlugin plugin : plugins) {
682+
plugin.configureReplayWorker(taskQueue, builder);
683+
}
684+
return builder.build();
685+
}
686+
601687
enum State {
602688
Initial,
603689
Started,

0 commit comments

Comments
 (0)