Skip to content

Commit 19fb453

Browse files
Add data converters, activities, workflows, Nexus services
1 parent 8a5d85f commit 19fb453

2 files changed

Lines changed: 217 additions & 0 deletions

File tree

temporal-sdk/src/main/java/io/temporal/common/SimplePlugin.java

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.temporal.client.schedules.ScheduleClientOptions;
2626
import io.temporal.client.schedules.ScheduleClientPlugin;
2727
import io.temporal.common.context.ContextPropagator;
28+
import io.temporal.common.converter.DataConverter;
2829
import io.temporal.common.interceptors.WorkerInterceptor;
2930
import io.temporal.common.interceptors.WorkflowClientInterceptor;
3031
import io.temporal.serviceclient.WorkflowServiceStubs;
@@ -125,6 +126,10 @@ public abstract class SimplePlugin
125126
private final List<WorkerInterceptor> workerInterceptors;
126127
private final List<WorkflowClientInterceptor> clientInterceptors;
127128
private final List<ContextPropagator> contextPropagators;
129+
private final DataConverter dataConverter;
130+
private final List<Class<?>> workflowImplementationTypes;
131+
private final List<Object> activitiesImplementations;
132+
private final List<Object> nexusServiceImplementations;
128133

129134
/**
130135
* Creates a new plugin with the specified name. Use this constructor when subclassing to override
@@ -149,6 +154,10 @@ protected SimplePlugin(@Nonnull String name) {
149154
this.workerInterceptors = Collections.emptyList();
150155
this.clientInterceptors = Collections.emptyList();
151156
this.contextPropagators = Collections.emptyList();
157+
this.dataConverter = null;
158+
this.workflowImplementationTypes = Collections.emptyList();
159+
this.activitiesImplementations = Collections.emptyList();
160+
this.nexusServiceImplementations = Collections.emptyList();
152161
}
153162

154163
/**
@@ -174,6 +183,10 @@ protected SimplePlugin(@Nonnull Builder builder) {
174183
this.workerInterceptors = new ArrayList<>(builder.workerInterceptors);
175184
this.clientInterceptors = new ArrayList<>(builder.clientInterceptors);
176185
this.contextPropagators = new ArrayList<>(builder.contextPropagators);
186+
this.dataConverter = builder.dataConverter;
187+
this.workflowImplementationTypes = new ArrayList<>(builder.workflowImplementationTypes);
188+
this.activitiesImplementations = new ArrayList<>(builder.activitiesImplementations);
189+
this.nexusServiceImplementations = new ArrayList<>(builder.nexusServiceImplementations);
177190
}
178191

179192
/**
@@ -207,6 +220,11 @@ public void configureWorkflowClient(@Nonnull WorkflowClientOptions.Builder build
207220
customizer.accept(builder);
208221
}
209222

223+
// Set data converter
224+
if (dataConverter != null) {
225+
builder.setDataConverter(dataConverter);
226+
}
227+
210228
// Add client interceptors
211229
if (!clientInterceptors.isEmpty()) {
212230
WorkflowClientInterceptor[] existing = builder.build().getInterceptors();
@@ -260,6 +278,23 @@ public void configureWorker(@Nonnull String taskQueue, @Nonnull WorkerOptions.Bu
260278

261279
@Override
262280
public void initializeWorker(@Nonnull String taskQueue, @Nonnull Worker worker) {
281+
// Register workflow implementation types
282+
if (!workflowImplementationTypes.isEmpty()) {
283+
worker.registerWorkflowImplementationTypes(
284+
workflowImplementationTypes.toArray(new Class<?>[0]));
285+
}
286+
287+
// Register activities implementations
288+
if (!activitiesImplementations.isEmpty()) {
289+
worker.registerActivitiesImplementations(activitiesImplementations.toArray());
290+
}
291+
292+
// Register nexus service implementations
293+
for (Object nexusService : nexusServiceImplementations) {
294+
worker.registerNexusServiceImplementation(nexusService);
295+
}
296+
297+
// Apply custom initializers
263298
for (BiConsumer<String, Worker> initializer : workerInitializers) {
264299
initializer.accept(taskQueue, worker);
265300
}
@@ -331,6 +366,10 @@ public static final class Builder {
331366
private final List<WorkerInterceptor> workerInterceptors = new ArrayList<>();
332367
private final List<WorkflowClientInterceptor> clientInterceptors = new ArrayList<>();
333368
private final List<ContextPropagator> contextPropagators = new ArrayList<>();
369+
private DataConverter dataConverter;
370+
private final List<Class<?>> workflowImplementationTypes = new ArrayList<>();
371+
private final List<Object> activitiesImplementations = new ArrayList<>();
372+
private final List<Object> nexusServiceImplementations = new ArrayList<>();
334373

335374
private Builder(@Nonnull String name) {
336375
this.name = Objects.requireNonNull(name, "Plugin name cannot be null");
@@ -558,6 +597,78 @@ public Builder addContextPropagators(ContextPropagator... propagators) {
558597
return this;
559598
}
560599

600+
/**
601+
* Sets the data converter to use for serializing workflow and activity arguments and results.
602+
* This overrides any data converter previously set on the client options.
603+
*
604+
* @param dataConverter the data converter to use
605+
* @return this builder for chaining
606+
*/
607+
public Builder setDataConverter(@Nonnull DataConverter dataConverter) {
608+
this.dataConverter = Objects.requireNonNull(dataConverter);
609+
return this;
610+
}
611+
612+
/**
613+
* Registers workflow implementation types. These workflows will be registered on all workers
614+
* created by the factory.
615+
*
616+
* <p>Example:
617+
*
618+
* <pre>{@code
619+
* SimplePlugin.newBuilder("my-plugin")
620+
* .registerWorkflowImplementationTypes(MyWorkflowImpl.class, OtherWorkflowImpl.class)
621+
* .build();
622+
* }</pre>
623+
*
624+
* @param workflowImplementationTypes workflow implementation classes to register
625+
* @return this builder for chaining
626+
*/
627+
public Builder registerWorkflowImplementationTypes(Class<?>... workflowImplementationTypes) {
628+
this.workflowImplementationTypes.addAll(Arrays.asList(workflowImplementationTypes));
629+
return this;
630+
}
631+
632+
/**
633+
* Registers activity implementations. These activities will be registered on all workers
634+
* created by the factory.
635+
*
636+
* <p>Example:
637+
*
638+
* <pre>{@code
639+
* SimplePlugin.newBuilder("my-plugin")
640+
* .registerActivitiesImplementations(new MyActivityImpl(), new OtherActivityImpl())
641+
* .build();
642+
* }</pre>
643+
*
644+
* @param activitiesImplementations activity implementation instances to register
645+
* @return this builder for chaining
646+
*/
647+
public Builder registerActivitiesImplementations(Object... activitiesImplementations) {
648+
this.activitiesImplementations.addAll(Arrays.asList(activitiesImplementations));
649+
return this;
650+
}
651+
652+
/**
653+
* Registers a Nexus service implementation. The service will be registered on all workers
654+
* created by the factory.
655+
*
656+
* <p>Example:
657+
*
658+
* <pre>{@code
659+
* SimplePlugin.newBuilder("my-plugin")
660+
* .registerNexusServiceImplementation(new MyNexusServiceImpl())
661+
* .build();
662+
* }</pre>
663+
*
664+
* @param nexusServiceImplementation the Nexus service implementation to register
665+
* @return this builder for chaining
666+
*/
667+
public Builder registerNexusServiceImplementation(@Nonnull Object nexusServiceImplementation) {
668+
this.nexusServiceImplementations.add(Objects.requireNonNull(nexusServiceImplementation));
669+
return this;
670+
}
671+
561672
/**
562673
* Builds the plugin with the configured settings.
563674
*

temporal-sdk/src/test/java/io/temporal/common/SimplePluginBuilderTest.java

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@
2121
package io.temporal.common;
2222

2323
import static org.junit.Assert.*;
24+
import static org.mockito.Mockito.*;
2425

2526
import io.temporal.client.WorkflowClientOptions;
27+
import io.temporal.common.converter.DataConverter;
2628
import io.temporal.common.interceptors.WorkerInterceptor;
2729
import io.temporal.common.interceptors.WorkerInterceptorBase;
2830
import io.temporal.common.interceptors.WorkflowClientInterceptor;
2931
import io.temporal.common.interceptors.WorkflowClientInterceptorBase;
3032
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
33+
import io.temporal.worker.Worker;
3134
import io.temporal.worker.WorkerFactoryOptions;
3235
import io.temporal.worker.WorkerOptions;
3336
import java.util.concurrent.atomic.AtomicBoolean;
@@ -409,4 +412,107 @@ public void testNullName() {
409412
public void testNullCustomizer() {
410413
SimplePlugin.newBuilder("test").customizeClient(null);
411414
}
415+
416+
@Test
417+
public void testSetDataConverter() {
418+
DataConverter customConverter = mock(DataConverter.class);
419+
420+
SimplePlugin plugin = SimplePlugin.newBuilder("test").setDataConverter(customConverter).build();
421+
422+
WorkflowClientOptions.Builder builder = WorkflowClientOptions.newBuilder();
423+
((io.temporal.client.WorkflowClientPlugin) plugin).configureWorkflowClient(builder);
424+
425+
assertSame(customConverter, builder.build().getDataConverter());
426+
}
427+
428+
@Test(expected = NullPointerException.class)
429+
public void testNullDataConverter() {
430+
SimplePlugin.newBuilder("test").setDataConverter(null);
431+
}
432+
433+
@Test
434+
public void testRegisterWorkflowImplementationTypes() {
435+
SimplePlugin plugin =
436+
SimplePlugin.newBuilder("test")
437+
.registerWorkflowImplementationTypes(String.class, Integer.class)
438+
.build();
439+
440+
Worker mockWorker = mock(Worker.class);
441+
((io.temporal.worker.WorkerPlugin) plugin).initializeWorker("test-queue", mockWorker);
442+
443+
verify(mockWorker).registerWorkflowImplementationTypes(String.class, Integer.class);
444+
}
445+
446+
@Test
447+
public void testRegisterActivitiesImplementations() {
448+
Object activity1 = new Object();
449+
Object activity2 = new Object();
450+
451+
SimplePlugin plugin =
452+
SimplePlugin.newBuilder("test")
453+
.registerActivitiesImplementations(activity1, activity2)
454+
.build();
455+
456+
Worker mockWorker = mock(Worker.class);
457+
((io.temporal.worker.WorkerPlugin) plugin).initializeWorker("test-queue", mockWorker);
458+
459+
verify(mockWorker).registerActivitiesImplementations(activity1, activity2);
460+
}
461+
462+
@Test
463+
public void testRegisterNexusServiceImplementation() {
464+
Object nexusService = new Object();
465+
466+
SimplePlugin plugin =
467+
SimplePlugin.newBuilder("test").registerNexusServiceImplementation(nexusService).build();
468+
469+
Worker mockWorker = mock(Worker.class);
470+
((io.temporal.worker.WorkerPlugin) plugin).initializeWorker("test-queue", mockWorker);
471+
472+
verify(mockWorker).registerNexusServiceImplementation(nexusService);
473+
}
474+
475+
@Test
476+
public void testRegisterMultipleNexusServiceImplementations() {
477+
Object nexusService1 = new Object();
478+
Object nexusService2 = new Object();
479+
480+
SimplePlugin plugin =
481+
SimplePlugin.newBuilder("test")
482+
.registerNexusServiceImplementation(nexusService1)
483+
.registerNexusServiceImplementation(nexusService2)
484+
.build();
485+
486+
Worker mockWorker = mock(Worker.class);
487+
((io.temporal.worker.WorkerPlugin) plugin).initializeWorker("test-queue", mockWorker);
488+
489+
verify(mockWorker).registerNexusServiceImplementation(nexusService1);
490+
verify(mockWorker).registerNexusServiceImplementation(nexusService2);
491+
}
492+
493+
@Test(expected = NullPointerException.class)
494+
public void testNullNexusServiceImplementation() {
495+
SimplePlugin.newBuilder("test").registerNexusServiceImplementation(null);
496+
}
497+
498+
@Test
499+
public void testRegistrationsWithCustomInitializer() {
500+
AtomicBoolean customInitializerCalled = new AtomicBoolean(false);
501+
502+
SimplePlugin plugin =
503+
SimplePlugin.newBuilder("test")
504+
.registerWorkflowImplementationTypes(String.class)
505+
.registerActivitiesImplementations(new Object())
506+
.initializeWorker((taskQueue, worker) -> customInitializerCalled.set(true))
507+
.build();
508+
509+
Worker mockWorker = mock(Worker.class);
510+
((io.temporal.worker.WorkerPlugin) plugin).initializeWorker("test-queue", mockWorker);
511+
512+
// Verify registrations happen before custom initializer
513+
verify(mockWorker).registerWorkflowImplementationTypes(String.class);
514+
verify(mockWorker).registerActivitiesImplementations(any());
515+
assertTrue(
516+
"Custom initializer should be called after registrations", customInitializerCalled.get());
517+
}
412518
}

0 commit comments

Comments
 (0)