Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
4475a94
Add block_simultaneous_read to DefaultStream
tolik0 Dec 30, 2025
54631ac
Change `block_simultaneous_read` to string
tolik0 Jan 6, 2026
5bf631e
Fix StreamFacade
tolik0 Jan 6, 2026
0758005
Fix NoneType error when all streams are blocked
tolik0 Jan 9, 2026
06abd41
Fix unit tests
tolik0 Jan 12, 2026
61c1072
Auto-fix lint and format issues
Jan 12, 2026
1c1555b
Add retry deferred streams on stream completion
tolik0 Jan 13, 2026
b3a98f4
Fix unit tests
tolik0 Jan 13, 2026
0377b80
More fixes for unit tests
tolik0 Jan 13, 2026
8acab48
refactor: replace per-stream block_simultaneous_read with top-level s…
devin-ai-integration[bot] Feb 25, 2026
c33a61b
refactor: move stream_name_to_group into ModelToComponentFactory
devin-ai-integration[bot] Feb 25, 2026
8efc56c
refactor: use stream_groups manifest in factory test instead of hardc…
devin-ai-integration[bot] Feb 25, 2026
4bf56ec
fix: only include parent stream in stream_groups to avoid deadlock
devin-ai-integration[bot] Feb 26, 2026
276c007
style: fix ruff format for long line
devin-ai-integration[bot] Feb 26, 2026
59fbd79
refactor: move _build_stream_name_to_group into ModelToComponentFactory
devin-ai-integration[bot] Feb 27, 2026
40d53e7
refactor: resolve stream_groups from actual stream instances instead …
devin-ai-integration[bot] Mar 3, 2026
6726050
Fix stream format in schema
tolik0 Mar 4, 2026
76a29f2
refactor: add get_partition_router() helper to DefaultStream
devin-ai-integration[bot] Mar 4, 2026
b6be8c8
feat: validate no parent-child streams share a group to prevent deadlock
devin-ai-integration[bot] Mar 4, 2026
3f5a354
feat: assert partition generation queue is empty when all streams are…
devin-ai-integration[bot] Mar 4, 2026
4b8b141
refactor: move inline imports to module level in default_stream.py an…
devin-ai-integration[bot] Mar 4, 2026
7306516
fix: unwrap GroupingPartitionRouter in get_partition_router() to dete…
devin-ai-integration[bot] Mar 4, 2026
c4c9270
fix: handle GroupingPartitionRouter at call sites instead of in get_p…
devin-ai-integration[bot] Mar 4, 2026
f1e020b
feat: check active_groups is empty in is_done() safety check
devin-ai-integration[bot] Mar 4, 2026
e027b73
test: add missing unit tests for GroupingPartitionRouter, active_grou…
devin-ai-integration[bot] Mar 4, 2026
6f52073
fix: make deadlock validation check all ancestors, not just direct pa…
devin-ai-integration[bot] Mar 4, 2026
e7acac8
style: alphabetize StreamGroup and BlockSimultaneousSyncsAction in sc…
devin-ai-integration[bot] Mar 5, 2026
b56b376
style: move BlockSimultaneousSyncsAction next to StreamGroup for easi…
devin-ai-integration[bot] Mar 5, 2026
055655a
fix: update test regex to match actual error message after rebase
devin-ai-integration[bot] Mar 19, 2026
22ef57b
fix: correct ruff formatting and update active_groups test regex to m…
devin-ai-integration[bot] Mar 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 218 additions & 14 deletions airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Large diffs are not rendered by default.

