diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go index e968911fcca1..1519cabf673e 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go @@ -335,7 +335,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions experiments := jobopts.GetExperiments() // Ensure that we enable the same set of experiments across all SDKs // for runner v2. - var fnApiSet, v2set, uwSet, portaSubmission, seSet, wsSet bool + var fnApiSet, v2set, uwSet, portableRunnerSet, portaSubmission, seSet, wsSet bool for _, e := range experiments { if strings.Contains(e, "beam_fn_api") { fnApiSet = true @@ -349,7 +349,10 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions if strings.Contains(e, "use_portable_job_submission") { portaSubmission = true } - if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") { + if strings.Contains(e, "enable_portable_runner") { + portableRunnerSet = true + } + if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") || strings.Contains(e, "disable_portable_runner") || strings.Contains(e, "enable_streaming_java_runner") { return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2. Disabling runner v2 is no longer supported as of Beam version 2.45.0+") } } @@ -366,6 +369,9 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions if !portaSubmission { experiments = append(experiments, "use_portable_job_submission") } + if !portableRunnerSet { + // As this option is not documented, we do not set it by default. This behavior will be fixed in later versions. + } // Ensure that streaming specific experiments are set for streaming pipelines // since runner v2 only supports using streaming engine. diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go index 23dcd034120a..529ae5c087cd 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go @@ -244,6 +244,40 @@ func TestGetJobOptions_DisableRunnerV2ExperimentsSet(t *testing.T) { } } +func TestGetJobOptions_DisablePortableRunnerExperimentsSet(t *testing.T) { + resetGlobals() + *stagingLocation = "gs://testStagingLocation" + *gcpopts.Project = "testProject" + *gcpopts.Region = "testRegion" + *jobopts.Experiments = "disable_portable_runner" + + opts, err := getJobOptions(context.Background(), false) + + if err == nil { + t.Error("getJobOptions() returned error nil, want an error") + } + if opts != nil { + t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts) + } +} + +func TestGetJobOptions_EnableStreamingJavaRunnerExperimentsSet(t *testing.T) { + resetGlobals() + *stagingLocation = "gs://testStagingLocation" + *gcpopts.Project = "testProject" + *gcpopts.Region = "testRegion" + *jobopts.Experiments = "enable_streaming_java_runner" + + opts, err := getJobOptions(context.Background(), false) + + if err == nil { + t.Error("getJobOptions() returned error nil, want an error") + } + if opts != nil { + t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts) + } +} + func TestGetJobOptions_NoStagingLocation(t *testing.T) { resetGlobals() *stagingLocation = "" diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 0c23e6024dc6..58da4461593c 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -591,6 +591,8 @@ def _add_runner_v2_missing_options(options): debug_options.add_experiment('use_unified_worker') debug_options.add_experiment('use_runner_v2') debug_options.add_experiment('use_portable_job_submission') + # enable_portable_runner is not added by default as it is not documented. + # This behavior will be fixed in later versions. def _check_and_add_missing_options(options): @@ -662,6 +664,8 @@ def _is_runner_v2_disabled(options): """Returns true if runner v2 is disabled.""" debug_options = options.view_as(DebugOptions) return ( + debug_options.lookup_experiment('disable_portable_runner') or + debug_options.lookup_experiment('enable_streaming_java_runner') or debug_options.lookup_experiment('disable_runner_v2') or debug_options.lookup_experiment('disable_runner_v2_until_2023') or debug_options.lookup_experiment('disable_runner_v2_until_v2.50') or