Skip to content

fix: fail fast on non-JSON-serializable types in serialization fallback (AI-Triage PR)#954

Open
devin-ai-integration[bot] wants to merge 6 commits intomainfrom
devin/1773523110-fix-serialization-fallback-complex-types
Open

fix: fail fast on non-JSON-serializable types in serialization fallback (AI-Triage PR)#954
devin-ai-integration[bot] wants to merge 6 commits intomainfrom
devin/1773523110-fix-serialization-fallback-complex-types

Conversation

@devin-ai-integration
Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration bot commented Mar 14, 2026

Summary

Hardens airbyte_message_to_string() to raise a clear AirbyteTracedException when record data contains types that neither orjson nor stdlib json can serialize, instead of letting an unhandled TypeError silently kill worker threads and deadlock the concurrent source pipeline.

Problem: The existing fallback chain is orjson.dumps()json.dumps(). But json.dumps() can also raise TypeError for types like Python complex numbers. When this unhandled exception occurs in a worker thread of the concurrent source pipeline, the main thread deadlocks waiting on queue.get() because the worker silently dies.

Fix: Wrap the json.dumps() fallback in its own try/except and raise an AirbyteTracedException with failure_type=system_error and a clear message identifying the serialization failure. This stops the sync immediately with an actionable error rather than deadlocking or silently corrupting data.

Resolves https://github.com/airbytehq/oncall/issues/11654:

Related: airbytehq/airbyte#74883

Review & Testing Checklist for Human

  • Exception propagation in concurrent pipeline: Verify that AirbyteTracedException raised from airbyte_message_to_string() propagates correctly through the concurrent read pipeline and does not itself cause a deadlock. The function is called via map() in run() — confirm the exception surfaces cleanly to the platform.
  • system_error is the correct failure type: This will cause the platform to retry the sync. Confirm this is preferable over config_error (which would not retry) for non-serializable data types in records.
  • Verify with the companion connector-level fix: The specific complex type issue in source-google-search-console is handled at the connector level in airbytehq/airbyte#75426. This CDK change is defense-in-depth — test that the connector fix prevents the exception from firing, and that without the connector fix, this CDK change produces a clear error instead of a deadlock.

Suggested test plan: Run source-google-search-console with the search_analytics_by_query stream against a real account. With the companion connector fix, the sync should complete normally. Without it, the sync should fail with a clear AirbyteTracedException message mentioning JSON serialization, rather than deadlocking.

Notes

Updates since last revision

  • Replaced default=str silent fallback with AirbyteTracedException per reviewer feedback from suisuixia42 — fail fast with a clear error instead of silently corrupting numeric fields into strings like "(0.0423+0j)".
  • Updated test to assert AirbyteTracedException is raised with the expected message.

Link to Devin session: https://app.devin.ai/sessions/d8317c1f4ce64f70b5e807425b72aca2

The json.dumps() fallback in airbyte_message_to_string() could also fail
for types that neither orjson nor stdlib json can serialize (e.g. complex
numbers), causing an unhandled exception that leads to deadlocks in the
concurrent source pipeline.

Add a second fallback using json.dumps(default=str) to ensure
serialization never raises an unhandled exception.

Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
@devin-ai-integration
Copy link
Copy Markdown
Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions
Copy link
Copy Markdown

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

💡 Show Tips and Tricks

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1773523110-fix-serialization-fallback-complex-types#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1773523110-fix-serialization-fallback-complex-types

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment
📚 Show Repo Guidance

Helpful Resources

📝 Edit this welcome message.

@github-actions
Copy link
Copy Markdown

github-actions bot commented Mar 14, 2026

PyTest Results (Fast)

3 946 tests  +1   3 935 ✅ +1   7m 31s ⏱️ -3s
    1 suites ±0      11 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 30fe861. ± Comparison against base commit 007066b.

♻️ This comment has been updated with latest results.

@github-actions
Copy link
Copy Markdown

github-actions bot commented Mar 14, 2026

PyTest Results (Full)

3 949 tests  +1   3 937 ✅ +1   11m 18s ⏱️ +10s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 30fe861. ± Comparison against base commit 007066b.

♻️ This comment has been updated with latest results.

Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR hardens AirbyteEntrypoint.airbyte_message_to_string() against record payloads containing Python types that cannot be serialized by either orjson or stdlib json, to avoid worker-thread crashes that can lead to deadlocks in concurrent pipelines.

Changes:

  • Add a second serialization fallback: if json.dumps() fails, retry with json.dumps(..., default=str).
  • Add a unit test covering a complex value that triggers the new fallback.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File Description
