From a13be8ad3a0077e271d1fe14044a8b568c04f8d3 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 12 Mar 2026 16:40:41 +0000 Subject: [PATCH 1/3] feat: increase DEFAULT_MAX_QUEUE_SIZE from 10,000 to 50,000 Co-Authored-By: unknown <> --- airbyte_cdk/sources/concurrent_source/thread_pool_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py b/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py index 59f8a1f0b..e0e34d814 100644 --- a/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py +++ b/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py @@ -12,7 +12,7 @@ class ThreadPoolManager: Wrapper to abstract away the threadpool and the logic to wait for pending tasks to be completed. """ - DEFAULT_MAX_QUEUE_SIZE = 10_000 + DEFAULT_MAX_QUEUE_SIZE = 50_000 def __init__( self, From 9d703625fb43e3037a0c2c85fa6207ad3145a165 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 12 Mar 2026 18:20:44 +0000 Subject: [PATCH 2/3] feat: make record queue unlimited (maxsize=0) to prevent deadlock Co-Authored-By: unknown <> --- airbyte_cdk/sources/concurrent_source/concurrent_source.py | 2 +- airbyte_cdk/sources/concurrent_source/thread_pool_manager.py | 2 +- .../sources/declarative/concurrent_declarative_source.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_source.py b/airbyte_cdk/sources/concurrent_source/concurrent_source.py index de2d93523..ea020a14d 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_source.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_source.py @@ -99,7 +99,7 @@ def __init__( # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more # information and might even need to be configurable depending on the source - self._queue = queue or Queue(maxsize=10_000) + self._queue = queue or Queue(maxsize=0) def read( self, diff --git a/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py b/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py index e0e34d814..03c3e9b08 100644 --- a/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py +++ b/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py @@ -12,7 +12,7 @@ class ThreadPoolManager: Wrapper to abstract away the threadpool and the logic to wait for pending tasks to be completed. """ - DEFAULT_MAX_QUEUE_SIZE = 50_000 + DEFAULT_MAX_QUEUE_SIZE = 0 # 0 means unlimited def __init__( self, diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 45fe6aa2d..c769235dd 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -157,7 +157,7 @@ def __init__( # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more # information and might even need to be configurable depending on the source - queue: Queue[QueueItem] = Queue(maxsize=10_000) + queue: Queue[QueueItem] = Queue(maxsize=0) message_repository = InMemoryMessageRepository( Level.DEBUG if emit_connector_builder_messages else Level.INFO ) From cb93ee44fa95403f30f97ae98f8ef6cacd7c33f3 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 12 Mar 2026 19:02:37 +0000 Subject: [PATCH 3/3] fix: restore DEFAULT_MAX_QUEUE_SIZE to 10_000 for max_concurrent_tasks default Setting DEFAULT_MAX_QUEUE_SIZE=0 broke ThreadPoolManager because it is also used as the default for max_concurrent_tasks. With max_concurrent_tasks=0, prune_to_validate_has_reached_futures_limit() always returns True (len >= 0), causing infinite warning spam and preventing any records from being processed. Fix: keep DEFAULT_MAX_QUEUE_SIZE=10_000 (used for futures limit) and only change the Queue(maxsize=0) in concurrent_source.py and concurrent_declarative_source.py (unlimited queue to prevent deadlock). Co-Authored-By: unknown <> --- airbyte_cdk/sources/concurrent_source/thread_pool_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py b/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py index 03c3e9b08..59f8a1f0b 100644 --- a/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py +++ b/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py @@ -12,7 +12,7 @@ class ThreadPoolManager: Wrapper to abstract away the threadpool and the logic to wait for pending tasks to be completed. """ - DEFAULT_MAX_QUEUE_SIZE = 0 # 0 means unlimited + DEFAULT_MAX_QUEUE_SIZE = 10_000 def __init__( self,