Skip to content

Commit 7331b7f

Browse files
Extracted out generic mergePlugins, and cleaned up call sites
1 parent 6a09735 commit 7331b7f

4 files changed

Lines changed: 121 additions & 106 deletions

File tree

temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java

Lines changed: 9 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.temporal.common.interceptors.WorkflowClientInterceptor;
1818
import io.temporal.internal.WorkflowThreadMarker;
1919
import io.temporal.internal.client.*;
20+
import io.temporal.internal.common.PluginUtils;
2021
import io.temporal.internal.client.NexusStartWorkflowResponse;
2122
import io.temporal.internal.client.external.GenericWorkflowClient;
2223
import io.temporal.internal.client.external.GenericWorkflowClientImpl;
@@ -75,7 +76,14 @@ public static WorkflowClient newInstance(
7576
extractClientPlugins(workflowServiceStubs.getOptions().getPlugins());
7677

7778
// Merge propagated plugins with client-specified plugins
78-
WorkflowClientPlugin[] mergedPlugins = mergePlugins(propagatedPlugins, options.getPlugins());
79+
WorkflowClientPlugin[] mergedPlugins =
80+
PluginUtils.mergePlugins(
81+
propagatedPlugins,
82+
options.getPlugins(),
83+
WorkflowClientPlugin::getName,
84+
log,
85+
"service stubs",
86+
WorkflowClientPlugin.class);
7987

8088
// Apply plugin configuration phase (forward order) on user-provided options,
8189
// so plugins see unmodified state before defaults and plugin merging
@@ -810,37 +818,4 @@ private static WorkflowClientPlugin[] extractClientPlugins(
810818
}
811819
return clientPlugins.toArray(new WorkflowClientPlugin[0]);
812820
}
813-
814-
/**
815-
* Merges propagated plugins with explicitly specified plugins. Propagated plugins come first
816-
* (from service stubs), followed by client-specific plugins.
817-
*/
818-
private static WorkflowClientPlugin[] mergePlugins(
819-
WorkflowClientPlugin[] propagated, WorkflowClientPlugin[] explicit) {
820-
boolean propagatedEmpty = propagated == null || propagated.length == 0;
821-
boolean explicitEmpty = explicit == null || explicit.length == 0;
822-
if (propagatedEmpty && explicitEmpty) {
823-
return new WorkflowClientPlugin[0];
824-
}
825-
if (propagatedEmpty) {
826-
return explicit;
827-
}
828-
if (explicitEmpty) {
829-
return propagated;
830-
}
831-
// Warn about duplicate plugin instances (same object in both lists)
832-
Set<WorkflowClientPlugin> propagatedSet = new HashSet<>(Arrays.asList(propagated));
833-
for (WorkflowClientPlugin p : explicit) {
834-
if (propagatedSet.contains(p)) {
835-
log.warn(
836-
"Plugin instance {} is present in both propagated plugins (from service stubs) and "
837-
+ "explicit plugins. It will run twice which may not be the intended behavior.",
838-
p.getName());
839-
}
840-
}
841-
WorkflowClientPlugin[] merged = new WorkflowClientPlugin[propagated.length + explicit.length];
842-
System.arraycopy(propagated, 0, merged, 0, propagated.length);
843-
System.arraycopy(explicit, 0, merged, propagated.length, explicit.length);
844-
return merged;
845-
}
846821
}

temporal-sdk/src/main/java/io/temporal/client/schedules/ScheduleClientImpl.java

