diff --git a/CHANGELOG.md b/CHANGELOG.md index 31e73e411..aeba691cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ Newer updates can be found here: [GitHub Release Notes](https://github.com/airby # Changelog +## 6.5.3 + +Bugfix: File transfer syncs now fail early if the shared staging directory is unavailable. + ## 6.5.2 bugfix: Ensure that streams with partition router are not executed concurrently diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/default_file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/default_file_uploader.py index 1312ab34d..24439e295 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader/default_file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/default_file_uploader.py @@ -70,7 +70,7 @@ def upload(self, record: Record) -> None: ), ) - files_directory = Path(get_files_directory()) + files_directory = Path(get_files_directory(is_file_transfer=True)) file_name = ( self.filename_extractor.eval(self.config, record=record) diff --git a/airbyte_cdk/sources/file_based/file_types/file_transfer.py b/airbyte_cdk/sources/file_based/file_types/file_transfer.py index d260a092d..226851c88 100644 --- a/airbyte_cdk/sources/file_based/file_types/file_transfer.py +++ b/airbyte_cdk/sources/file_based/file_types/file_transfer.py @@ -13,7 +13,7 @@ class FileTransfer: def __init__(self) -> None: - self._local_directory = get_files_directory() + self._local_directory = get_files_directory(is_file_transfer=True) def upload( self, diff --git a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py index 588c4a18e..93a0a2f29 100644 --- a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py +++ b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py @@ -69,7 +69,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin): airbyte_columns = [ab_last_mod_col, ab_file_name_col] use_file_transfer = False preserve_directory_structure = True - _file_transfer = FileTransfer() + _file_transfer: Optional[FileTransfer] = None def __init__(self, **kwargs: Any): if self.FILE_TRANSFER_KW in kwargs: @@ -164,7 +164,10 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte try: if self.use_file_transfer: - for file_record_data, file_reference in self._file_transfer.upload( + if self._file_transfer is None: + self._file_transfer = FileTransfer() + file_transfer = self._file_transfer + for file_record_data, file_reference in file_transfer.upload( file=file, stream_reader=self.stream_reader, logger=self.logger ): yield stream_data_to_airbyte_message( diff --git a/airbyte_cdk/sources/utils/files_directory.py b/airbyte_cdk/sources/utils/files_directory.py index 6b8dd6b79..7cb939a9b 100644 --- a/airbyte_cdk/sources/utils/files_directory.py +++ b/airbyte_cdk/sources/utils/files_directory.py @@ -1,15 +1,49 @@ # # Copyright (c) 2025 Airbyte, Inc., all rights reserved. # -import os +from os import getenv +from pathlib import Path -AIRBYTE_STAGING_DIRECTORY = os.getenv("AIRBYTE_STAGING_DIRECTORY", "/staging/files") +from airbyte_cdk.models import FailureType +from airbyte_cdk.utils.traced_exception import AirbyteTracedException + +AIRBYTE_STAGING_DIRECTORY_ENV_VAR = "AIRBYTE_STAGING_DIRECTORY" +DEFAULT_AIRBYTE_STAGING_DIRECTORY = "/staging/files" DEFAULT_LOCAL_DIRECTORY = "/tmp/airbyte-file-transfer" +UNAVAILABLE_STAGING_DIRECTORY_MESSAGE = "File transfer staging directory is unavailable." + + +def _validate_staging_directory(staging_directory: str) -> None: + staging_directory_path = Path(staging_directory) + if not staging_directory_path.exists(): + raise AirbyteTracedException( + message=UNAVAILABLE_STAGING_DIRECTORY_MESSAGE, + internal_message=( + f"Configured {AIRBYTE_STAGING_DIRECTORY_ENV_VAR} does not exist: " + f"{staging_directory}" + ), + failure_type=FailureType.system_error, + ) + + if not staging_directory_path.is_dir(): + raise AirbyteTracedException( + message=UNAVAILABLE_STAGING_DIRECTORY_MESSAGE, + internal_message=( + f"Configured {AIRBYTE_STAGING_DIRECTORY_ENV_VAR} is not a directory: " + f"{staging_directory}" + ), + failure_type=FailureType.system_error, + ) + + +def get_files_directory(is_file_transfer: bool = False) -> str: + configured_staging_directory = getenv(AIRBYTE_STAGING_DIRECTORY_ENV_VAR) + if configured_staging_directory: + _validate_staging_directory(configured_staging_directory) + return configured_staging_directory + if not is_file_transfer: + return DEFAULT_LOCAL_DIRECTORY -def get_files_directory() -> str: - return ( - AIRBYTE_STAGING_DIRECTORY - if os.path.exists(AIRBYTE_STAGING_DIRECTORY) - else DEFAULT_LOCAL_DIRECTORY - ) + _validate_staging_directory(DEFAULT_AIRBYTE_STAGING_DIRECTORY) + return DEFAULT_AIRBYTE_STAGING_DIRECTORY diff --git a/airbyte_cdk/test/standard_tests/connector_base.py b/airbyte_cdk/test/standard_tests/connector_base.py index dac656cf4..6a65eb89d 100644 --- a/airbyte_cdk/test/standard_tests/connector_base.py +++ b/airbyte_cdk/test/standard_tests/connector_base.py @@ -5,11 +5,14 @@ import importlib import os +from collections.abc import Callable +from inspect import Parameter, signature from pathlib import Path -from typing import TYPE_CHECKING, cast +from typing import TYPE_CHECKING, Any, cast from boltons.typeutils import classproperty +from airbyte_cdk.models import ConfiguredAirbyteCatalog from airbyte_cdk.test import entrypoint_wrapper from airbyte_cdk.test.models import ( ConnectorTestScenario, @@ -18,8 +21,6 @@ from airbyte_cdk.test.standard_tests.docker_base import DockerConnectorTestSuite if TYPE_CHECKING: - from collections.abc import Callable - from airbyte_cdk.test import entrypoint_wrapper @@ -85,13 +86,25 @@ def connector(cls) -> type[IConnector] | Callable[[], IConnector] | None: def create_connector( cls, scenario: ConnectorTestScenario | None, + catalog: ConfiguredAirbyteCatalog | None = None, ) -> IConnector: """Instantiate the connector class.""" connector = cls.connector # type: ignore if connector: - if callable(connector) or isinstance(connector, type): - # If the connector is a class or factory function, instantiate it: - return cast(IConnector, connector()) # type: ignore [redundant-cast] + if isinstance(connector, type): + config = None + if scenario and _requires_legacy_file_based_constructor_args(connector): + config = ( + scenario.config_dict + if scenario.config_dict is not None + else scenario.get_config_dict( + empty_if_missing=True, + connector_root=cls.get_connector_root_dir(), + ) + ) + return cls._instantiate_connector_class(connector, catalog, config) + if callable(connector): + return connector() # Otherwise, we can't instantiate the connector. Fail with a clear error message. raise NotImplementedError( @@ -100,6 +113,24 @@ def create_connector( "override `cls.create_connector()` to define a custom initialization process." ) + @staticmethod + def _instantiate_connector_class( + connector_class: type[IConnector], + catalog: ConfiguredAirbyteCatalog | None = None, + config: dict[str, Any] | None = None, + ) -> IConnector: + """Instantiate connector classes supported by standard tests.""" + if _requires_legacy_file_based_constructor_args(connector_class): + legacy_file_based_connector = cast( + Callable[ + [ConfiguredAirbyteCatalog | None, dict[str, Any] | None, None], IConnector + ], + connector_class, + ) + return legacy_file_based_connector(catalog, config, None) + + return connector_class() + # Test Definitions def test_check( @@ -117,3 +148,13 @@ def test_check( f"Expected exactly one CONNECTION_STATUS message. " "Got: {result.connection_status_messages!s}" ) + + +def _requires_legacy_file_based_constructor_args(connector_class: type[IConnector]) -> bool: + required_positional_parameter_names = [ + parameter.name + for parameter in signature(connector_class).parameters.values() + if parameter.default is Parameter.empty + and parameter.kind in (Parameter.POSITIONAL_ONLY, Parameter.POSITIONAL_OR_KEYWORD) + ] + return required_positional_parameter_names == ["catalog", "config", "state"] diff --git a/airbyte_cdk/test/standard_tests/declarative_sources.py b/airbyte_cdk/test/standard_tests/declarative_sources.py index 18a2a5910..73e9eab80 100644 --- a/airbyte_cdk/test/standard_tests/declarative_sources.py +++ b/airbyte_cdk/test/standard_tests/declarative_sources.py @@ -6,6 +6,7 @@ import yaml from boltons.typeutils import classproperty +from airbyte_cdk.models import ConfiguredAirbyteCatalog from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( ConcurrentDeclarativeSource, ) @@ -65,6 +66,7 @@ def components_py_path(cls) -> Path | None: def create_connector( cls, scenario: ConnectorTestScenario | None, + catalog: ConfiguredAirbyteCatalog | None = None, ) -> IConnector: """Create a connector scenario for the test suite. diff --git a/airbyte_cdk/test/standard_tests/source_base.py b/airbyte_cdk/test/standard_tests/source_base.py index faecb03c7..3c0f905c3 100644 --- a/airbyte_cdk/test/standard_tests/source_base.py +++ b/airbyte_cdk/test/standard_tests/source_base.py @@ -138,7 +138,7 @@ def test_basic_read( ] ) result = run_test_job( - self.create_connector(scenario), + self.create_connector(scenario, configured_catalog), "read", test_scenario=scenario, connector_root=self.get_connector_root_dir(), @@ -173,7 +173,7 @@ def test_fail_read_with_bad_catalog( ], ) result: entrypoint_wrapper.EntrypointOutput = run_test_job( - self.create_connector(scenario), + self.create_connector(scenario, invalid_configured_catalog), "read", connector_root=self.get_connector_root_dir(), test_scenario=scenario.with_expecting_failure(), # Expect failure due to bad catalog diff --git a/unit_tests/sources/declarative/file/test_file_stream.py b/unit_tests/sources/declarative/file/test_file_stream.py index 7dbd478f3..447d5df69 100644 --- a/unit_tests/sources/declarative/file/test_file_stream.py +++ b/unit_tests/sources/declarative/file/test_file_stream.py @@ -128,6 +128,8 @@ def test_get_articles(self) -> None: def test_get_article_attachments(self) -> None: with HttpMocker() as http_mocker: + files_directory = Path(__file__).parent / "staging" + files_directory.mkdir(exist_ok=True) http_mocker.get( HttpRequest(url=STREAM_URL), HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200), @@ -145,12 +147,13 @@ def test_get_article_attachments(self) -> None: ), ) - output = read( - self._config(), - CatalogBuilder() - .with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments")) - .build(), - ) + with patch.dict("os.environ", {"AIRBYTE_STAGING_DIRECTORY": str(files_directory)}): + output = read( + self._config(), + CatalogBuilder() + .with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments")) + .build(), + ) assert output.records file_reference = output.records[0].record.file_reference @@ -167,6 +170,8 @@ def test_get_article_attachments(self) -> None: def test_get_article_attachments_with_filename_extractor(self) -> None: with HttpMocker() as http_mocker: + files_directory = Path(__file__).parent / "staging" + files_directory.mkdir(exist_ok=True) http_mocker.get( HttpRequest(url=STREAM_URL), HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200), @@ -184,20 +189,21 @@ def test_get_article_attachments_with_filename_extractor(self) -> None: ), ) - output = read( - self._config(), - CatalogBuilder() - .with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments")) - .build(), - yaml_file="test_file_stream_with_filename_extractor.yaml", - ) + with patch.dict("os.environ", {"AIRBYTE_STAGING_DIRECTORY": str(files_directory)}): + output = read( + self._config(), + CatalogBuilder() + .with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments")) + .build(), + yaml_file="test_file_stream_with_filename_extractor.yaml", + ) assert len(output.records) == 1 file_reference = output.records[0].record.file_reference assert file_reference assert ( file_reference.staging_file_url - == "/tmp/airbyte-file-transfer/article_attachments/12138758717583/some_image_name.png" + == f"{files_directory}/article_attachments/12138758717583/some_image_name.png" ) assert file_reference.source_file_relative_path assert not re.match( @@ -207,6 +213,8 @@ def test_get_article_attachments_with_filename_extractor(self) -> None: def test_get_article_attachments_messages_for_connector_builder(self) -> None: with HttpMocker() as http_mocker: + files_directory = Path(__file__).parent / "staging" + files_directory.mkdir(exist_ok=True) http_mocker.get( HttpRequest(url=STREAM_URL), HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200), @@ -231,9 +239,12 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Patch the factory class where ConcurrentDeclarativeSource (parent of YamlDeclarativeSource) imports it - with patch( - "airbyte_cdk.sources.declarative.concurrent_declarative_source.ModelToComponentFactory", - new=MockModelToComponentFactory, + with ( + patch( + "airbyte_cdk.sources.declarative.concurrent_declarative_source.ModelToComponentFactory", + new=MockModelToComponentFactory, + ), + patch.dict("os.environ", {"AIRBYTE_STAGING_DIRECTORY": str(files_directory)}), ): output = read( self._config(), diff --git a/unit_tests/sources/file_based/file_types/test_file_transfer.py b/unit_tests/sources/file_based/file_types/test_file_transfer.py new file mode 100644 index 000000000..e54b30e81 --- /dev/null +++ b/unit_tests/sources/file_based/file_types/test_file_transfer.py @@ -0,0 +1,22 @@ +# +# Copyright (c) 2026 Airbyte, Inc., all rights reserved. +# + +import pytest + +from airbyte_cdk.models import FailureType +from airbyte_cdk.sources.file_based.file_types.file_transfer import FileTransfer +from airbyte_cdk.utils.traced_exception import AirbyteTracedException + + +def test_file_transfer_raises_when_shared_staging_directory_is_missing(monkeypatch) -> None: + monkeypatch.delenv("AIRBYTE_STAGING_DIRECTORY", raising=False) + + with pytest.raises(AirbyteTracedException) as exc_info: + FileTransfer() + + assert exc_info.value.failure_type == FailureType.system_error + assert exc_info.value.message == "File transfer staging directory is unavailable." + assert exc_info.value.internal_message == ( + "Configured AIRBYTE_STAGING_DIRECTORY does not exist: /staging/files" + ) diff --git a/unit_tests/sources/file_based/stream/test_default_file_based_stream.py b/unit_tests/sources/file_based/stream/test_default_file_based_stream.py index 54394a36d..6773551b9 100644 --- a/unit_tests/sources/file_based/stream/test_default_file_based_stream.py +++ b/unit_tests/sources/file_based/stream/test_default_file_based_stream.py @@ -348,10 +348,17 @@ def setUp(self) -> None: def test_when_read_records_from_slice_then_return_records(self) -> None: """Verify that we have the new file method and data is empty""" - with mock.patch.object( - FileTransfer, - "upload", - return_value=[(self._A_FILE_RECORD_DATA, self._A_FILE_REFERENCE_MESSAGE)], + with ( + mock.patch.object( + FileTransfer, + "__init__", + return_value=None, + ), + mock.patch.object( + FileTransfer, + "upload", + return_value=[(self._A_FILE_RECORD_DATA, self._A_FILE_REFERENCE_MESSAGE)], + ), ): remote_file = RemoteFile(uri="uri", last_modified=self._NOW) messages = list(self._stream.read_records_from_slice({"files": [remote_file]})) diff --git a/unit_tests/sources/utils/test_files_directory.py b/unit_tests/sources/utils/test_files_directory.py new file mode 100644 index 000000000..c11c0794c --- /dev/null +++ b/unit_tests/sources/utils/test_files_directory.py @@ -0,0 +1,94 @@ +# +# Copyright (c) 2026 Airbyte, Inc., all rights reserved. +# + +import pytest + +from airbyte_cdk.models import FailureType +from airbyte_cdk.sources.utils.files_directory import get_files_directory +from airbyte_cdk.utils.traced_exception import AirbyteTracedException + + +@pytest.mark.parametrize( + "is_file_transfer", + [ + pytest.param(False, id="local_usage"), + pytest.param(True, id="file_transfer"), + ], +) +def test_get_files_directory_uses_configured_staging_directory( + monkeypatch, tmp_path, is_file_transfer +) -> None: + monkeypatch.setenv("AIRBYTE_STAGING_DIRECTORY", str(tmp_path)) + + assert get_files_directory(is_file_transfer=is_file_transfer) == str(tmp_path) + + +@pytest.mark.parametrize( + "is_file_transfer", + [ + pytest.param(False, id="local_usage"), + pytest.param(True, id="file_transfer"), + ], +) +def test_get_files_directory_raises_when_explicit_staging_directory_is_missing( + monkeypatch, tmp_path, is_file_transfer +) -> None: + missing_staging_directory = tmp_path / "missing" + monkeypatch.setenv("AIRBYTE_STAGING_DIRECTORY", str(missing_staging_directory)) + + with pytest.raises(AirbyteTracedException) as exc_info: + get_files_directory(is_file_transfer=is_file_transfer) + + assert exc_info.value.failure_type == FailureType.system_error + assert exc_info.value.message == "File transfer staging directory is unavailable." + assert exc_info.value.internal_message == ( + f"Configured AIRBYTE_STAGING_DIRECTORY does not exist: {missing_staging_directory}" + ) + + +def test_get_files_directory_falls_back_for_local_usage_without_configured_staging_directory( + monkeypatch, +) -> None: + monkeypatch.delenv("AIRBYTE_STAGING_DIRECTORY", raising=False) + + assert get_files_directory() == "/tmp/airbyte-file-transfer" + + +def test_get_files_directory_raises_for_file_transfer_without_shared_staging_directory( + monkeypatch, +) -> None: + monkeypatch.delenv("AIRBYTE_STAGING_DIRECTORY", raising=False) + + with pytest.raises(AirbyteTracedException) as exc_info: + get_files_directory(is_file_transfer=True) + + assert exc_info.value.failure_type == FailureType.system_error + assert exc_info.value.message == "File transfer staging directory is unavailable." + assert exc_info.value.internal_message == ( + "Configured AIRBYTE_STAGING_DIRECTORY does not exist: /staging/files" + ) + + +@pytest.mark.parametrize( + "is_file_transfer", + [ + pytest.param(False, id="local_usage"), + pytest.param(True, id="file_transfer"), + ], +) +def test_get_files_directory_raises_when_explicit_staging_directory_is_a_file( + monkeypatch, tmp_path, is_file_transfer +) -> None: + staging_directory = tmp_path / "stage" + staging_directory.write_text("not a directory") + monkeypatch.setenv("AIRBYTE_STAGING_DIRECTORY", str(staging_directory)) + + with pytest.raises(AirbyteTracedException) as exc_info: + get_files_directory(is_file_transfer=is_file_transfer) + + assert exc_info.value.failure_type == FailureType.system_error + assert exc_info.value.message == "File transfer staging directory is unavailable." + assert exc_info.value.internal_message == ( + f"Configured AIRBYTE_STAGING_DIRECTORY is not a directory: {staging_directory}" + ) diff --git a/unit_tests/test/test_standard_tests.py b/unit_tests/test/test_standard_tests.py index d5dd28277..30ce869a6 100644 --- a/unit_tests/test/test_standard_tests.py +++ b/unit_tests/test/test_standard_tests.py @@ -1,15 +1,52 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. """Unit tests for FAST Airbyte Standard Tests.""" +import logging +from collections.abc import Iterable, Mapping from typing import Any import pytest +from airbyte_cdk.models import ( + AirbyteCatalog, + AirbyteMessage, + AirbyteStateMessage, + ConfiguredAirbyteCatalog, +) from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( ConcurrentDeclarativeSource, ) from airbyte_cdk.sources.source import Source +from airbyte_cdk.test.models import ConnectorTestScenario from airbyte_cdk.test.standard_tests._job_runner import IConnector +from airbyte_cdk.test.standard_tests.connector_base import ConnectorTestSuiteBase + + +class LegacyFileBasedConnector(Source): + def __init__( + self, + catalog: ConfiguredAirbyteCatalog | None, + config: dict[str, Any] | None, + state: list[AirbyteStateMessage] | None, + ) -> None: + self.catalog = catalog + self.config = config + self.state = state + + def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> None: + pass + + def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog: + return AirbyteCatalog(streams=[]) + + def read( + self, + logger: logging.Logger, + config: Mapping[str, Any], + catalog: ConfiguredAirbyteCatalog, + state: list[AirbyteStateMessage] | None = None, + ) -> Iterable[AirbyteMessage]: + return [] @pytest.mark.parametrize( @@ -31,3 +68,19 @@ def test_is_iconnector_check(input: Any, expected: bool) -> None: return assert isinstance(input, IConnector) == expected + + +def test_create_connector_instantiates_legacy_file_based_sources_with_runtime_args() -> None: + class TestSuite(ConnectorTestSuiteBase): + connector = LegacyFileBasedConnector + + catalog = ConfiguredAirbyteCatalog(streams=[]) + connector = TestSuite.create_connector( + ConnectorTestScenario(config_dict={"folder_url": "https://example.com"}), + catalog, + ) + + assert isinstance(connector, LegacyFileBasedConnector) + assert connector.catalog == catalog + assert connector.config == {"folder_url": "https://example.com"} + assert connector.state is None