Skip to content

Commit 2613bb8

Browse files
DaisyModiDaisy Modiclaude
authored
[GOBBLIN-2257] Parallelize flow compilation on submission path (#4175)
* Parallelize flow compilation on submission path Flow compilation happens twice: on submission and on execution. The execution path already runs on a thread pool of size 3 (DagProcessingEngine), but the submission path was serialized by a synchronized block in SpecCatalogListenersList.onAddSpec() and a single-thread executor in CallbacksDispatcher. This change removes the synchronized keyword from onAddSpec() and replaces the single-thread executor with a configurable thread pool (default 3 threads), allowing multiple flows to compile in parallel during submission. Compilation is still serialized per flow but multiple flows now compile concurrently. The thread pool size is configurable via: gobblin.service.specCatalogListener.numThreads (default: 3) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Use bounded ThreadPoolExecutor with CallerRunsPolicy Replace Executors.newFixedThreadPool (unbounded queue) with a bounded ThreadPoolExecutor to prevent memory growth under heavy submission load. Queue capacity is set to numThreads * 10 with CallerRunsPolicy so that when the queue is full, the submitting thread runs the compilation itself, providing natural backpressure. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Add comments explaining parallelization rationale Add inline comment in FlowCatalog explaining why the submission path is now parallelized and Javadoc on the new SpecCatalogListenersList constructor documenting the design decision and thread-safety reasoning. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Daisy Modi <dmodi@linkedin.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 26406a8 commit 2613bb8

3 files changed

Lines changed: 35 additions & 2 deletions

File tree

gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@ public class ServiceConfigKeys {
159159
public static final String NUM_DAG_PROC_THREADS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
160160
public static final String DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "nonRetryableExceptions";
161161
public static final Integer DEFAULT_NUM_DAG_PROC_THREADS = 3;
162+
public static final String NUM_SPEC_CATALOG_LISTENER_THREADS_KEY = GOBBLIN_SERVICE_PREFIX + "specCatalogListener.numThreads";
163+
public static final int DEFAULT_NUM_SPEC_CATALOG_LISTENER_THREADS = 3;
162164
public static final long DEFAULT_FLOW_FINISH_DEADLINE_MILLIS = TimeUnit.HOURS.toMillis(24);
163165

164166
public static final String ERROR_PATTERN_STORE_CLASS = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "errorPatternStore.class";

gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@
2727
import java.util.List;
2828
import java.util.Map;
2929
import java.util.Properties;
30+
import java.util.concurrent.ExecutorService;
31+
import java.util.concurrent.LinkedBlockingQueue;
32+
import java.util.concurrent.ThreadPoolExecutor;
33+
import java.util.concurrent.TimeUnit;
3034

3135
import org.apache.commons.lang3.reflect.ConstructorUtils;
3236
import org.slf4j.Logger;
@@ -62,6 +66,7 @@
6266
import org.apache.gobblin.service.ServiceConfigKeys;
6367
import org.apache.gobblin.util.ClassAliasResolver;
6468
import org.apache.gobblin.util.ConfigUtils;
69+
import org.apache.gobblin.util.ExecutorsUtils;
6570
import org.apache.gobblin.util.ExponentialBackoff;
6671
import org.apache.gobblin.util.callbacks.CallbackResult;
6772
import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
@@ -110,7 +115,20 @@ public FlowCatalog(Config config, GobblinInstanceEnvironment env) {
110115

111116
public FlowCatalog(Config config, Optional<Logger> log, Optional<MetricContext> parentMetricContext, boolean instrumentationEnabled) {
112117
this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
113-
this.listeners = new SpecCatalogListenersList(log);
118+
// Flow compilation on the submission path was previously serialized by a synchronized block in
119+
// SpecCatalogListenersList.onAddSpec() and a single-thread executor in CallbacksDispatcher.
120+
// Since the execution path (DagProcessingEngine) already compiles flows concurrently on a thread pool
121+
// of size 3, compileFlow() is proven thread-safe. We use a bounded ThreadPoolExecutor here to allow
122+
// parallel compilation across flows during submission, with CallerRunsPolicy for backpressure under load.
123+
int numListenerThreads = ConfigUtils.getInt(config,
124+
ServiceConfigKeys.NUM_SPEC_CATALOG_LISTENER_THREADS_KEY,
125+
ServiceConfigKeys.DEFAULT_NUM_SPEC_CATALOG_LISTENER_THREADS);
126+
ExecutorService listenerExecutor = new ThreadPoolExecutor(numListenerThreads, numListenerThreads,
127+
0L, TimeUnit.MILLISECONDS,
128+
new LinkedBlockingQueue<>(numListenerThreads * 10),
129+
ExecutorsUtils.newThreadFactory(log, Optional.of("SpecCatalogListenerThread-%d")),
130+
new ThreadPoolExecutor.CallerRunsPolicy());
131+
this.listeners = new SpecCatalogListenersList(log, listenerExecutor);
114132
if (instrumentationEnabled) {
115133
MetricContext realParentCtx =
116134
parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass()));

gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,19 @@ public SpecCatalogListenersList(Optional<Logger> log) {
4747
_disp = new CallbacksDispatcher<>(Optional.<ExecutorService>absent(), log);
4848
}
4949

50+
/**
51+
* Constructor that accepts a custom {@link ExecutorService} for dispatching listener callbacks.
52+
* This enables parallel flow compilation on the submission path. Previously, onAddSpec() was
53+
* synchronized and used a single-thread executor, serializing all flow compilations. Since the
54+
* execution path ({@link org.apache.gobblin.service.modules.orchestration.DagProcessingEngine})
55+
* already runs compileFlow() from multiple threads, it is safe to parallelize here as well.
56+
* Note: onDeleteSpec and onUpdateSpec remain synchronized as they handle infrequent TopologySpec
57+
* mutations that modify shared state.
58+
*/
59+
public SpecCatalogListenersList(Optional<Logger> log, ExecutorService executorService) {
60+
_disp = new CallbacksDispatcher<>(Optional.of(executorService), log);
61+
}
62+
5063
public Logger getLog() {
5164
return _disp.getLog();
5265
}
@@ -66,7 +79,7 @@ public synchronized void removeListener(SpecCatalogListener oldListener) {
6679
}
6780

6881
@Override
69-
public synchronized AddSpecResponse onAddSpec(Spec addedSpec) {
82+
public AddSpecResponse onAddSpec(Spec addedSpec) {
7083
Preconditions.checkNotNull(addedSpec);
7184
try {
7285
return new AddSpecResponse<>(_disp.execCallbacks(new AddSpecCallback(addedSpec)));

0 commit comments

Comments
 (0)