Skip to content

Commit cb6f82b

Browse files
DaisyModiDaisy Modiclaude
authored
Tune RPC retry policy to tolerate 1-2 min throttling on Temporal gRPC calls (#4176)
Add configurable RpcRetryOptions to WorkflowServiceStubsOptions in TemporalWorkflowClientFactory. The defaults (initialInterval=500ms, backoffCoefficient=2.0, maximumInterval=30s, maximumAttempts=10) provide ~151s of cumulative retry budget, enough to ride out a 2-minute throttle burst without failing worker status reporting. New config keys under gobblin.temporal.rpc.retry.options.* allow per-environment tuning without code changes. Co-authored-by: Daisy Modi <dmodi@linkedin.com> Co-authored-by: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
1 parent 2613bb8 commit cb6f82b

2 files changed

Lines changed: 40 additions & 0 deletions

File tree

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,22 @@ public interface GobblinTemporalConfigurationKeys {
157157
String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "maximum.attempts";
158158
int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = 4;
159159

160+
/**
161+
* RPC retry options for gRPC calls to the Temporal service (e.g. worker status reporting).
162+
* Defaults are tuned to tolerate short-lived throttling windows of 1-2 minutes:
163+
* with initialInterval=500ms, coefficient=2.0, maximumInterval=30s, the cumulative wait across
164+
* 10 attempts is ~151.5s, providing coverage beyond a 2-minute throttle burst.
165+
*/
166+
String TEMPORAL_RPC_RETRY_OPTIONS = PREFIX + "rpc.retry.options.";
167+
String TEMPORAL_RPC_RETRY_OPTIONS_INITIAL_INTERVAL_MILLIS = TEMPORAL_RPC_RETRY_OPTIONS + "initial.interval.millis";
168+
int DEFAULT_TEMPORAL_RPC_RETRY_OPTIONS_INITIAL_INTERVAL_MILLIS = 500;
169+
String TEMPORAL_RPC_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS = TEMPORAL_RPC_RETRY_OPTIONS + "maximum.interval.seconds";
170+
int DEFAULT_TEMPORAL_RPC_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS = 30;
171+
String TEMPORAL_RPC_RETRY_OPTIONS_BACKOFF_COEFFICIENT = TEMPORAL_RPC_RETRY_OPTIONS + "backoff.coefficient";
172+
double DEFAULT_TEMPORAL_RPC_RETRY_OPTIONS_BACKOFF_COEFFICIENT = 2.0;
173+
String TEMPORAL_RPC_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = TEMPORAL_RPC_RETRY_OPTIONS + "maximum.attempts";
174+
int DEFAULT_TEMPORAL_RPC_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = 10;
175+
160176
/**
161177
* Memory allocation for execution worker containers.
162178
*/

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.IOException;
2323
import java.io.InputStream;
2424
import java.security.KeyStore;
25+
import java.time.Duration;
2526
import java.util.Arrays;
2627
import java.util.Collections;
2728
import java.util.List;
@@ -37,6 +38,7 @@
3738
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
3839
import io.temporal.client.WorkflowClient;
3940
import io.temporal.client.WorkflowClientOptions;
41+
import io.temporal.serviceclient.RpcRetryOptions;
4042
import io.temporal.serviceclient.WorkflowServiceStubs;
4143
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
4244
import javax.net.ssl.KeyManagerFactory;
@@ -118,6 +120,7 @@ public static ManagedWorkflowServiceStubs createServiceInstance(String connectio
118120
.setEnableHttps(true)
119121
.setSslContext(sslContext)
120122
.setMetricsScope(metricsScope)
123+
.setRpcRetryOptions(buildRpcRetryOptions(config))
121124
.build();
122125
return new ManagedWorkflowServiceStubs(WorkflowServiceStubs.newServiceStubs(options));
123126
}
@@ -129,6 +132,27 @@ public static WorkflowClient createClientInstance(WorkflowServiceStubs service,
129132
return WorkflowClient.newInstance(service, options);
130133
}
131134

135+
private static RpcRetryOptions buildRpcRetryOptions(Config config) {
136+
int initialIntervalMillis = ConfigUtils.getInt(config,
137+
GobblinTemporalConfigurationKeys.TEMPORAL_RPC_RETRY_OPTIONS_INITIAL_INTERVAL_MILLIS,
138+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_RPC_RETRY_OPTIONS_INITIAL_INTERVAL_MILLIS);
139+
int maximumIntervalSeconds = ConfigUtils.getInt(config,
140+
GobblinTemporalConfigurationKeys.TEMPORAL_RPC_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS,
141+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_RPC_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS);
142+
double backoffCoefficient = ConfigUtils.getDouble(config,
143+
GobblinTemporalConfigurationKeys.TEMPORAL_RPC_RETRY_OPTIONS_BACKOFF_COEFFICIENT,
144+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_RPC_RETRY_OPTIONS_BACKOFF_COEFFICIENT);
145+
int maximumAttempts = ConfigUtils.getInt(config,
146+
GobblinTemporalConfigurationKeys.TEMPORAL_RPC_RETRY_OPTIONS_MAXIMUM_ATTEMPTS,
147+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_RPC_RETRY_OPTIONS_MAXIMUM_ATTEMPTS);
148+
return RpcRetryOptions.newBuilder()
149+
.setInitialInterval(Duration.ofMillis(initialIntervalMillis))
150+
.setBackoffCoefficient(backoffCoefficient)
151+
.setMaximumInterval(Duration.ofSeconds(maximumIntervalSeconds))
152+
.setMaximumAttempts(maximumAttempts)
153+
.build();
154+
}
155+
132156
private static InputStream toInputStream(File storeFile)
133157
throws IOException {
134158
byte[] data = FileUtils.readFileToByteArray(storeFile);

0 commit comments

Comments
 (0)