Lines changed: 9 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,14 @@
88
import io.temporal.internal.WorkflowThreadMarker;
99
import io.temporal.internal.client.NamespaceInjectWorkflowServiceStubs;
1010
import io.temporal.internal.client.RootScheduleClientInvoker;
11+
import io.temporal.internal.common.PluginUtils;
1112
import io.temporal.internal.client.external.GenericWorkflowClient;
1213
import io.temporal.internal.client.external.GenericWorkflowClientImpl;
1314
import io.temporal.serviceclient.MetricsTag;
1415
import io.temporal.serviceclient.WorkflowServiceStubs;
1516
import io.temporal.serviceclient.WorkflowServiceStubsPlugin;
1617
import java.util.ArrayList;
17-
import java.util.Arrays;
18-
import java.util.HashSet;
1918
import java.util.List;
20-
import java.util.Set;
2119
import java.util.stream.Stream;
2220
import javax.annotation.Nullable;
2321
import org.slf4j.Logger;
@@ -55,7 +53,14 @@ public static ScheduleClient newInstance(
5553
extractScheduleClientPlugins(workflowServiceStubs.getOptions().getPlugins());
5654

5755
// Merge propagated plugins with schedule client-specified plugins
58-
ScheduleClientPlugin[] mergedPlugins = mergePlugins(propagatedPlugins, options.getPlugins());
56+
ScheduleClientPlugin[] mergedPlugins =
57+
PluginUtils.mergePlugins(
58+
propagatedPlugins,
59+
options.getPlugins(),
60+
ScheduleClientPlugin::getName,
61+
log,
62+
"service stubs",
63+
ScheduleClientPlugin.class);
5964

6065
// Apply plugin configuration phase (forward order) on user-provided options,
6166
// so plugins see unmodified state before defaults and plugin merging
@@ -95,34 +100,6 @@ private static ScheduleClientPlugin[] extractScheduleClientPlugins(
95100
return schedulePlugins.toArray(new ScheduleClientPlugin[0]);
96101
}
97102

98-
private static ScheduleClientPlugin[] mergePlugins(
99-
ScheduleClientPlugin[] propagated, ScheduleClientPlugin[] explicit) {
100-
if ((propagated == null || propagated.length == 0)
101-
&& (explicit == null || explicit.length == 0)) {
102-
return new ScheduleClientPlugin[0];
103-
}
104-
// Warn about duplicate plugin instances (same object in both lists)
105-
if (propagated != null && propagated.length > 0 && explicit != null && explicit.length > 0) {
106-
Set<ScheduleClientPlugin> propagatedSet = new HashSet<>(Arrays.asList(propagated));
107-
for (ScheduleClientPlugin p : explicit) {
108-
if (propagatedSet.contains(p)) {
109-
log.warn(
110-
"Plugin instance {} is present in both propagated plugins (from service stubs) and "
111-
+ "explicit plugins. It will run twice which may not be the intended behavior.",
112-
p.getName());
113-
}
114-
}
115-
}
116-
List<ScheduleClientPlugin> merged = new ArrayList<>();
117-
if (propagated != null) {
118-
merged.addAll(Arrays.asList(propagated));
119-
}
120-
if (explicit != null) {
121-
merged.addAll(Arrays.asList(explicit));
122-
}
123-
return merged.toArray(new ScheduleClientPlugin[0]);
124-
}
125-
126103
private ScheduleClientCallsInterceptor initializeClientInvoker() {
127104
ScheduleClientCallsInterceptor scheduleClientInvoker =
128105
new RootScheduleClientInvoker(genericClient, options);
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.common;
22+
23+
import java.lang.reflect.Array;
24+
import java.util.Arrays;
25+
import java.util.HashSet;
26+
import java.util.Set;
27+
import java.util.function.Function;
28+
import javax.annotation.Nullable;
29+
import org.slf4j.Logger;
30+
31+
/** Internal utilities for plugin management. */
32+
public final class PluginUtils {
33+
34+
private PluginUtils() {}
35+
36+
/**
37+
* Merges propagated plugins with explicitly specified plugins. Propagated plugins come first,
38+
* followed by explicit plugins. Warns about duplicate plugin instances.
39+
*
40+
* @param propagated plugins propagated from a parent component (may be null or empty)
41+
* @param explicit plugins explicitly specified on this component (may be null or empty)
42+
* @param nameExtractor function to extract plugin name for logging
43+
* @param log logger for duplicate warnings
44+
* @param propagatedSource description of where propagated plugins come from (e.g., "service
45+
* stubs", "client")
46+
* @param <T> the plugin type
47+
* @return merged array of plugins, never null
48+
*/
49+
public static <T> T[] mergePlugins(
50+
@Nullable T[] propagated,
51+
@Nullable T[] explicit,
52+
Function<T, String> nameExtractor,
53+
Logger log,
54+
String propagatedSource,
55+
Class<T> pluginClass) {
56+
boolean propagatedEmpty = propagated == null || propagated.length == 0;
57+
boolean explicitEmpty = explicit == null || explicit.length == 0;
58+
59+
if (propagatedEmpty && explicitEmpty) {
60+
@SuppressWarnings("unchecked")
61+
T[] empty = (T[]) Array.newInstance(pluginClass, 0);
62+
return empty;
63+
}
64+
if (propagatedEmpty) {
65+
return explicit;
66+
}
67+
if (explicitEmpty) {
68+
return propagated;
69+
}
70+
71+
// Warn about duplicate plugin instances (same object in both lists)
72+
Set<T> propagatedSet = new HashSet<>(Arrays.asList(propagated));
73+
for (T p : explicit) {
74+
if (propagatedSet.contains(p)) {
75+
log.warn(
76+
"Plugin instance {} is present in both propagated plugins (from {}) and "
77+
+ "explicit plugins. It will run twice which may not be the intended behavior.",
78+
nameExtractor.apply(p),
79+
propagatedSource);
80+
}
81+
}
82+
83+
@SuppressWarnings("unchecked")
84+
T[] merged = (T[]) Array.newInstance(pluginClass, propagated.length + explicit.length);
85+
System.arraycopy(propagated, 0, merged, 0, propagated.length);
86+
System.arraycopy(explicit, 0, merged, propagated.length, explicit.length);
87+
return merged;
88+
}
89+
}

temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java

Lines changed: 14 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,17 @@
1212
import io.temporal.internal.sync.WorkflowThreadExecutor;
1313
import io.temporal.internal.task.VirtualThreadDelegate;
1414
import io.temporal.internal.worker.ShutdownManager;
15+
import io.temporal.internal.common.PluginUtils;
1516
import io.temporal.internal.worker.WorkflowExecutorCache;
1617
import io.temporal.internal.worker.WorkflowRunLockManager;
1718
import io.temporal.serviceclient.MetricsTag;
1819
import java.util.ArrayList;
1920
import java.util.Arrays;
2021
import java.util.Collections;
2122
import java.util.HashMap;
22-
import java.util.HashSet;
2323
import java.util.List;
2424
import java.util.Map;
2525
import java.util.Objects;
26-
import java.util.Set;
2726
import java.util.concurrent.CompletableFuture;
2827
import java.util.concurrent.ExecutorService;
2928
import java.util.concurrent.SynchronousQueue;
@@ -84,13 +83,21 @@ private WorkerFactory(WorkflowClient workflowClient, WorkerFactoryOptions factor
8483
String namespace = workflowClientOptions.getNamespace();
8584

8685
// Extract worker plugins from client (auto-propagation)
87-
List<WorkerPlugin> propagatedPlugins = extractWorkerPlugins(workflowClientOptions.getPlugins());
86+
WorkerPlugin[] propagatedPlugins = extractWorkerPlugins(workflowClientOptions.getPlugins());
8887

8988
// Get plugins explicitly set on factory options
9089
WorkerPlugin[] explicitPlugins = factoryOptions != null ? factoryOptions.getPlugins() : null;
9190

9291
// Merge propagated plugins with explicit plugins (propagated first)
93-
this.plugins = mergePlugins(propagatedPlugins, explicitPlugins);
92+
WorkerPlugin[] mergedPlugins =
93+
PluginUtils.mergePlugins(
94+
propagatedPlugins,
95+
explicitPlugins,
96+
WorkerPlugin::getName,
97+
log,
98+
"client",
99+
WorkerPlugin.class);
100+
this.plugins = Collections.unmodifiableList(Arrays.asList(mergedPlugins));
94101

95102
// Apply plugin configuration to factory options (forward order) on user-provided options,
96103
// so plugins see unmodified state before defaults and plugin merging
@@ -470,51 +477,18 @@ public String toString() {
470477
* Extracts worker plugins from the workflow client plugins array. Only plugins that also
471478
* implement {@link WorkerPlugin} are included.
472479
*/
473-
private static List<WorkerPlugin> extractWorkerPlugins(
480+
private static WorkerPlugin[] extractWorkerPlugins(
474481
io.temporal.client.WorkflowClientPlugin[] clientPlugins) {
475482
if (clientPlugins == null || clientPlugins.length == 0) {
476-
return Collections.emptyList();
483+
return new WorkerPlugin[0];
477484
}
478-
479485
List<WorkerPlugin> workerPlugins = new ArrayList<>();
480486
for (io.temporal.client.WorkflowClientPlugin plugin : clientPlugins) {
481487
if (plugin instanceof WorkerPlugin) {
482488
workerPlugins.add((WorkerPlugin) plugin);
483489
}
484490
}
485-
return Collections.unmodifiableList(workerPlugins);
486-
}
487-
488-
/**
489-
* Merges propagated plugins with explicitly specified plugins. Propagated plugins come first
490-
* (from client), followed by factory-specific plugins.
491-
*/
492-
private static List<WorkerPlugin> mergePlugins(
493-
List<WorkerPlugin> propagated, WorkerPlugin[] explicit) {
494-
if ((propagated == null || propagated.isEmpty())
495-
&& (explicit == null || explicit.length == 0)) {
496-
return Collections.emptyList();
497-
}
498-
if (propagated == null || propagated.isEmpty()) {
499-
return Collections.unmodifiableList(Arrays.asList(explicit));
500-
}
501-
if (explicit == null || explicit.length == 0) {
502-
return propagated;
503-
}
504-
// Warn about duplicate plugin instances (same object in both lists)
505-
Set<WorkerPlugin> propagatedSet = new HashSet<>(propagated);
506-
for (WorkerPlugin p : explicit) {
507-
if (propagatedSet.contains(p)) {
508-
log.warn(
509-
"Plugin instance {} is present in both propagated plugins (from client) and "
510-
+ "explicit plugins. It will run twice which may not be the intended behavior.",
511-
p.getName());
512-
}
513-
}
514-
List<WorkerPlugin> merged = new ArrayList<>(propagated.size() + explicit.length);
515-
merged.addAll(propagated);
516-
merged.addAll(Arrays.asList(explicit));
517-
return Collections.unmodifiableList(merged);
491+
return workerPlugins.toArray(new WorkerPlugin[0]);
518492
}
519493

520494
/**

0 commit comments

Comments
 (0)