airbyte_cdk/entrypoint.py Adds a nested JSON serialization fallback using default=str when both orjson and stdlib json fail.
unit_tests/test_entrypoint.py Adds coverage for the new fallback path using a complex value in record data.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

try:
return json.dumps(serialized_message)
except Exception:
return json.dumps(serialized_message, default=str)
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

json.dumps(serialized_message, default=str) is treated as a last-resort, but it can still raise (e.g., circular references in dict/list raise ValueError: Circular reference detected, and some key-type errors aren’t handled by default). In those cases airbyte_message_to_string() will still throw and the concurrent worker can still die, so this doesn’t fully address the deadlock scenario. Consider wrapping the default=str call in a final try/except and emitting a protocol-safe fallback (e.g., a TRACE/LOG message that drops the offending record) rather than letting the exception propagate.

Suggested change
return json.dumps(serialized_message, default=str)
try:
return json.dumps(serialized_message, default=str)
except Exception as json_fallback_exception:
logger.error(
"Failed to serialize AirbyteMessage with all serializers. "
"Dropping message. orjson/json errors: %r, %r",
exception,
json_fallback_exception,
)
# Return a protocol-safe fallback LOG message indicating the dropped record.
return (
'{"type": "LOG", "log": {'
'"level": "ERROR", '
'"message": "Dropping message due to serialization error in airbyte_message_to_string."'
"}}"
)

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point about the theoretical edge case with circular references. However, I think silently dropping records and replacing them with LOG messages is a riskier behavior change than letting the exception propagate — it could cause silent data loss that's harder to detect than a loud failure.

The current fix targets the specific reported issue (complex types from Jinja interpolation). Circular references in AirbyteMessage data are extremely unlikely since the data originates from JSON API responses.

That said, if a human reviewer wants the extra defensive layer, I'm happy to add it. Deferring this design decision to the team.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using default=str silently converts non-serializable types (e.g. complex(0.0423, 0) → "(0.0423+0j)") to strings, which corrupts the record — a field the destination expects as a number becomes a string. Instead of silently producing bad data, we should raise an AirbyteTracedException so the sync fails fast with a clear error message rather than writing corrupted records downstream.

 try:
     return json.dumps(serialized_message)
 except Exception as json_exception:
     raise AirbyteTracedException(
         internal_message=f"Failed to serialize AirbyteMessage to JSON: `{json_exception}`",
         failure_type=FailureType.system_error,
         message=f"A record could not be serialized to JSON: `{json_exception}`. The sync will be stopped to prevent data corruption. Please verify the source is returning valid data types.",
     ) from json_exception

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call — updated in 2e3508d. Now raises AirbyteTracedException(failure_type=system_error) instead of silently corrupting data with default=str. The sync will fail fast with a clear message pointing at the non-serializable type. Test updated to match.


record_messages = list(filter(lambda message: "RECORD" in message, messages))
assert len(record_messages) == 1
assert "(1+2j)" in record_messages[0]
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test asserts via a substring match on the raw serialized message. Parsing the output JSON and asserting record.data.value == "(1+2j)" would be more robust and makes the intent explicit (avoids coupling to serialization formatting).

Suggested change
assert "(1+2j)" in record_messages[0]
parsed_record = orjson.loads(record_messages[0])
assert parsed_record["record"]["data"]["value"] == "(1+2j)"

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed — parsing the JSON and asserting on the specific field is more robust. However, I note the existing test (test_given_serialization_error_using_orjson_then_fallback_on_json) at line 856 uses the same substring-match pattern ("RECORD" in message), so this test is consistent with the existing style.

Happy to update if a human reviewer prefers the parsed approach.

@pnilan
Copy link
Copy Markdown
Contributor

Suggestion: Pair this with a connector-level fix in source-google-search-console

This CDK-level fix prevents the deadlock, which is good — but the default=str fallback will silently turn numeric values like ctr and position into string representations (e.g., "(0+0j)") in the destination. That's data corruption that's hard to detect.

I'd recommend pairing this with a defensive transformation in the connector itself (source-google-search-console/components.py) that sanitizes numeric fields before they ever reach serialization:

