From 333e5b0d93a6a3e88e6592c045dd36993ec3180e Mon Sep 17 00:00:00 2001 From: aviruthen <91846056+aviruthen@users.noreply.github.com> Date: Tue, 7 Apr 2026 15:33:40 -0400 Subject: [PATCH 1/2] fix: `ProcessingS3Output`'s `s3_uri` to be an optional field (5559) --- .../src/sagemaker/core/processing.py | 49 ++++++- sagemaker-core/tests/unit/test_processing.py | 137 ++++++++++++++++++ 2 files changed, 183 insertions(+), 3 deletions(-) diff --git a/sagemaker-core/src/sagemaker/core/processing.py b/sagemaker-core/src/sagemaker/core/processing.py index b507ae1a93..ee0d2a83ec 100644 --- a/sagemaker-core/src/sagemaker/core/processing.py +++ b/sagemaker-core/src/sagemaker/core/processing.py @@ -483,7 +483,48 @@ def _normalize_outputs(self, outputs=None): # Generate a name for the ProcessingOutput if it doesn't have one. if output.output_name is None: output.output_name = "output-{}".format(count) - if output.s3_output and is_pipeline_variable(output.s3_output.s3_uri): + if output.s3_output and output.s3_output.s3_uri is not None and is_pipeline_variable(output.s3_output.s3_uri): + normalized_outputs.append(output) + continue + # If s3_output is None or s3_uri is None, auto-generate an S3 URI + if not output.s3_output or output.s3_output.s3_uri is None: + if _pipeline_config: + s3_uri = Join( + on="/", + values=[ + "s3:/", + self.sagemaker_session.default_bucket(), + *( + # don't include default_bucket_prefix if it is None or "" + [self.sagemaker_session.default_bucket_prefix] + if self.sagemaker_session.default_bucket_prefix + else [] + ), + _pipeline_config.pipeline_name, + ExecutionVariables.PIPELINE_EXECUTION_ID, + _pipeline_config.step_name, + "output", + output.output_name, + ], + ) + else: + s3_uri = s3.s3_path_join( + "s3://", + self.sagemaker_session.default_bucket(), + self.sagemaker_session.default_bucket_prefix, + self._current_job_name, + "output", + output.output_name, + ) + if output.s3_output: + output.s3_output.s3_uri = s3_uri + else: + from 
sagemaker.core.shapes import ProcessingS3Output as _ProcessingS3Output + output.s3_output = _ProcessingS3Output( + s3_uri=s3_uri, + local_path=output.s3_output.local_path if output.s3_output else "/opt/ml/processing/output", + s3_upload_mode="EndOfJob", + ) normalized_outputs.append(output) continue # If the output's s3_uri is not an s3_uri, create one. @@ -1421,11 +1462,13 @@ def _processing_output_to_request_dict(processing_output): } if processing_output.s3_output: - request_dict["S3Output"] = { - "S3Uri": processing_output.s3_output.s3_uri, + s3_output_dict = { "LocalPath": processing_output.s3_output.local_path, "S3UploadMode": processing_output.s3_output.s3_upload_mode, } + if processing_output.s3_output.s3_uri is not None: + s3_output_dict["S3Uri"] = processing_output.s3_output.s3_uri + request_dict["S3Output"] = s3_output_dict return request_dict diff --git a/sagemaker-core/tests/unit/test_processing.py b/sagemaker-core/tests/unit/test_processing.py index dbe8d5f9ef..158f4c63ba 100644 --- a/sagemaker-core/tests/unit/test_processing.py +++ b/sagemaker-core/tests/unit/test_processing.py @@ -1385,6 +1385,143 @@ def test_start_new_removes_tags_from_processing_job(self, mock_session): assert "tags" not in call_kwargs +class TestProcessingS3OutputOptionalS3Uri: + """Tests for ProcessingS3Output with optional s3_uri (issue #5559).""" + + def test_processing_s3_output_with_none_s3_uri_is_valid(self): + """Verify ProcessingS3Output can be instantiated with s3_uri=None.""" + s3_output = ProcessingS3Output( + s3_uri=None, + local_path="/opt/ml/processing/output", + s3_upload_mode="EndOfJob", + ) + assert s3_output.s3_uri is None + assert s3_output.local_path == "/opt/ml/processing/output" + assert s3_output.s3_upload_mode == "EndOfJob" + + def test_processing_s3_output_without_s3_uri_kwarg_is_valid(self): + """Verify ProcessingS3Output can be instantiated without passing s3_uri at all.""" + s3_output = ProcessingS3Output( + local_path="/opt/ml/processing/output", + 
s3_upload_mode="EndOfJob", + ) + assert s3_output.s3_uri is None + + def test_normalize_outputs_with_none_s3_uri_generates_s3_path(self, mock_session): + """When s3_uri is None, _normalize_outputs should auto-generate an S3 path.""" + processor = Processor( + role="arn:aws:iam::123456789012:role/SageMakerRole", + image_uri="test-image:latest", + instance_count=1, + instance_type="ml.m5.xlarge", + sagemaker_session=mock_session, + ) + processor._current_job_name = "test-job" + + s3_output = ProcessingS3Output( + s3_uri=None, + local_path="/opt/ml/processing/output", + s3_upload_mode="EndOfJob", + ) + outputs = [ProcessingOutput(output_name="my-output", s3_output=s3_output)] + + with patch("sagemaker.core.workflow.utilities._pipeline_config", None): + result = processor._normalize_outputs(outputs) + + assert len(result) == 1 + generated_uri = result[0].s3_output.s3_uri + assert generated_uri.startswith("s3://") + assert "test-job" in generated_uri + assert "my-output" in generated_uri + + def test_normalize_outputs_with_none_s3_uri_and_pipeline_config(self, mock_session): + """When s3_uri is None and pipeline_config is set, use pipeline-based path.""" + processor = Processor( + role="arn:aws:iam::123456789012:role/SageMakerRole", + image_uri="test-image:latest", + instance_count=1, + instance_type="ml.m5.xlarge", + sagemaker_session=mock_session, + ) + processor._current_job_name = "test-job" + + s3_output = ProcessingS3Output( + s3_uri=None, + local_path="/opt/ml/processing/output", + s3_upload_mode="EndOfJob", + ) + outputs = [ProcessingOutput(output_name="my-output", s3_output=s3_output)] + + with patch("sagemaker.core.workflow.utilities._pipeline_config") as mock_config: + mock_config.pipeline_name = "test-pipeline" + mock_config.step_name = "test-step" + result = processor._normalize_outputs(outputs) + + assert len(result) == 1 + # The result should be a Join object (pipeline variable) when pipeline_config is set + assert result[0].s3_output.s3_uri is not None + 
+ def test_normalize_outputs_with_none_s3_uri_auto_generates_name(self, mock_session): + """When output_name is None and s3_uri is None, both should be auto-generated.""" + processor = Processor( + role="arn:aws:iam::123456789012:role/SageMakerRole", + image_uri="test-image:latest", + instance_count=1, + instance_type="ml.m5.xlarge", + sagemaker_session=mock_session, + ) + processor._current_job_name = "test-job" + + s3_output = ProcessingS3Output( + s3_uri=None, + local_path="/opt/ml/processing/output", + s3_upload_mode="EndOfJob", + ) + outputs = [ProcessingOutput(s3_output=s3_output)] + + with patch("sagemaker.core.workflow.utilities._pipeline_config", None): + result = processor._normalize_outputs(outputs) + + assert len(result) == 1 + assert result[0].output_name == "output-1" + generated_uri = result[0].s3_output.s3_uri + assert generated_uri.startswith("s3://") + assert "output-1" in generated_uri + + def test_processing_output_to_request_dict_omits_s3_uri_when_none(self): + """Verify _processing_output_to_request_dict omits S3Uri when s3_uri is None.""" + s3_output = ProcessingS3Output( + s3_uri=None, + local_path="/opt/ml/processing/output", + s3_upload_mode="EndOfJob", + ) + processing_output = ProcessingOutput(output_name="results", s3_output=s3_output) + + result = _processing_output_to_request_dict(processing_output) + + assert result["OutputName"] == "results" + assert "S3Output" in result + assert "S3Uri" not in result["S3Output"] + assert result["S3Output"]["LocalPath"] == "/opt/ml/processing/output" + assert result["S3Output"]["S3UploadMode"] == "EndOfJob" + + def test_processing_output_to_request_dict_includes_s3_uri_when_set(self): + """Regression test: S3Uri is included when s3_uri is provided.""" + s3_output = ProcessingS3Output( + s3_uri="s3://bucket/output", + local_path="/opt/ml/processing/output", + s3_upload_mode="EndOfJob", + ) + processing_output = ProcessingOutput(output_name="results", s3_output=s3_output) + + result = 
_processing_output_to_request_dict(processing_output) + + assert result["OutputName"] == "results" + assert result["S3Output"]["S3Uri"] == "s3://bucket/output" + assert result["S3Output"]["LocalPath"] == "/opt/ml/processing/output" + assert result["S3Output"]["S3UploadMode"] == "EndOfJob" + + # Additional tests from test_processing_extended.py class TestProcessorBasics: """Test cases for basic Processor functionality""" From 94e95e0216ee70b41b2049a88ca6e6fa5602f4c6 Mon Sep 17 00:00:00 2001 From: aviruthen <91846056+aviruthen@users.noreply.github.com> Date: Tue, 7 Apr 2026 19:00:45 -0400 Subject: [PATCH 2/2] fix: address review comments (iteration #1) --- .../src/sagemaker/core/processing.py | 29 +++++++++++++---- sagemaker-core/tests/unit/test_processing.py | 31 ++++++++++++++++++- 2 files changed, 53 insertions(+), 7 deletions(-) diff --git a/sagemaker-core/src/sagemaker/core/processing.py b/sagemaker-core/src/sagemaker/core/processing.py index ee0d2a83ec..572823d7c3 100644 --- a/sagemaker-core/src/sagemaker/core/processing.py +++ b/sagemaker-core/src/sagemaker/core/processing.py @@ -483,10 +483,16 @@ def _normalize_outputs(self, outputs=None): # Generate a name for the ProcessingOutput if it doesn't have one. if output.output_name is None: output.output_name = "output-{}".format(count) - if output.s3_output and output.s3_output.s3_uri is not None and is_pipeline_variable(output.s3_output.s3_uri): + if ( + output.s3_output + and output.s3_output.s3_uri is not None + and is_pipeline_variable(output.s3_output.s3_uri) + ): normalized_outputs.append(output) continue - # If s3_output is None or s3_uri is None, auto-generate an S3 URI + # If s3_output is None or s3_uri is None, auto-generate + # an S3 URI under the session's default bucket (V2 parity + # for destination=None).
if not output.s3_output or output.s3_output.s3_uri is None: if _pipeline_config: s3_uri = Join( @@ -495,7 +501,6 @@ def _normalize_outputs(self, outputs=None): "s3:/", self.sagemaker_session.default_bucket(), *( - # don't include default_bucket_prefix if it is None or "" [self.sagemaker_session.default_bucket_prefix] if self.sagemaker_session.default_bucket_prefix else [] @@ -517,12 +522,19 @@ def _normalize_outputs(self, outputs=None): output.output_name, ) if output.s3_output: + # s3_output exists but s3_uri is None output.s3_output.s3_uri = s3_uri else: - from sagemaker.core.shapes import ProcessingS3Output as _ProcessingS3Output + # s3_output is None — create a new one with + # sensible defaults. + # Import here to avoid circular import with + # shapes module. + from sagemaker.core.shapes import ( + ProcessingS3Output as _ProcessingS3Output, + ) output.s3_output = _ProcessingS3Output( s3_uri=s3_uri, - local_path=output.s3_output.local_path if output.s3_output else "/opt/ml/processing/output", + local_path="/opt/ml/processing/output", s3_upload_mode="EndOfJob", ) normalized_outputs.append(output) @@ -530,7 +542,12 @@ def _normalize_outputs(self, outputs=None): # If the output's s3_uri is not an s3_uri, create one. 
parse_result = urlparse(output.s3_output.s3_uri) if parse_result.scheme != "s3": - if getattr(self.sagemaker_session, "local_mode", False) and parse_result.scheme == "file": + if ( + getattr( + self.sagemaker_session, "local_mode", False + ) + and parse_result.scheme == "file" + ): normalized_outputs.append(output) continue if _pipeline_config: diff --git a/sagemaker-core/tests/unit/test_processing.py b/sagemaker-core/tests/unit/test_processing.py index 158f4c63ba..fad1245ff1 100644 --- a/sagemaker-core/tests/unit/test_processing.py +++ b/sagemaker-core/tests/unit/test_processing.py @@ -1436,6 +1436,8 @@ def test_normalize_outputs_with_none_s3_uri_generates_s3_path(self, mock_session def test_normalize_outputs_with_none_s3_uri_and_pipeline_config(self, mock_session): """When s3_uri is None and pipeline_config is set, use pipeline-based path.""" + from sagemaker.core.workflow.functions import Join + processor = Processor( role="arn:aws:iam::123456789012:role/SageMakerRole", image_uri="test-image:latest", @@ -1459,7 +1461,11 @@ def test_normalize_outputs_with_none_s3_uri_and_pipeline_config(self, mock_sessi assert len(result) == 1 # The result should be a Join object (pipeline variable) when pipeline_config is set - assert result[0].s3_output.s3_uri is not None + assert isinstance(result[0].s3_output.s3_uri, Join) + # Verify the Join contains expected pipeline-related values + join_obj = result[0].s3_output.s3_uri + assert join_obj.on == "/" + assert "test-pipeline" in join_obj.values def test_normalize_outputs_with_none_s3_uri_auto_generates_name(self, mock_session): """When output_name is None and s3_uri is None, both should be auto-generated.""" @@ -1488,6 +1494,29 @@ def test_normalize_outputs_with_none_s3_uri_auto_generates_name(self, mock_sessi assert generated_uri.startswith("s3://") assert "output-1" in generated_uri + def test_normalize_outputs_with_no_s3_output_at_all(self, mock_session): + """When s3_output is None entirely, a new ProcessingS3Output is 
created.""" + processor = Processor( + role="arn:aws:iam::123456789012:role/SageMakerRole", + image_uri="test-image:latest", + instance_count=1, + instance_type="ml.m5.xlarge", + sagemaker_session=mock_session, + ) + processor._current_job_name = "test-job" + + outputs = [ProcessingOutput(output_name="my-output")] + + with patch("sagemaker.core.workflow.utilities._pipeline_config", None): + result = processor._normalize_outputs(outputs) + + assert len(result) == 1 + assert result[0].s3_output is not None + assert result[0].s3_output.s3_uri.startswith("s3://") + assert result[0].s3_output.local_path == "/opt/ml/processing/output" + assert result[0].s3_output.s3_upload_mode == "EndOfJob" + assert "my-output" in result[0].s3_output.s3_uri + def test_processing_output_to_request_dict_omits_s3_uri_when_none(self): """Verify _processing_output_to_request_dict omits S3Uri when s3_uri is None.""" s3_output = ProcessingS3Output(