Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from io import BufferedIOBase, TextIOWrapper
from typing import Any, List, Optional

GZIP_MAGIC_BYTES = b"\x1f\x8b"

import orjson
import requests

Expand All @@ -35,15 +37,22 @@ def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
"""
Decompress gzipped bytes and pass decompressed data to the inner parser.

IMPORTANT:
- If the data is not gzipped, reset the pointer and pass the data to the inner parser as is.

Note:
- The data is not decoded by default.
Auto-detects gzip content by checking for magic bytes (0x1f 0x8b) at the start.
If the data is not gzip-compressed, it is passed directly to the inner parser.
This handles APIs that return gzip-compressed bodies without Content-Encoding header.
"""
header = data.read(2)
if not header:
return

with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
yield from self.inner_parser.parse(gzipobj)
remaining = data.read()
full_data = io.BytesIO(header + remaining)

if header == GZIP_MAGIC_BYTES:
with gzip.GzipFile(fileobj=full_data, mode="rb") as gzipobj:
yield from self.inner_parser.parse(gzipobj)
else:
yield from self.inner_parser.parse(full_data)


@dataclass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2640,15 +2640,16 @@ def create_gzip_decoder(
gzip_parser: GzipParser = ModelToComponentFactory._get_parser(model, config) # type: ignore # based on the model, we know this will be a GzipParser

if self._emit_connector_builder_messages:
# This is very surprising, but if the response is not streamed,
# CompositeRawDecoder calls response.content and the requests library actually decompresses the data, as opposed to response.raw,
# which uses urllib3 directly and does not decompress the data.
return CompositeRawDecoder(gzip_parser.inner_parser, False)
# When not streaming, CompositeRawDecoder uses response.content which the requests
# library auto-decompresses when Content-Encoding is set. However, some APIs return
# gzip data without Content-Encoding headers. Using gzip_parser (which auto-detects
# gzip magic bytes) ensures decompression works in both cases.
return CompositeRawDecoder(gzip_parser, False)

return CompositeRawDecoder.by_headers(
[({"Content-Encoding", "Content-Type"}, _compressed_response_types, gzip_parser)],
stream_response=True,
fallback_parser=gzip_parser.inner_parser,
fallback_parser=gzip_parser,
)

@staticmethod
Expand Down
101 changes: 101 additions & 0 deletions unit_tests/sources/declarative/decoders/test_composite_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,3 +362,104 @@ def test_given_response_is_not_streamed_when_decode_then_can_be_called_multiple_
content_second_time = list(composite_raw_decoder.decode(response))

assert content == content_second_time


class TestGzipParserAutoDetection:
    """Exercise GzipParser's magic-byte sniffing for both gzip and plain payloads."""

    _URL = "https://airbyte.io/"

    def _mock_get(self, requests_mock, payload, stream=True, **response_kwargs):
        """Serve `payload` from the mocked endpoint and return the GET response."""
        requests_mock.register_uri("GET", self._URL, content=payload, **response_kwargs)
        return requests.get(self._URL, stream=stream)

    def test_gzip_csv_without_content_encoding_header(self, requests_mock):
        """Gzipped CSV is decompressed although no Content-Encoding header is sent."""
        # Deliberately no Content-Encoding header on the mocked response.
        response = self._mock_get(requests_mock, generate_csv(should_compress=True))
        decoder = CompositeRawDecoder(parser=GzipParser(inner_parser=CsvParser()))

        rows = list(decoder.decode(response))

        assert len(rows) == 3
        assert rows[0] == {"id": "1", "name": "John", "age": "28"}

    def test_gzip_jsonl_without_content_encoding_header(self, requests_mock):
        """Gzipped JSONL is decompressed although no Content-Encoding header is sent."""
        # Deliberately no Content-Encoding header on the mocked response.
        response = self._mock_get(requests_mock, generate_compressed_jsonlines())
        decoder = CompositeRawDecoder(parser=GzipParser(inner_parser=JsonLineParser()))

        rows = list(decoder.decode(response))

        assert len(rows) == 3
        assert rows[0] == {"id": 1, "message": "Hello, World!"}

    def test_non_gzip_data_passthrough(self, requests_mock):
        """Plain CSV reaches the inner parser untouched."""
        uncompressed_csv = generate_csv(should_compress=False)
        response = self._mock_get(requests_mock, uncompressed_csv)
        decoder = CompositeRawDecoder(parser=GzipParser(inner_parser=CsvParser()))

        rows = list(decoder.decode(response))

        assert len(rows) == 3
        assert rows[0] == {"id": "1", "name": "John", "age": "28"}

    def test_non_gzip_jsonl_passthrough(self, requests_mock):
        """Plain JSONL reaches the inner parser untouched."""
        uncompressed_jsonl = "".join(generate_jsonlines()).encode("utf-8")
        response = self._mock_get(requests_mock, uncompressed_jsonl)
        decoder = CompositeRawDecoder(parser=GzipParser(inner_parser=JsonLineParser()))

        rows = list(decoder.decode(response))

        assert len(rows) == 3
        assert rows[0] == {"id": 1, "message": "Hello, World!"}

    def test_empty_data_returns_no_records(self):
        """An empty stream yields no records instead of raising."""
        gzip_parser = GzipParser(inner_parser=CsvParser())
        empty_stream = BytesIO(b"")

        rows = list(gzip_parser.parse(empty_stream))

        assert rows == []

    def test_gzip_fallback_in_by_headers_mode(self, requests_mock):
        """As the by_headers fallback parser, GzipParser still detects gzip content."""
        response = self._mock_get(
            requests_mock,
            generate_compressed_jsonlines(),
            headers={"Content-Encoding": "identity"},  # Not gzip, so fallback is used
        )
        gzip_parser = GzipParser(inner_parser=JsonLineParser())
        header_rules = [({"Content-Encoding"}, {"gzip"}, gzip_parser)]
        decoder = CompositeRawDecoder.by_headers(
            header_rules,
            stream_response=True,
            fallback_parser=gzip_parser,
        )

        rows = list(decoder.decode(response))

        assert len(rows) == 3

    def test_non_streamed_gzip_without_content_encoding(self, requests_mock):
        """Gzip content is handled when the decoder reads response.content (non-streamed)."""
        response = self._mock_get(
            requests_mock,
            generate_compressed_jsonlines(),
            stream=False,  # Not streamed
        )
        decoder = CompositeRawDecoder(
            parser=GzipParser(inner_parser=JsonLineParser()),
            stream_response=False,
        )

        rows = list(decoder.decode(response))

        assert len(rows) == 3
Loading