@dataclass
class SanitizeNumericFields(RecordTransformation):
    """
    Ensures numeric metric fields are JSON-serializable by guarding against non-standard
    Python numeric types (e.g., complex) that can cause serialization failures and deadlocks
    in the concurrent read pipeline.

    See: https://github.com/airbytehq/airbyte/issues/74883
    """

    NUMERIC_FIELDS = ("clicks", "impressions", "ctr", "position")

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        for field in self.NUMERIC_FIELDS:
            value = record.get(field)
            if value is not None and isinstance(value, complex):
                record[field] = value.real

Then wire it into all search analytics streams in manifest.yaml as a transformation:

- type: CustomTransformation
  class_name: source_declarative_manifest.components.SanitizeNumericFields

Why both fixes?

  • The CDK fix (this PR) prevents the deadlock for all connectors — good as defense-in-depth
  • The connector fix preserves correct numeric values (value.real returns the actual float) instead of silently writing "(0+0j)" strings to the destination
  • The connector already has a precedent for this pattern — search_analytics_by_query casts query to string for the same reason (manifest.yaml lines 717-728)

@devin-ai-integration
Copy link
Copy Markdown
Contributor Author

Great suggestion — you're right that the default=str fallback preserves the sync but degrades data quality (e.g. "(0+0j)" instead of 0.0). A connector-level SanitizeNumericFields transformation using value.real would preserve correct numeric values.

I looked at the connector and can confirm:

  • components.py already imports RecordTransformation and has the pattern ready to use
  • manifest.yaml lines 717-728 show the existing precedent (casting query to string for the same serialization reason)
  • The ctr and position fields in the schema (lines 163-165) are typed as number with multipleOf: 1e-25, so string coercion would violate the schema contract

Want me to create a companion Draft PR in airbytehq/airbyte for the connector-level fix? I can add the SanitizeNumericFields transformation and wire it into the search analytics streams in the manifest.

@pnilan
Copy link
Copy Markdown
Contributor

Updated suggestion: SanitizeNumericFields with warning logging

Building on my previous comment — the connector-level transformation should also log a warning when a complex value is encountered, so the data issue is visible rather than silently discarded:

@dataclass
class SanitizeNumericFields(RecordTransformation):
    """
    Ensures numeric metric fields are JSON-serializable by guarding against non-standard
    Python numeric types (e.g., complex) that can cause serialization failures and deadlocks
    in the concurrent read pipeline.

    See: https://github.com/airbytehq/airbyte/issues/74883
    """

    NUMERIC_FIELDS = ("clicks", "impressions", "ctr", "position")

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        for field in self.NUMERIC_FIELDS:
            value = record.get(field)
            if value is not None and isinstance(value, complex):
                logger.warning(
                    f"Complex value encountered for field '{field}': {value}. "
                    f"Using real component only."
                )
                record[field] = value.real

Why this is preferable to the default=str approach in this PR:

  • Preserves numeric types: .real returns a float (e.g., complex(0.0423, 0).real0.0423), so downstream numeric processing in the destination continues to work. default=str produces "(0.0423+0j)" — a string that breaks any numeric column.
  • Visible: The warning makes it clear when this edge case fires, rather than silently coercing data.
  • Scoped: Only affects the fields that could plausibly have this issue, rather than broadly stringifying any non-serializable type across all connectors.

@devin-ai-integration
Copy link
Copy Markdown
Contributor Author

Thanks for the refined suggestion with the warning logging — that's a good addition. I'll create a companion Draft PR in airbytehq/airbyte now with the SanitizeNumericFields transformation. Will post the link here once it's up.

@devin-ai-integration
Copy link
Copy Markdown
Contributor Author

Created the companion connector-level Draft PR per Patrick Nilan (@pnilan)'s suggestion: airbytehq/airbyte#75426

This adds a SanitizeNumericFields transformation that extracts .real from complex values to preserve correct numeric data, wired into all 13 search analytics streams.

…r review

Fail fast with a clear error instead of silently corrupting data
by converting non-serializable types to strings.

Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
@devin-ai-integration devin-ai-integration bot changed the title fix: handle non-JSON-serializable types in serialization fallback (AI-Triage PR) fix: fail fast on non-JSON-serializable types in serialization fallback (AI-Triage PR) Mar 26, 2026
Co-Authored-By: suisui.xia <suisui.xia@airbyte.io>
Copy link
Copy Markdown
Contributor

@pnilan Patrick Nilan (pnilan) left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approvingh

@github-actions
Copy link
Copy Markdown

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

💡 Show Tips and Tricks

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1773523110-fix-serialization-fallback-complex-types#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1773523110-fix-serialization-fallback-complex-types

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment
📚 Show Repo Guidance

Helpful Resources

📝 Edit this welcome message.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants