diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 7acb5c1e2..d3df5dfaf 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -11,6 +11,8 @@ from io import BufferedIOBase, TextIOWrapper from typing import Any, List, Optional +GZIP_MAGIC_BYTES = b"\x1f\x8b" + import orjson import requests @@ -35,15 +37,22 @@ def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE: """ Decompress gzipped bytes and pass decompressed data to the inner parser. - IMPORTANT: - - If the data is not gzipped, reset the pointer and pass the data to the inner parser as is. - - Note: - - The data is not decoded by default. + Auto-detects gzip content by checking for magic bytes (0x1f 0x8b) at the start. + If the data is not gzip-compressed, it is passed directly to the inner parser. + This handles APIs that return gzip-compressed bodies without Content-Encoding header. """ + header = data.read(2) + if not header: + return - with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj: - yield from self.inner_parser.parse(gzipobj) + remaining = data.read() + full_data = io.BytesIO(header + remaining) + + if header == GZIP_MAGIC_BYTES: + with gzip.GzipFile(fileobj=full_data, mode="rb") as gzipobj: + yield from self.inner_parser.parse(gzipobj) + else: + yield from self.inner_parser.parse(full_data) @dataclass diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 560dd4056..b58742f1c 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2640,15 +2640,16 @@ def create_gzip_decoder( gzip_parser: GzipParser = ModelToComponentFactory._get_parser(model, config) # type: ignore # based on the model, we know this will be a GzipParser if self._emit_connector_builder_messages: - # This is very surprising but if the response is not streamed, - # CompositeRawDecoder calls response.content and the requests library actually uncompress the data as opposed to response.raw, - # which uses urllib3 directly and does not uncompress the data. - return CompositeRawDecoder(gzip_parser.inner_parser, False) + # When not streaming, CompositeRawDecoder uses response.content which the requests + # library auto-decompresses when Content-Encoding is set. However, some APIs return + # gzip data without Content-Encoding headers. Using gzip_parser (which auto-detects + # gzip magic bytes) ensures decompression works in both cases. + return CompositeRawDecoder(gzip_parser, False) return CompositeRawDecoder.by_headers( [({"Content-Encoding", "Content-Type"}, _compressed_response_types, gzip_parser)], stream_response=True, - fallback_parser=gzip_parser.inner_parser, + fallback_parser=gzip_parser, ) @staticmethod diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index d92d6c605..119d1756f 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -362,3 +362,104 @@ def test_given_response_is_not_streamed_when_decode_then_can_be_called_multiple_ content_second_time = list(composite_raw_decoder.decode(response)) assert content == content_second_time + + +class TestGzipParserAutoDetection: + """Tests for GzipParser auto-detection of gzip content via magic bytes.""" + + def test_gzip_csv_without_content_encoding_header(self, requests_mock): + """GzipParser should decompress gzip data even without Content-Encoding header.""" + requests_mock.register_uri( + "GET", + "https://airbyte.io/", + content=generate_csv(should_compress=True), + # No Content-Encoding header set + ) + response = requests.get("https://airbyte.io/", stream=True) + parser = GzipParser(inner_parser=CsvParser()) + decoder = CompositeRawDecoder(parser=parser) + records = list(decoder.decode(response)) + assert len(records) == 3 + assert records[0] == {"id": "1", "name": "John", "age": "28"} + + def test_gzip_jsonl_without_content_encoding_header(self, requests_mock): + """GzipParser should decompress gzip JSONL data without Content-Encoding header.""" + requests_mock.register_uri( + "GET", + "https://airbyte.io/", + content=generate_compressed_jsonlines(), + # No Content-Encoding header set + ) + response = requests.get("https://airbyte.io/", stream=True) + parser = GzipParser(inner_parser=JsonLineParser()) + decoder = CompositeRawDecoder(parser=parser) + records = list(decoder.decode(response)) + assert len(records) == 3 + assert records[0] == {"id": 1, "message": "Hello, World!"} + + def test_non_gzip_data_passthrough(self, requests_mock): + """GzipParser should pass non-gzip data through to the inner parser unchanged.""" + plain_csv = generate_csv(should_compress=False) + requests_mock.register_uri( + "GET", + "https://airbyte.io/", + content=plain_csv, + ) + response = requests.get("https://airbyte.io/", stream=True) + parser = GzipParser(inner_parser=CsvParser()) + decoder = CompositeRawDecoder(parser=parser) + records = list(decoder.decode(response)) + assert len(records) == 3 + assert records[0] == {"id": "1", "name": "John", "age": "28"} + + def test_non_gzip_jsonl_passthrough(self, requests_mock): + """GzipParser should pass plain JSONL data through to the inner parser.""" + plain_jsonl = "".join(generate_jsonlines()).encode("utf-8") + requests_mock.register_uri( + "GET", + "https://airbyte.io/", + content=plain_jsonl, + ) + response = requests.get("https://airbyte.io/", stream=True) + parser = GzipParser(inner_parser=JsonLineParser()) + decoder = CompositeRawDecoder(parser=parser) + records = list(decoder.decode(response)) + assert len(records) == 3 + assert records[0] == {"id": 1, "message": "Hello, World!"} + + def test_empty_data_returns_no_records(self): + """GzipParser should gracefully handle empty data.""" + parser = GzipParser(inner_parser=CsvParser()) + records = list(parser.parse(BytesIO(b""))) + assert records == [] + + def test_gzip_fallback_in_by_headers_mode(self, requests_mock): + """When used as fallback_parser in by_headers mode, GzipParser should auto-detect gzip.""" + requests_mock.register_uri( + "GET", + "https://airbyte.io/", + content=generate_compressed_jsonlines(), + headers={"Content-Encoding": "identity"}, # Not gzip, so fallback is used + ) + response = requests.get("https://airbyte.io/", stream=True) + gzip_parser = GzipParser(inner_parser=JsonLineParser()) + decoder = CompositeRawDecoder.by_headers( + [({"Content-Encoding"}, {"gzip"}, gzip_parser)], + stream_response=True, + fallback_parser=gzip_parser, + ) + records = list(decoder.decode(response)) + assert len(records) == 3 + + def test_non_streamed_gzip_without_content_encoding(self, requests_mock): + """GzipParser should handle gzip data in non-streamed mode (response.content).""" + requests_mock.register_uri( + "GET", + "https://airbyte.io/", + content=generate_compressed_jsonlines(), + ) + response = requests.get("https://airbyte.io/") # Not streamed + parser = GzipParser(inner_parser=JsonLineParser()) + decoder = CompositeRawDecoder(parser=parser, stream_response=False) + records = list(decoder.decode(response)) + assert len(records) == 3