71 changes: 70 additions & 1 deletion airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,19 @@
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
GroupingPartitionRouter,
)
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
SubstreamPartitionRouter,
)
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.spec.spec import Spec
from airbyte_cdk.sources.declarative.types import Config, ConnectionDefinition
from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem
from airbyte_cdk.sources.utils.slice_logger import (
AlwaysLogSliceLogger,
Expand Down Expand Up @@ -405,6 +412,8 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i
if api_budget_model:
self._constructor.set_api_budget(api_budget_model, self._config)

prepared_configs = self._initialize_cache_for_parent_streams(deepcopy(stream_configs))

source_streams = [
self._constructor.create_component(
(
Expand All @@ -416,10 +425,70 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i
self._config,
emit_connector_builder_messages=self._emit_connector_builder_messages,
)
for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
for stream_config in prepared_configs
]

self._apply_stream_groups(source_streams)

return source_streams

def _apply_stream_groups(self, streams: List[AbstractStream]) -> None:
    """Set block_simultaneous_read on streams based on the manifest's stream_groups config.

    Iterates over the resolved manifest's stream_groups and matches group membership
    against actual created stream instances by name. Validates that no stream shares a
    group with any of its ancestor (parent, grandparent, ...) streams, which would
    cause a deadlock: the child could never start while an ancestor holds the group.

    :param streams: The concrete stream instances created from the manifest.
    :raises ValueError: If a stream and one of its ancestors belong to the same group.
    """
    stream_groups = self._source_config.get("stream_groups", {})
    if not stream_groups:
        return

    # Build stream_name -> group_name mapping from the resolved manifest.
    # Group members are expected to be resolved DeclarativeStream dicts; any
    # other shape (e.g. an unresolved string $ref) is skipped — TODO confirm
    # string refs never survive manifest resolution.
    stream_name_to_group: Dict[str, str] = {}
    for group_name, group_config in stream_groups.items():
        for stream_ref in group_config.get("streams", []):
            if isinstance(stream_ref, dict):
                stream_name = stream_ref.get("name", "")
                if stream_name:
                    stream_name_to_group[stream_name] = group_name

    stream_name_to_instance: Dict[str, AbstractStream] = {s.name: s for s in streams}

    def _direct_parent_names(stream_name: str) -> Set[str]:
        """Return the names of the direct parent streams of the given stream."""
        instance = stream_name_to_instance.get(stream_name)
        if not isinstance(instance, DefaultStream):
            return set()
        router = instance.get_partition_router()
        # GroupingPartitionRouter wraps another router; unwrap it to reach substream parents.
        if isinstance(router, GroupingPartitionRouter):
            router = router.underlying_partition_router
        if not isinstance(router, SubstreamPartitionRouter):
            return set()
        return {parent_config.stream.name for parent_config in router.parent_stream_configs}

    def _collect_all_ancestor_names(stream_name: str) -> Set[str]:
        """Collect all ancestor stream names, iteratively, guarding against parent cycles."""
        ancestors: Set[str] = set()
        to_visit = list(_direct_parent_names(stream_name))
        while to_visit:
            ancestor = to_visit.pop()
            if ancestor in ancestors:
                # Already expanded — also breaks cyclic parent definitions, which
                # would otherwise recurse forever.
                continue
            ancestors.add(ancestor)
            to_visit.extend(_direct_parent_names(ancestor))
        return ancestors

    # Validate and apply in a single pass. A ValueError aborts stream creation
    # entirely, so partially applied group names on earlier streams are discarded.
    for stream in streams:
        if not isinstance(stream, DefaultStream) or stream.name not in stream_name_to_group:
            continue
        group_name = stream_name_to_group[stream.name]
        for ancestor_name in _collect_all_ancestor_names(stream.name):
            if stream_name_to_group.get(ancestor_name) == group_name:
                raise ValueError(
                    f"Stream '{stream.name}' and its parent stream '{ancestor_name}' "
                    f"are both in group '{group_name}'. "
                    f"A child stream must not share a group with its parent to avoid deadlock."
                )
        stream.block_simultaneous_read = group_name

@staticmethod
def _initialize_cache_for_parent_streams(
stream_configs: List[Dict[str, Any]],
Expand Down
46 changes: 46 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ properties:
"$ref": "#/definitions/ConcurrencyLevel"
api_budget:
"$ref": "#/definitions/HTTPAPIBudget"
stream_groups:
title: Stream Groups
description: >
Groups of streams that share a common resource and should not be read simultaneously.
Each group defines a set of stream references and an action that controls how concurrent
reads are managed. Only applies to ConcurrentDeclarativeSource.
type: object
additionalProperties:
"$ref": "#/definitions/StreamGroup"
max_concurrent_async_job_count:
title: Maximum Concurrent Asynchronous Jobs
description: Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.
Expand Down Expand Up @@ -4191,6 +4200,43 @@ definitions:
- "$ref": "#/definitions/ConfigRemoveFields"
- "$ref": "#/definitions/CustomConfigTransformation"
default: []
StreamGroup:
title: Stream Group
description: >
A group of streams that share a common resource and should not be read simultaneously.
Streams in the same group will be blocked from concurrent reads based on the specified action.
type: object
required:
- streams
- action
properties:
streams:
title: Streams
description: >
List of references to streams that belong to this group.
type: array
items:
anyOf:
- "$ref": "#/definitions/DeclarativeStream"
action:
title: Action
description: The action to apply to streams in this group.
"$ref": "#/definitions/BlockSimultaneousSyncsAction"
BlockSimultaneousSyncsAction:
title: Block Simultaneous Syncs Action
description: >
Action that prevents streams in the same group from being read concurrently.
When applied to a stream group, streams with this action will be deferred if
another stream in the same group is currently active.
This is useful for APIs that don't allow concurrent access to the same
endpoint or session. Only applies to ConcurrentDeclarativeSource.
type: object
required:
- type
properties:
type:
type: string
enum: [BlockSimultaneousSyncsAction]
SubstreamPartitionRouter:
title: Substream Partition Router
description: Partition router that is used to retrieve records that have been partitioned according to records from the specified parent streams. An example of a parent stream is automobile brands and the substream would be the various car models associated with each brand.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2394,6 +2394,11 @@ class Config:
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
api_budget: Optional[HTTPAPIBudget] = None
stream_groups: Optional[Dict[str, StreamGroup]] = Field(
None,
description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.",
title="Stream Groups",
)
max_concurrent_async_job_count: Optional[Union[int, str]] = Field(
None,
description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.",
Expand Down Expand Up @@ -2429,6 +2434,11 @@ class Config:
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
api_budget: Optional[HTTPAPIBudget] = None
stream_groups: Optional[Dict[str, StreamGroup]] = Field(
None,
description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.",
title="Stream Groups",
)
max_concurrent_async_job_count: Optional[Union[int, str]] = Field(
None,
description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.",
Expand Down Expand Up @@ -3096,6 +3106,23 @@ class AsyncRetriever(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class BlockSimultaneousSyncsAction(BaseModel):
    # Marker action for a StreamGroup: streams in a group carrying this action must
    # not be read concurrently with one another. Generated from the
    # BlockSimultaneousSyncsAction definition in declarative_component_schema.yaml.
    type: Literal["BlockSimultaneousSyncsAction"]


class StreamGroup(BaseModel):
    # Generated model for the StreamGroup definition in
    # declarative_component_schema.yaml: a set of streams that share a common
    # resource and should not be read simultaneously.
    # NOTE(review): the YAML schema declares the `streams` items as
    # DeclarativeStream references, but this generated model types them as
    # strings — confirm the generator output is in sync with the schema.
    streams: List[str] = Field(
        ...,
        description='List of references to streams that belong to this group. Use JSON references to stream definitions (e.g., "#/definitions/my_stream").',
        title="Streams",
    )
    # Currently the only supported action is BlockSimultaneousSyncsAction.
    action: BlockSimultaneousSyncsAction = Field(
        ...,
        description="The action to apply to streams in this group.",
        title="Action",
    )


class SubstreamPartitionRouter(BaseModel):
type: Literal["SubstreamPartitionRouter"]
parent_stream_configs: List[ParentStreamConfig] = Field(
Expand Down
15 changes: 15 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/abstract_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,21 @@ def cursor(self) -> Cursor:
:return: The cursor associated with this stream.
"""

@property
def block_simultaneous_read(self) -> str:
    """Name of the concurrency group this stream belongs to, or "" for none.

    Subclasses override this to return a non-empty group name when the stream
    must not be read at the same time as other streams. A non-empty value tells
    the concurrent read processor to defer partition generation for this stream
    while another stream with the same group name is active, or while any of its
    parent streams are in an active group. This lets streams that share a
    resource (e.g. an API endpoint or session) be serialized even without a
    parent/child relationship.

    :return: Group name for blocking (non-empty string), or "" to allow
        concurrent reading.
    """
    # Base implementation opts out of blocking entirely.
    return ""

@abstractmethod
def check_availability(self) -> StreamAvailability:
"""
Expand Down
5 changes: 5 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ def cursor_field(self) -> Union[str, List[str]]:
def cursor(self) -> Optional[Cursor]: # type: ignore[override] # StreamFaced expects to use only airbyte_cdk.sources.streams.concurrent.cursor.Cursor
return self._cursor

@property
def block_simultaneous_read(self) -> str:
    """Delegate the blocking-group name to the wrapped concurrent stream."""
    underlying = self._abstract_stream
    return underlying.block_simultaneous_read

# FIXME the lru_cache seems to be mostly there because of typing issue
@lru_cache(maxsize=None)
def get_json_schema(self) -> Mapping[str, Any]:
Expand Down
27 changes: 27 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/default_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@
from typing import Any, Callable, Iterable, List, Mapping, Optional, Union

from airbyte_cdk.models import AirbyteStream, SyncMode
from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import (
ConcurrentPerPartitionCursor,
)
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
StreamSlicerPartitionGenerator,
)
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, CursorField
Expand All @@ -26,6 +33,7 @@ def __init__(
cursor: Cursor,
namespace: Optional[str] = None,
supports_file_transfer: bool = False,
block_simultaneous_read: str = "",
) -> None:
self._stream_partition_generator = partition_generator
self._name = name
Expand All @@ -36,6 +44,7 @@ def __init__(
self._cursor = cursor
self._namespace = namespace
self._supports_file_transfer = supports_file_transfer
self._block_simultaneous_read = block_simultaneous_read

def generate_partitions(self) -> Iterable[Partition]:
yield from self._stream_partition_generator.generate()
Expand Down Expand Up @@ -94,6 +103,24 @@ def log_stream_sync_configuration(self) -> None:
def cursor(self) -> Cursor:
return self._cursor

@property
def block_simultaneous_read(self) -> str:
    """The blocking-group name for this stream; "" means concurrent reads are allowed."""
    return self._block_simultaneous_read

@block_simultaneous_read.setter
def block_simultaneous_read(self, value: str) -> None:
    """Assign the blocking-group name (set after construction by the source when applying stream_groups)."""
    self._block_simultaneous_read = value

def get_partition_router(self) -> PartitionRouter | None:
    """Return the partition router for this stream, or None if not available.

    Only streams whose partition generator is a StreamSlicerPartitionGenerator
    wrapping a ConcurrentPerPartitionCursor expose a router; anything else
    yields None.
    """
    generator = self._stream_partition_generator
    if isinstance(generator, StreamSlicerPartitionGenerator):
        slicer = generator._stream_slicer
        if isinstance(slicer, ConcurrentPerPartitionCursor):
            # Reaches into a private attribute of the cursor — kept as-is to
            # match the original behavior.
            return slicer._partition_router
    return None

def check_availability(self) -> StreamAvailability:
"""
Check stream availability by attempting to read the first record of the stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5281,6 +5281,62 @@ def test_catalog_defined_cursor_field_stream_missing():
assert stream._cursor_field.supports_catalog_defined_cursor_field == True


def test_block_simultaneous_read_from_stream_groups():
    """Test that factory-created streams default to empty block_simultaneous_read.

    The factory no longer handles stream_groups — that's done by
    ConcurrentDeclarativeSource._apply_stream_groups after stream creation.
    This test verifies the factory creates streams without group info.
    """
    # Minimal manifest with a single declarative stream and no stream_groups.
    content = """
definitions:
  parent_stream:
    type: DeclarativeStream
    name: "parent"
    primary_key: "id"
    retriever:
      type: SimpleRetriever
      requester:
        type: HttpRequester
        url_base: "https://api.example.com"
        path: "/parent"
        http_method: "GET"
        authenticator:
          type: BearerAuthenticator
          api_token: "{{ config['api_key'] }}"
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: []
    schema_loader:
      type: InlineSchemaLoader
      schema:
        type: object
        properties:
          id:
            type: string
"""

    config = {"api_key": "test_key"}

    # Resolve the manifest the same way the source would before component creation.
    parsed_manifest = YamlDeclarativeSource._parse(content)
    resolved_manifest = resolver.preprocess_manifest(parsed_manifest)

    factory = ModelToComponentFactory()

    # Propagate types/parameters into the stream definition, then build the stream.
    parent_manifest = transformer.propagate_types_and_parameters(
        "", resolved_manifest["definitions"]["parent_stream"], {}
    )
    parent_stream: DefaultStream = factory.create_component(
        model_type=DeclarativeStreamModel, component_definition=parent_manifest, config=config
    )

    assert isinstance(parent_stream, DefaultStream)
    assert parent_stream.name == "parent"
    # No stream_groups in the manifest, so the factory must leave the group unset ("").
    assert parent_stream.block_simultaneous_read == ""


def get_schema_loader(stream: DefaultStream):
assert isinstance(
stream._stream_partition_generator._partition_factory._schema_loader,
Expand Down
Loading
Loading