|
30 | 30 | import java.util.PriorityQueue; |
31 | 31 | import java.util.UUID; |
32 | 32 | import org.apache.beam.runners.direct.DirectRunner; |
| 33 | +import org.apache.beam.runners.flink.FlinkPipelineOptions; |
33 | 34 | import org.apache.beam.runners.flink.FlinkRunner; |
34 | 35 | import org.apache.beam.sdk.Pipeline; |
35 | 36 | import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults; |
@@ -312,6 +313,10 @@ public void testSimpleExpandWithStateStore() throws IOException { |
312 | 313 | @Test |
313 | 314 | public void testStateWithElementEarly() throws IOException { |
314 | 315 | Pipeline pipeline = createPipeline(); |
| 316 | + if (pipeline.getOptions().getRunner().equals(FlinkRunner.class)) { |
| 317 | + FlinkPipelineOptions flinkOpts = pipeline.getOptions().as(FlinkPipelineOptions.class); |
| 318 | + flinkOpts.setParallelism(5); |
| 319 | + } |
315 | 320 | Instant now = new Instant(0); |
316 | 321 | PCollection<String> inputs = |
317 | 322 | pipeline.apply( |
@@ -412,6 +417,11 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) { |
412 | 417 |
|
413 | 418 | private void testTimestampInject(boolean multiOutput) throws IOException { |
414 | 419 | Pipeline pipeline = createPipeline(); |
| 420 | + if (pipeline.getOptions().getRunner().equals(FlinkRunner.class)) { |
| 421 | + // limit parallelism for Flink, in multi-core CPUs it might run out of network buffers |
| 422 | + FlinkPipelineOptions flinkOpts = pipeline.getOptions().as(FlinkPipelineOptions.class); |
| 423 | + flinkOpts.setParallelism(5); |
| 424 | + } |
415 | 425 | Instant now = new Instant(0); |
416 | 426 | PCollection<String> inputs = |
417 | 427 | pipeline.apply( |
|
0 commit comments