From aefbc1912019d98eae8cd64b9df785ac38085008 Mon Sep 17 00:00:00 2001 From: bot_apk Date: Sat, 14 Mar 2026 21:20:18 +0000 Subject: [PATCH 1/4] fix: handle non-JSON-serializable types in serialization fallback The json.dumps() fallback in airbyte_message_to_string() could also fail for types that neither orjson nor stdlib json can serialize (e.g. complex numbers), causing an unhandled exception that leads to deadlocks in the concurrent source pipeline. Add a second fallback using json.dumps(default=str) to ensure serialization never raises an unhandled exception. Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- airbyte_cdk/entrypoint.py | 5 ++++- unit_tests/test_entrypoint.py | 24 ++++++++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index d6c92f9fa..a0d01d22f 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -342,7 +342,10 @@ def airbyte_message_to_string(airbyte_message: AirbyteMessage) -> str: f"There was an error during the serialization of an AirbyteMessage: `{exception}`. This might impact the sync performances." ) _HAS_LOGGED_FOR_SERIALIZATION_ERROR = True - return json.dumps(serialized_message) + try: + return json.dumps(serialized_message) + except Exception: + return json.dumps(serialized_message, default=str) @classmethod def extract_state(cls, args: List[str]) -> Optional[Any]: diff --git a/unit_tests/test_entrypoint.py b/unit_tests/test_entrypoint.py index 520131881..f411594b4 100644 --- a/unit_tests/test_entrypoint.py +++ b/unit_tests/test_entrypoint.py @@ -856,3 +856,27 @@ def test_given_serialization_error_using_orjson_then_fallback_on_json( # There will be multiple messages here because the fixture `entrypoint` sets a control message. We only care about records here record_messages = list(filter(lambda message: "RECORD" in message, messages)) assert len(record_messages) == 2 + + +def test_given_non_json_serializable_type_then_fallback_with_default_str( + entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock +): + """Test that types which both orjson and json cannot serialize (like complex) are handled via default=str fallback.""" + parsed_args = Namespace( + command="read", config="config_path", state="statepath", catalog="catalogpath" + ) + record = AirbyteMessage( + record=AirbyteRecordMessage( + stream="stream", data={"value": complex(1, 2)}, emitted_at=1 + ), + type=Type.RECORD, + ) + mocker.patch.object(MockSource, "read_state", return_value={}) + mocker.patch.object(MockSource, "read_catalog", return_value={}) + mocker.patch.object(MockSource, "read", return_value=[record]) + + messages = list(entrypoint.run(parsed_args)) + + record_messages = list(filter(lambda message: "RECORD" in message, messages)) + assert len(record_messages) == 1 + assert "(1+2j)" in record_messages[0] From 83205007c003c4fd7335ca25751d90eac0647af2 Mon Sep 17 00:00:00 2001 From: bot_apk Date: Sat, 14 Mar 2026 21:43:56 +0000 Subject: [PATCH 2/4] style: fix ruff format for test_entrypoint.py Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- unit_tests/test_entrypoint.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/unit_tests/test_entrypoint.py b/unit_tests/test_entrypoint.py index f411594b4..63c9e284e 100644 --- a/unit_tests/test_entrypoint.py +++ b/unit_tests/test_entrypoint.py @@ -866,9 +866,7 @@ def test_given_non_json_serializable_type_then_fallback_with_default_str( command="read", config="config_path", state="statepath", catalog="catalogpath" ) record = AirbyteMessage( - record=AirbyteRecordMessage( - stream="stream", data={"value": complex(1, 2)}, emitted_at=1 - ), + record=AirbyteRecordMessage(stream="stream", data={"value": complex(1, 2)}, emitted_at=1), type=Type.RECORD, ) mocker.patch.object(MockSource, "read_state", return_value={}) From 2e3508d1ed7eecf9aa20d556f3ce49e0c91e1fff Mon Sep 17 00:00:00 2001 From: bot_apk Date: Thu, 26 Mar 2026 18:43:09 +0000 Subject: [PATCH 3/4] refactor: replace default=str fallback with AirbyteTracedException per review Fail fast with a clear error instead of silently corrupting data by converting non-serializable types to strings. Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- airbyte_cdk/entrypoint.py | 8 ++++++-- unit_tests/test_entrypoint.py | 11 ++++------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index a0d01d22f..246bf47ad 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -344,8 +344,12 @@ def airbyte_message_to_string(airbyte_message: AirbyteMessage) -> str: _HAS_LOGGED_FOR_SERIALIZATION_ERROR = True try: return json.dumps(serialized_message) - except Exception: - return json.dumps(serialized_message, default=str) + except Exception as json_exception: + raise AirbyteTracedException( + internal_message=f"Failed to serialize AirbyteMessage to JSON: `{json_exception}`", + failure_type=FailureType.system_error, + message=f"A record could not be serialized to JSON: `{json_exception}`. The sync will be stopped to prevent data corruption. Please verify the source is returning valid data types.", + ) from json_exception @classmethod def extract_state(cls, args: List[str]) -> Optional[Any]: diff --git a/unit_tests/test_entrypoint.py b/unit_tests/test_entrypoint.py index 63c9e284e..f523f305e 100644 --- a/unit_tests/test_entrypoint.py +++ b/unit_tests/test_entrypoint.py @@ -858,10 +858,10 @@ def test_given_serialization_error_using_orjson_then_fallback_on_json( assert len(record_messages) == 2 -def test_given_non_json_serializable_type_then_fallback_with_default_str( +def test_given_non_json_serializable_type_then_raise_traced_exception( entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock ): - """Test that types which both orjson and json cannot serialize (like complex) are handled via default=str fallback.""" + """Test that types which both orjson and json cannot serialize (like complex) raise AirbyteTracedException to prevent data corruption.""" parsed_args = Namespace( command="read", config="config_path", state="statepath", catalog="catalogpath" ) @@ -873,8 +873,5 @@ def test_given_non_json_serializable_type_then_fallback_with_default_str( mocker.patch.object(MockSource, "read_catalog", return_value={}) mocker.patch.object(MockSource, "read", return_value=[record]) - messages = list(entrypoint.run(parsed_args)) - - record_messages = list(filter(lambda message: "RECORD" in message, messages)) - assert len(record_messages) == 1 - assert "(1+2j)" in record_messages[0] + with pytest.raises(AirbyteTracedException, match="could not be serialized to JSON"): + list(entrypoint.run(parsed_args)) From e3973ed4f8543fc4a535fb23be35ffd3ca50182d Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Mar 2026 21:08:21 +0000 Subject: [PATCH 4/4] fix: simplify external error message per review feedback Co-Authored-By: suisui.xia --- airbyte_cdk/entrypoint.py | 2 +- unit_tests/test_entrypoint.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 246bf47ad..1df36fed5 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -348,7 +348,7 @@ def airbyte_message_to_string(airbyte_message: AirbyteMessage) -> str: raise AirbyteTracedException( internal_message=f"Failed to serialize AirbyteMessage to JSON: `{json_exception}`", failure_type=FailureType.system_error, - message=f"A record could not be serialized to JSON: `{json_exception}`. The sync will be stopped to prevent data corruption. Please verify the source is returning valid data types.", + message="A record returned from the API failed to be serialized to JSON.", ) from json_exception @classmethod diff --git a/unit_tests/test_entrypoint.py b/unit_tests/test_entrypoint.py index f523f305e..f4e56f936 100644 --- a/unit_tests/test_entrypoint.py +++ b/unit_tests/test_entrypoint.py @@ -873,5 +873,5 @@ def test_given_non_json_serializable_type_then_raise_traced_exception( mocker.patch.object(MockSource, "read_catalog", return_value={}) mocker.patch.object(MockSource, "read", return_value=[record]) - with pytest.raises(AirbyteTracedException, match="could not be serialized to JSON"): + with pytest.raises(AirbyteTracedException, match="failed to be serialized to JSON"): list(entrypoint.run(parsed_args))