Skip to content

Commit e4f1d84

Browse files
authored
Merge pull request #38168 from acrites/cherrypick_pr_37954
Cherry-pick PR #37954 into 2.73 release
2 parents c6bfa84 + 230fa1e commit e4f1d84

17 files changed

Lines changed: 42 additions & 10 deletions

File tree

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run!",
3-
"modification": 6,
3+
"modification": 7,
44
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run!",
3-
"modification": 1,
3+
"modification": 2,
44
}

runners/google-cloud-dataflow-java/build.gradle

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -491,8 +491,6 @@ task validatesRunnerStreaming {
491491
description "Validates Dataflow runner forcing streaming mode"
492492
dependsOn(createLegacyWorkerValidatesRunnerTest(validatesRunnerStreamingConfig + [
493493
name: 'validatesRunnerLegacyWorkerTestStreaming',
494-
// Streaming appliance currently fails bundle finalizer tests.
495-
excludedCategories: validatesRunnerStreamingConfig.excludedCategories + [ 'org.apache.beam.sdk.testing.UsesBundleFinalizer', ],
496494
]))
497495
}
498496

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,12 +401,14 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
401401
watermarks,
402402
processingContext,
403403
drainMode,
404+
appliedFinalizeIds,
404405
getWorkStreamLatencies) ->
405406
checkNotNull(computationStateCache)
406407
.get(processingContext.computationId())
407408
.ifPresent(
408409
computationState -> {
409410
memoryMonitor.waitForResources("GetWork");
411+
streamingWorkScheduler.queueAppliedFinalizeIds(appliedFinalizeIds);
410412
streamingWorkScheduler.scheduleWork(
411413
computationState,
412414
workItem,

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,12 +158,16 @@ private void streamingEngineDispatchLoop(
158158
drainMode,
159159
workItem,
160160
serializedWorkItemSize,
161+
appliedFinalizeIds,
161162
getWorkStreamLatencies) ->
162163
computationStateFetcher
163164
.apply(computationId)
164165
.ifPresent(
165166
computationState -> {
166167
waitForResources.run();
168+
if (!appliedFinalizeIds.isEmpty()) {
169+
streamingWorkScheduler.queueAppliedFinalizeIds(appliedFinalizeIds);
170+
}
167171
streamingWorkScheduler.scheduleWork(
168172
computationState,
169173
workItem,
@@ -214,6 +218,12 @@ private void applianceDispatchLoop(Supplier<Windmill.GetWorkResponse> getWorkFn)
214218
sleepUninterruptibly(backoff, TimeUnit.MILLISECONDS);
215219
backoff = Math.min(1000, backoff * 2);
216220
} while (isRunning.get());
221+
ImmutableList<Long> appliedFinalizeIds =
222+
ImmutableList.copyOf(
223+
Preconditions.checkNotNull(workResponse).getAppliedFinalizeIdsList());
224+
if (!appliedFinalizeIds.isEmpty()) {
225+
streamingWorkScheduler.queueAppliedFinalizeIds(appliedFinalizeIds);
226+
}
217227
for (Windmill.ComputationWorkItems computationWork :
218228
Preconditions.checkNotNull(workResponse).getWorkList()) {
219229
String computationId = computationWork.getComputationId();

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,12 @@ List<AssembledWorkItem> append(Windmill.StreamingGetWorkResponseChunk chunk) {
100100
private Optional<AssembledWorkItem> flushToWorkItem() {
101101
try {
102102
workItemBuilder.mergeFrom(data);
103-
workItemBuilder.addAllAppliedFinalizeIds(appliedFinalizeIds);
104103
return Optional.of(
105104
AssembledWorkItem.create(
106105
workItemBuilder.build(),
107106
Preconditions.checkNotNull(metadata),
108107
workTimingInfosTracker.getLatencyAttributions(),
108+
ImmutableList.copyOf(appliedFinalizeIds),
109109
bufferedSize));
110110
} catch (IOException e) {
111111
LOG.error("Failed to parse work item from stream: ", e);
@@ -149,9 +149,10 @@ private static AssembledWorkItem create(
149149
WorkItem workItem,
150150
ComputationMetadata computationMetadata,
151151
ImmutableList<LatencyAttribution> latencyAttributions,
152+
ImmutableList<Long> appliedFinalizeIds,
152153
long size) {
153154
return new AutoValue_GetWorkResponseChunkAssembler_AssembledWorkItem(
154-
workItem, computationMetadata, latencyAttributions, size);
155+
workItem, computationMetadata, latencyAttributions, appliedFinalizeIds, size);
155156
}
156157

157158
abstract WorkItem workItem();
@@ -160,6 +161,8 @@ private static AssembledWorkItem create(
160161

161162
abstract ImmutableList<LatencyAttribution> latencyAttributions();
162163

164+
abstract ImmutableList<Long> appliedFinalizeIds();
165+
163166
abstract long bufferedSize();
164167
}
165168
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ private void consumeAssembledWorkItem(AssembledWorkItem assembledWorkItem) {
282282
createWatermarks(workItem, metadata),
283283
createProcessingContext(metadata.computationId()),
284284
metadata.drainMode(),
285+
assembledWorkItem.appliedFinalizeIds(),
285286
assembledWorkItem.latencyAttributions());
286287
budgetTracker.recordBudgetReceived(assembledWorkItem.bufferedSize());
287288
GetWorkBudget extension = budgetTracker.computeBudgetExtension();

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ private void consumeAssembledWorkItem(AssembledWorkItem assembledWorkItem) {
206206
assembledWorkItem.computationMetadata().drainMode(),
207207
assembledWorkItem.workItem(),
208208
assembledWorkItem.bufferedSize(),
209+
assembledWorkItem.appliedFinalizeIds(),
209210
assembledWorkItem.latencyAttributions());
210211

211212
// Record the fact that there are now fewer outstanding messages and bytes on the stream.

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemReceiver.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,6 @@ void receiveWork(
3333
boolean drainMode,
3434
Windmill.WorkItem workItem,
3535
long serializedWorkItemSize,
36+
ImmutableList<Long> appliedFinalizeIds,
3637
ImmutableList<LatencyAttribution> getWorkStreamLatencies);
3738
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemScheduler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public interface WorkItemScheduler {
3636
* @param watermarks processing watermarks for the workItem.
3737
* @param processingContext for processing the workItem.
3838
* @param drainMode is job is draining.
39+
* @param appliedFinalizeIds Any applied finalize ids that should have their callbacks run.
3940
* @param getWorkStreamLatencies Latencies per processing stage for the WorkItem for reporting
4041
* back to Streaming Engine backend.
4142
*/
@@ -45,5 +46,6 @@ void scheduleWork(
4546
Watermarks watermarks,
4647
Work.ProcessingContext processingContext,
4748
boolean drainMode,
49+
ImmutableList<Long> appliedFinalizeIds,
4850
ImmutableList<LatencyAttribution> getWorkStreamLatencies);
4951
}

0 commit comments

Comments
 (0)