Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ Newer updates can be found here: [GitHub Release Notes](https://github.com/airby

# Changelog

## 6.5.3

Bugfix: File transfer syncs now fail early if the shared staging directory is unavailable.

## 6.5.2

bugfix: Ensure that streams with a partition router are not executed concurrently
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def upload(self, record: Record) -> None:
),
)

files_directory = Path(get_files_directory())
files_directory = Path(get_files_directory(is_file_transfer=True))

file_name = (
self.filename_extractor.eval(self.config, record=record)
Expand Down
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/file_based/file_types/file_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

class FileTransfer:
def __init__(self) -> None:
self._local_directory = get_files_directory()
self._local_directory = get_files_directory(is_file_transfer=True)

def upload(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
airbyte_columns = [ab_last_mod_col, ab_file_name_col]
use_file_transfer = False
preserve_directory_structure = True
_file_transfer = FileTransfer()
_file_transfer: Optional[FileTransfer] = None

def __init__(self, **kwargs: Any):
if self.FILE_TRANSFER_KW in kwargs:
Expand Down Expand Up @@ -164,7 +164,10 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte

try:
if self.use_file_transfer:
for file_record_data, file_reference in self._file_transfer.upload(
if self._file_transfer is None:
self._file_transfer = FileTransfer()
file_transfer = self._file_transfer
for file_record_data, file_reference in file_transfer.upload(
file=file, stream_reader=self.stream_reader, logger=self.logger
):
yield stream_data_to_airbyte_message(
Expand Down
50 changes: 42 additions & 8 deletions airbyte_cdk/sources/utils/files_directory.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,49 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
import os
from os import getenv
from pathlib import Path

AIRBYTE_STAGING_DIRECTORY = os.getenv("AIRBYTE_STAGING_DIRECTORY", "/staging/files")
from airbyte_cdk.models import FailureType
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

AIRBYTE_STAGING_DIRECTORY_ENV_VAR = "AIRBYTE_STAGING_DIRECTORY"
DEFAULT_AIRBYTE_STAGING_DIRECTORY = "/staging/files"
DEFAULT_LOCAL_DIRECTORY = "/tmp/airbyte-file-transfer"
UNAVAILABLE_STAGING_DIRECTORY_MESSAGE = "File transfer staging directory is unavailable."


def _validate_staging_directory(staging_directory: str) -> None:
staging_directory_path = Path(staging_directory)
if not staging_directory_path.exists():
raise AirbyteTracedException(
message=UNAVAILABLE_STAGING_DIRECTORY_MESSAGE,
internal_message=(
f"Configured {AIRBYTE_STAGING_DIRECTORY_ENV_VAR} does not exist: "
f"{staging_directory}"
),
failure_type=FailureType.system_error,
)

if not staging_directory_path.is_dir():
raise AirbyteTracedException(
message=UNAVAILABLE_STAGING_DIRECTORY_MESSAGE,
internal_message=(
f"Configured {AIRBYTE_STAGING_DIRECTORY_ENV_VAR} is not a directory: "
f"{staging_directory}"
),
failure_type=FailureType.system_error,
)


def get_files_directory(is_file_transfer: bool = False) -> str:
configured_staging_directory = getenv(AIRBYTE_STAGING_DIRECTORY_ENV_VAR)
if configured_staging_directory:
_validate_staging_directory(configured_staging_directory)
return configured_staging_directory

if not is_file_transfer:
return DEFAULT_LOCAL_DIRECTORY

def get_files_directory() -> str:
return (
AIRBYTE_STAGING_DIRECTORY
if os.path.exists(AIRBYTE_STAGING_DIRECTORY)
else DEFAULT_LOCAL_DIRECTORY
)
_validate_staging_directory(DEFAULT_AIRBYTE_STAGING_DIRECTORY)
return DEFAULT_AIRBYTE_STAGING_DIRECTORY
53 changes: 47 additions & 6 deletions airbyte_cdk/test/standard_tests/connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@

import importlib
import os
from collections.abc import Callable
from inspect import Parameter, signature
from pathlib import Path
from typing import TYPE_CHECKING, cast
from typing import TYPE_CHECKING, Any, cast

from boltons.typeutils import classproperty

from airbyte_cdk.models import ConfiguredAirbyteCatalog
from airbyte_cdk.test import entrypoint_wrapper
from airbyte_cdk.test.models import (
ConnectorTestScenario,
Expand All @@ -18,8 +21,6 @@
from airbyte_cdk.test.standard_tests.docker_base import DockerConnectorTestSuite

if TYPE_CHECKING:
from collections.abc import Callable

from airbyte_cdk.test import entrypoint_wrapper


Expand Down Expand Up @@ -85,13 +86,25 @@ def connector(cls) -> type[IConnector] | Callable[[], IConnector] | None:
def create_connector(
cls,
scenario: ConnectorTestScenario | None,
catalog: ConfiguredAirbyteCatalog | None = None,
) -> IConnector:
"""Instantiate the connector class."""
connector = cls.connector # type: ignore
if connector:
if callable(connector) or isinstance(connector, type):
# If the connector is a class or factory function, instantiate it:
return cast(IConnector, connector()) # type: ignore [redundant-cast]
if isinstance(connector, type):
config = None
if scenario and _requires_legacy_file_based_constructor_args(connector):
config = (
scenario.config_dict
if scenario.config_dict is not None
else scenario.get_config_dict(
empty_if_missing=True,
connector_root=cls.get_connector_root_dir(),
)
)
return cls._instantiate_connector_class(connector, catalog, config)
if callable(connector):
return connector()

# Otherwise, we can't instantiate the connector. Fail with a clear error message.
raise NotImplementedError(
Expand All @@ -100,6 +113,24 @@ def create_connector(
"override `cls.create_connector()` to define a custom initialization process."
)

@staticmethod
def _instantiate_connector_class(
    connector_class: type[IConnector],
    catalog: ConfiguredAirbyteCatalog | None = None,
    config: dict[str, Any] | None = None,
) -> IConnector:
    """Build a connector instance from a class supported by the standard tests.

    Classes whose constructor requires exactly ``(catalog, config, state)`` as
    positional arguments are called with ``catalog``, ``config`` and ``None``;
    every other class is called with no arguments.
    """
    if not _requires_legacy_file_based_constructor_args(connector_class):
        return connector_class()

    # The cast only informs the type checker about the three-argument
    # constructor; at runtime it returns connector_class unchanged.
    legacy_factory = cast(
        Callable[
            [ConfiguredAirbyteCatalog | None, dict[str, Any] | None, None], IConnector
        ],
        connector_class,
    )
    return legacy_factory(catalog, config, None)

# Test Definitions

def test_check(
Expand All @@ -117,3 +148,13 @@ def test_check(
f"Expected exactly one CONNECTION_STATUS message. "
"Got: {result.connection_status_messages!s}"
)


def _requires_legacy_file_based_constructor_args(connector_class: type[IConnector]) -> bool:
required_positional_parameter_names = [
parameter.name
for parameter in signature(connector_class).parameters.values()
if parameter.default is Parameter.empty
and parameter.kind in (Parameter.POSITIONAL_ONLY, Parameter.POSITIONAL_OR_KEYWORD)
]
return required_positional_parameter_names == ["catalog", "config", "state"]
2 changes: 2 additions & 0 deletions airbyte_cdk/test/standard_tests/declarative_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import yaml
from boltons.typeutils import classproperty

from airbyte_cdk.models import ConfiguredAirbyteCatalog
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
ConcurrentDeclarativeSource,
)
Expand Down Expand Up @@ -65,6 +66,7 @@ def components_py_path(cls) -> Path | None:
def create_connector(
cls,
scenario: ConnectorTestScenario | None,
catalog: ConfiguredAirbyteCatalog | None = None,
) -> IConnector:
"""Create a connector scenario for the test suite.

Expand Down
4 changes: 2 additions & 2 deletions airbyte_cdk/test/standard_tests/source_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def test_basic_read(
]
)
result = run_test_job(
self.create_connector(scenario),
self.create_connector(scenario, configured_catalog),
"read",
test_scenario=scenario,
connector_root=self.get_connector_root_dir(),
Expand Down Expand Up @@ -173,7 +173,7 @@ def test_fail_read_with_bad_catalog(
],
)
result: entrypoint_wrapper.EntrypointOutput = run_test_job(
self.create_connector(scenario),
self.create_connector(scenario, invalid_configured_catalog),
"read",
connector_root=self.get_connector_root_dir(),
test_scenario=scenario.with_expecting_failure(), # Expect failure due to bad catalog
Expand Down
45 changes: 28 additions & 17 deletions unit_tests/sources/declarative/file/test_file_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ def test_get_articles(self) -> None:

def test_get_article_attachments(self) -> None:
with HttpMocker() as http_mocker:
files_directory = Path(__file__).parent / "staging"
files_directory.mkdir(exist_ok=True)
http_mocker.get(
HttpRequest(url=STREAM_URL),
HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200),
Expand All @@ -145,12 +147,13 @@ def test_get_article_attachments(self) -> None:
),
)

output = read(
self._config(),
CatalogBuilder()
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
.build(),
)
with patch.dict("os.environ", {"AIRBYTE_STAGING_DIRECTORY": str(files_directory)}):
output = read(
self._config(),
CatalogBuilder()
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
.build(),
)

assert output.records
file_reference = output.records[0].record.file_reference
Expand All @@ -167,6 +170,8 @@ def test_get_article_attachments(self) -> None:

def test_get_article_attachments_with_filename_extractor(self) -> None:
with HttpMocker() as http_mocker:
files_directory = Path(__file__).parent / "staging"
files_directory.mkdir(exist_ok=True)
http_mocker.get(
HttpRequest(url=STREAM_URL),
HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200),
Expand All @@ -184,20 +189,21 @@ def test_get_article_attachments_with_filename_extractor(self) -> None:
),
)

output = read(
self._config(),
CatalogBuilder()
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
.build(),
yaml_file="test_file_stream_with_filename_extractor.yaml",
)
with patch.dict("os.environ", {"AIRBYTE_STAGING_DIRECTORY": str(files_directory)}):
output = read(
self._config(),
CatalogBuilder()
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
.build(),
yaml_file="test_file_stream_with_filename_extractor.yaml",
)

assert len(output.records) == 1
file_reference = output.records[0].record.file_reference
assert file_reference
assert (
file_reference.staging_file_url
== "/tmp/airbyte-file-transfer/article_attachments/12138758717583/some_image_name.png"
== f"{files_directory}/article_attachments/12138758717583/some_image_name.png"
)
assert file_reference.source_file_relative_path
assert not re.match(
Expand All @@ -207,6 +213,8 @@ def test_get_article_attachments_with_filename_extractor(self) -> None:

def test_get_article_attachments_messages_for_connector_builder(self) -> None:
with HttpMocker() as http_mocker:
files_directory = Path(__file__).parent / "staging"
files_directory.mkdir(exist_ok=True)
http_mocker.get(
HttpRequest(url=STREAM_URL),
HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200),
Expand All @@ -231,9 +239,12 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

# Patch the factory class where ConcurrentDeclarativeSource (parent of YamlDeclarativeSource) imports it
with patch(
"airbyte_cdk.sources.declarative.concurrent_declarative_source.ModelToComponentFactory",
new=MockModelToComponentFactory,
with (
patch(
"airbyte_cdk.sources.declarative.concurrent_declarative_source.ModelToComponentFactory",
new=MockModelToComponentFactory,
),
patch.dict("os.environ", {"AIRBYTE_STAGING_DIRECTORY": str(files_directory)}),
):
output = read(
self._config(),
Expand Down
22 changes: 22 additions & 0 deletions unit_tests/sources/file_based/file_types/test_file_transfer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Copyright (c) 2026 Airbyte, Inc., all rights reserved.
#

import pytest

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.file_based.file_types.file_transfer import FileTransfer
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


def test_file_transfer_raises_when_shared_staging_directory_is_missing(
    monkeypatch, tmp_path
) -> None:
    """FileTransfer() must fail fast when no staging directory is available.

    The default staging path is redirected to a location under ``tmp_path`` that
    is never created, so the result does not depend on whether ``/staging/files``
    happens to exist on the machine running the tests.
    """
    missing_directory = tmp_path / "missing-staging"
    monkeypatch.delenv("AIRBYTE_STAGING_DIRECTORY", raising=False)
    # get_files_directory() reads this module-level constant at call time, so
    # patching it here changes the fallback path used by FileTransfer().
    monkeypatch.setattr(
        "airbyte_cdk.sources.utils.files_directory.DEFAULT_AIRBYTE_STAGING_DIRECTORY",
        str(missing_directory),
    )

    with pytest.raises(AirbyteTracedException) as exc_info:
        FileTransfer()

    assert exc_info.value.failure_type == FailureType.system_error
    assert exc_info.value.message == "File transfer staging directory is unavailable."
    assert exc_info.value.internal_message == (
        f"Configured AIRBYTE_STAGING_DIRECTORY does not exist: {missing_directory}"
    )
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,17 @@ def setUp(self) -> None:

def test_when_read_records_from_slice_then_return_records(self) -> None:
"""Verify that we have the new file method and data is empty"""
with mock.patch.object(
FileTransfer,
"upload",
return_value=[(self._A_FILE_RECORD_DATA, self._A_FILE_REFERENCE_MESSAGE)],
with (
mock.patch.object(
FileTransfer,
"__init__",
return_value=None,
),
mock.patch.object(
FileTransfer,
"upload",
return_value=[(self._A_FILE_RECORD_DATA, self._A_FILE_REFERENCE_MESSAGE)],
),
):
remote_file = RemoteFile(uri="uri", last_modified=self._NOW)
messages = list(self._stream.read_records_from_slice({"files": [remote_file]}))
Expand Down
Loading
Loading