diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_source.py b/airbyte_cdk/sources/concurrent_source/concurrent_source.py index de2d93523..ea020a14d 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_source.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_source.py @@ -99,7 +99,7 @@ def __init__( # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more # information and might even need to be configurable depending on the source - self._queue = queue or Queue(maxsize=10_000) + self._queue = queue or Queue(maxsize=0) def read( self, diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 45fe6aa2d..c769235dd 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -157,7 +157,7 @@ def __init__( # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more # information and might even need to be configurable depending on the source - queue: Queue[QueueItem] = Queue(maxsize=10_000) + queue: Queue[QueueItem] = Queue(maxsize=0) message_repository = InMemoryMessageRepository( Level.DEBUG if emit_connector_builder_messages else Level.INFO )