fix: fail fast on non-JSON-serializable types in serialization fallback (AI-Triage PR)#954
The json.dumps() fallback in airbyte_message_to_string() could also fail for types that neither orjson nor stdlib json can serialize (e.g. complex numbers), causing an unhandled exception that leads to deadlocks in the concurrent source pipeline. Add a second fallback using json.dumps(default=str) to ensure serialization never raises an unhandled exception. Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
👋 Greetings, Airbyte Team Member!
Here are some helpful tips and reminders for your convenience.
Testing This CDK Version
You can test this version of the CDK using the following:
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1773523110-fix-serialization-fallback-complex-types#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1773523110-fix-serialization-fallback-complex-types
PR Slash Commands
Airbyte Maintainers can execute the following slash commands on your PR:
|
Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Pull request overview
This PR hardens AirbyteEntrypoint.airbyte_message_to_string() against record payloads containing Python types that cannot be serialized by either orjson or stdlib json, to avoid worker-thread crashes that can lead to deadlocks in concurrent pipelines.
Changes:
- Add a second serialization fallback: if json.dumps() fails, retry with json.dumps(..., default=str).
- Add a unit test covering a complex value that triggers the new fallback.
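The fallback described in this overview can be sketched with the standard library alone. This is a minimal illustration, not the CDK code: `dumps_with_fallback` is a hypothetical stand-in for the tail of airbyte_message_to_string(), and the orjson step is left out so the snippet has no third-party dependency.

```python
import json


def dumps_with_fallback(payload: dict) -> str:
    """Hypothetical stand-in for the json.dumps() fallback in this revision."""
    try:
        return json.dumps(payload)
    except Exception:
        # Second fallback: anything json cannot encode is passed through str().
        return json.dumps(payload, default=str)


record = {"type": "RECORD", "record": {"data": {"value": complex(1, 2)}}}

# Plain json.dumps() rejects complex numbers with a TypeError.
try:
    json.dumps(record)
    plain_error = None
except TypeError as exc:
    plain_error = exc

# With default=str the call succeeds, but the value is emitted as a string.
fallback_output = dumps_with_fallback(record)
```

Note that the fallback output contains the text "(1+2j)" as a JSON string, which is the behavior later reviewers object to.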
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| airbyte_cdk/entrypoint.py | Adds a nested JSON serialization fallback using default=str when both orjson and stdlib json fail. |
| unit_tests/test_entrypoint.py | Adds coverage for the new fallback path using a complex value in record data. |
airbyte_cdk/entrypoint.py
Outdated
try:
    return json.dumps(serialized_message)
except Exception:
    return json.dumps(serialized_message, default=str)
json.dumps(serialized_message, default=str) is treated as a last-resort, but it can still raise (e.g., circular references in dict/list raise ValueError: Circular reference detected, and some key-type errors aren’t handled by default). In those cases airbyte_message_to_string() will still throw and the concurrent worker can still die, so this doesn’t fully address the deadlock scenario. Consider wrapping the default=str call in a final try/except and emitting a protocol-safe fallback (e.g., a TRACE/LOG message that drops the offending record) rather than letting the exception propagate.
Suggested change:
- return json.dumps(serialized_message, default=str)
+ try:
+     return json.dumps(serialized_message, default=str)
+ except Exception as json_fallback_exception:
+     logger.error(
+         "Failed to serialize AirbyteMessage with all serializers. "
+         "Dropping message. orjson/json errors: %r, %r",
+         exception,
+         json_fallback_exception,
+     )
+     # Return a protocol-safe fallback LOG message indicating the dropped record.
+     return (
+         '{"type": "LOG", "log": {'
+         '"level": "ERROR", '
+         '"message": "Dropping message due to serialization error in airbyte_message_to_string."'
+         "}}"
+     )
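The two failure modes named in this comment can be reproduced with stdlib json. The sketch below only demonstrates that default=str does not cover them; the helper `try_dumps` and the sample payloads are illustrative and not taken from the CDK.

```python
import json


def try_dumps(payload):
    """Return the JSON string, or the exception json.dumps() raised."""
    try:
        return json.dumps(payload, default=str)
    except Exception as exc:
        return exc


# A dict that contains itself: the circular-reference check runs
# regardless of the default= hook.
circular = {"type": "RECORD"}
circular["self"] = circular
circular_result = try_dumps(circular)

# default= is only consulted for values, never for dict keys,
# so a tuple key still fails.
tuple_key_result = try_dumps({("a", "b"): 1})
```

Here `circular_result` is a ValueError and `tuple_key_result` is a TypeError, so a caller that treats default=str as a guaranteed last resort can still see an exception.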
Good point about the theoretical edge case with circular references. However, I think silently dropping records and replacing them with LOG messages is a riskier behavior change than letting the exception propagate — it could cause silent data loss that's harder to detect than a loud failure.
The current fix targets the specific reported issue (complex types from Jinja interpolation). Circular references in AirbyteMessage data are extremely unlikely since the data originates from JSON API responses.
That said, if a human reviewer wants the extra defensive layer, I'm happy to add it. Deferring this design decision to the team.
Using default=str silently converts non-serializable types (e.g. complex(0.0423, 0) → "(0.0423+0j)") to strings, which corrupts the record — a field the destination expects as a number becomes a string. Instead of silently producing bad data, we should raise an AirbyteTracedException so the sync fails fast with a clear error message rather than writing corrupted records downstream.
try:
    return json.dumps(serialized_message)
except Exception as json_exception:
    raise AirbyteTracedException(
        internal_message=f"Failed to serialize AirbyteMessage to JSON: `{json_exception}`",
        failure_type=FailureType.system_error,
        message=f"A record could not be serialized to JSON: `{json_exception}`. The sync will be stopped to prevent data corruption. Please verify the source is returning valid data types.",
    ) from json_exception
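To make the trade-off concrete, here is a small stdlib-only comparison of the two behaviors. `SerializationError` and `to_json_or_fail` are hypothetical names standing in for AirbyteTracedException and the patched function; the real exception carries more fields than this sketch shows.

```python
import json


class SerializationError(Exception):
    """Hypothetical stand-in for AirbyteTracedException."""


def to_json_or_fail(payload: dict) -> str:
    try:
        return json.dumps(payload)
    except Exception as exc:
        # Fail fast and keep the original error as the cause.
        raise SerializationError(f"Failed to serialize message to JSON: {exc}") from exc


payload = {"ctr": complex(0.0423, 0)}

# Silent path: the numeric field comes back as a string after a round trip.
coerced = json.loads(json.dumps(payload, default=str))

# Fail-fast path: the caller gets an explicit error instead of altered data.
try:
    to_json_or_fail(payload)
    raised = None
except SerializationError as exc:
    raised = exc
```

After the round trip, `coerced["ctr"]` is the string "(0.0423+0j)", while the fail-fast variant raises with the underlying TypeError attached as `__cause__`.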
Good call — updated in 2e3508d. Now raises AirbyteTracedException(failure_type=system_error) instead of silently corrupting data with default=str. The sync will fail fast with a clear message pointing at the non-serializable type. Test updated to match.
unit_tests/test_entrypoint.py
Outdated
record_messages = list(filter(lambda message: "RECORD" in message, messages))
assert len(record_messages) == 1
assert "(1+2j)" in record_messages[0]
This test asserts via a substring match on the raw serialized message. Parsing the output JSON and asserting record.data.value == "(1+2j)" would be more robust and makes the intent explicit (avoids coupling to serialization formatting).
Suggested change:
- assert "(1+2j)" in record_messages[0]
+ parsed_record = orjson.loads(record_messages[0])
+ assert parsed_record["record"]["data"]["value"] == "(1+2j)"
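A short illustration of why parsing is more robust than substring matching. The messages below are made up for the example, and stdlib json.loads is used in place of orjson.loads so the snippet runs without extra dependencies.

```python
import json

# Two serialized messages: one real RECORD, and one LOG whose text
# happens to contain the word "RECORD".
record_message = json.dumps(
    {"type": "RECORD", "record": {"stream": "s", "data": {"value": "(1+2j)"}}}
)
log_message = json.dumps(
    {"type": "LOG", "log": {"level": "INFO", "message": "emitted 1 RECORD"}}
)
messages = [record_message, log_message]

# Substring filter: matches both messages.
substring_matches = [m for m in messages if "RECORD" in m]

# Parsed filter: matches only the message whose type is RECORD.
parsed_matches = [m for m in map(json.loads, messages) if m["type"] == "RECORD"]
```

The substring filter returns two messages here, while the parsed filter returns one and lets the test assert directly on `parsed_matches[0]["record"]["data"]["value"]`.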
Agreed — parsing the JSON and asserting on the specific field is more robust. However, I note the existing test (test_given_serialization_error_using_orjson_then_fallback_on_json) at line 856 uses the same substring-match pattern ("RECORD" in message), so this test is consistent with the existing style.
Happy to update if a human reviewer prefers the parsed approach.
Suggestion: Pair this with a connector-level fix in source-google-search-console
This CDK-level fix prevents the deadlock, which is good, but the
I'd recommend pairing this with a defensive transformation in the connector itself (
@dataclass
class SanitizeNumericFields(RecordTransformation):
    """
    Ensures numeric metric fields are JSON-serializable by guarding against non-standard
    Python numeric types (e.g., complex) that can cause serialization failures and deadlocks
    in the concurrent read pipeline.
    See: https://github.com/airbytehq/airbyte/issues/74883
    """

    NUMERIC_FIELDS = ("clicks", "impressions", "ctr", "position")

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        for field in self.NUMERIC_FIELDS:
            value = record.get(field)
            if value is not None and isinstance(value, complex):
                record[field] = value.real
Then wire it into all search analytics streams in
- type: CustomTransformation
  class_name: source_declarative_manifest.components.SanitizeNumericFields
Why both fixes?
|
|
Great suggestion — you're right that the I looked at the connector and can confirm:
Want me to create a companion Draft PR in |
Updated suggestion: SanitizeNumericFields with warning logging
Building on my previous comment: the connector-level transformation should also log a warning when a complex value is encountered, so the data issue is visible rather than silently discarded:
@dataclass
class SanitizeNumericFields(RecordTransformation):
    """
    Ensures numeric metric fields are JSON-serializable by guarding against non-standard
    Python numeric types (e.g., complex) that can cause serialization failures and deadlocks
    in the concurrent read pipeline.
    See: https://github.com/airbytehq/airbyte/issues/74883
    """

    NUMERIC_FIELDS = ("clicks", "impressions", "ctr", "position")

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        for field in self.NUMERIC_FIELDS:
            value = record.get(field)
            if value is not None and isinstance(value, complex):
                logger.warning(
                    f"Complex value encountered for field '{field}': {value}. "
                    f"Using real component only."
                )
                record[field] = value.real
Why this is preferable to the
|
|
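For readers who want to try the sanitizing logic outside the CDK, the same idea can be exercised as a plain function. This is a sketch under assumptions: `sanitize_numeric_fields`, the logger name, and the sample record are hypothetical, and the CDK's RecordTransformation interface is deliberately not used.

```python
import json
import logging
from typing import Any, Dict

logger = logging.getLogger("sanitize_example")  # hypothetical logger name

NUMERIC_FIELDS = ("clicks", "impressions", "ctr", "position")


def sanitize_numeric_fields(record: Dict[str, Any]) -> None:
    """Replace complex values in known metric fields with their real part, in place."""
    for field in NUMERIC_FIELDS:
        value = record.get(field)
        if isinstance(value, complex):
            logger.warning(
                "Complex value encountered for field '%s': %s. Using real component only.",
                field,
                value,
            )
            record[field] = value.real


record = {"clicks": 12, "ctr": complex(0.0423, 0), "query": "airbyte"}
sanitize_numeric_fields(record)

# After sanitizing, plain json.dumps() succeeds without a default= hook.
serialized = json.dumps(record)
```

Only the listed metric fields are touched, so a complex value in any other field would still reach the serializer and trigger the fail-fast error from the CDK change.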
Thanks for the refined suggestion with the warning logging — that's a good addition. I'll create a companion Draft PR in |
|
Created the companion connector-level Draft PR per Patrick Nilan (@pnilan)'s suggestion: airbytehq/airbyte#75426 This adds a |
…r review Fail fast with a clear error instead of silently corrupting data by converting non-serializable types to strings. Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-Authored-By: suisui.xia <suisui.xia@airbyte.io>
Patrick Nilan (pnilan)
left a comment
Approving
Summary
Hardens airbyte_message_to_string() to raise a clear AirbyteTracedException when record data contains types that neither orjson nor stdlib json can serialize, instead of letting an unhandled TypeError silently kill worker threads and deadlock the concurrent source pipeline.
Problem: The existing fallback chain is orjson.dumps() → json.dumps(). But json.dumps() can also raise TypeError for types like Python complex numbers. When this unhandled exception occurs in a worker thread of the concurrent source pipeline, the main thread deadlocks waiting on queue.get() because the worker silently dies.
Fix: Wrap the json.dumps() fallback in its own try/except and raise an AirbyteTracedException with failure_type=system_error and a clear message identifying the serialization failure. This stops the sync immediately with an actionable error rather than deadlocking or silently corrupting data.
Resolves https://github.com/airbytehq/oncall/issues/11654:
Related: airbytehq/airbyte#74883
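The deadlock mechanism described in the Problem paragraph can be reproduced in miniature. This is a simplified sketch and not the CDK's concurrent pipeline: the `worker` function, the single queue, and the 0.5 second timeout are assumptions chosen so the example terminates instead of hanging.

```python
import json
import queue
import threading

results = queue.Queue()
captured = []

# Record unhandled thread exceptions instead of printing a traceback.
threading.excepthook = lambda args: captured.append(args.exc_type)


def worker(payload: dict) -> None:
    # An unhandled TypeError here ends the thread before anything is queued.
    results.put(json.dumps(payload))


thread = threading.Thread(target=worker, args=({"value": complex(1, 2)},))
thread.start()
thread.join()

# The consumer never receives an item. Without a timeout,
# results.get() would block forever.
try:
    results.get(timeout=0.5)
    timed_out = False
except queue.Empty:
    timed_out = True
```

The worker dies with a TypeError and the consumer times out, which mirrors the reported symptom of a main thread waiting indefinitely on queue.get().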
Review & Testing Checklist for Human
- AirbyteTracedException raised from airbyte_message_to_string() propagates correctly through the concurrent read pipeline and does not itself cause a deadlock. The function is called via map() in run(); confirm the exception surfaces cleanly to the platform.
- system_error is the correct failure type: This will cause the platform to retry the sync. Confirm this is preferable over config_error (which would not retry) for non-serializable data types in records.
- complex type issue in source-google-search-console is handled at the connector level in airbytehq/airbyte#75426. This CDK change is defense-in-depth: test that the connector fix prevents the exception from firing, and that without the connector fix, this CDK change produces a clear error instead of a deadlock.
Suggested test plan: Run source-google-search-console with the search_analytics_by_query stream against a real account. With the companion connector fix, the sync should complete normally. Without it, the sync should fail with a clear AirbyteTracedException message mentioning JSON serialization, rather than deadlocking.
Notes
- complex values entering the data flow was separately fixed in CDK PR airbytehq/airbyte-python-cdk#579 (Jinja interpolation rejecting complex types, merged in v6.54.5). This PR hardens the serialization layer as defense-in-depth against any future non-serializable type reaching this code path.
- orjson → json fallback was added in airbytehq/airbyte-python-cdk#210 (v6.16.1, by Artem Inzhyyants (@artem1205) / Maxime Carbonneau-Leclerc (@maxi297)).
- SanitizeNumericFields transformation to extract .real from complex values before serialization.
Updates since last revision
- default=str silent fallback with AirbyteTracedException per reviewer feedback from suisuixia42: fail fast with a clear error instead of silently corrupting numeric fields into strings like "(0.0423+0j)".
- AirbyteTracedException is raised with the expected message.
Link to Devin session: https://app.devin.ai/sessions/d8317c1f4ce64f70b5e807425b72aca2