From 75176f54af175fcf70015870f46045b0d4215ce9 Mon Sep 17 00:00:00 2001 From: Matteo Morando Date: Wed, 13 May 2026 17:34:09 +0200 Subject: [PATCH 1/5] fix: recover consumer groups after Redis FLUSHALL When Redis is flushed at runtime, stream consumer groups are deleted. xreadgroup then throws NOGROUP on every iteration, causing the consumer loop to spin indefinitely without processing any tasks. Catch NOGROUP in _try_read_stream and call setup_consumer_groups() to recreate the groups immediately. The Reconciler then republishes any tasks that were enqueued before the groups were restored. Co-Authored-By: Claude Sonnet 4.6 --- src/fastapi_task_manager/stream_consumer.py | 28 +++++++++++++++------ 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/src/fastapi_task_manager/stream_consumer.py b/src/fastapi_task_manager/stream_consumer.py index c05c72d..f8c0c52 100644 --- a/src/fastapi_task_manager/stream_consumer.py +++ b/src/fastapi_task_manager/stream_consumer.py @@ -288,13 +288,27 @@ async def _try_read_stream( Returns: Tuple of (message_id, data) if a message was read, None otherwise. """ - messages = await self._redis.xreadgroup( - groupname=group_name, - consumername=self._worker.redis_safe_id, - streams={stream_key: ">"}, - count=1, # Read one message at a time - block=block_ms, - ) + try: + messages = await self._redis.xreadgroup( + groupname=group_name, + consumername=self._worker.redis_safe_id, + streams={stream_key: ">"}, + count=1, # Read one message at a time + block=block_ms, + ) + except ResponseError as e: + if "NOGROUP" in str(e): + # Consumer group was deleted (e.g. Redis FLUSHALL). Recreate it so + # the consume loop can resume; the Reconciler will republish any + # tasks that were published to the stream before the group existed. 
+ logger.warning( + "Consumer group '%s' not found on stream '%s' (Redis flush?), recreating groups", + group_name, + stream_key, + ) + await self.setup_consumer_groups() + return None + raise if not messages: return None From 1f7adc741a2adb15e676fd63fda3cb132a8ef36b Mon Sep 17 00:00:00 2001 From: Matteo Morando Date: Wed, 13 May 2026 17:36:17 +0200 Subject: [PATCH 2/5] fix: handle NOGROUP in _ack_message after Redis flush If the consumer group is deleted (e.g. Redis FLUSHALL) while a task is still executing, the xack issued when the task completes raises NOGROUP. This caused the success-path exception handler to log a false "Task failed", apply incorrect backoff, and surface an unhandled asyncio exception. After a flush the message is already gone from the PEL, so there is nothing to acknowledge. Catch NOGROUP in _ack_message and log a warning instead of propagating the error. Co-Authored-By: Claude Sonnet 4.6 --- src/fastapi_task_manager/stream_consumer.py | 28 +++++++++++++++------ 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/src/fastapi_task_manager/stream_consumer.py b/src/fastapi_task_manager/stream_consumer.py index f8c0c52..2f0d6b2 100644 --- a/src/fastapi_task_manager/stream_consumer.py +++ b/src/fastapi_task_manager/stream_consumer.py @@ -560,14 +560,28 @@ async def _ack_message(self, message_id: str, is_high_priority: bool) -> None: stream_keys = self._keys.get_stream_keys() stream_key = stream_keys.task_stream_high if is_high_priority else stream_keys.task_stream_low - await self._redis.xack( - stream_key, - stream_keys.consumer_group, - message_id, - ) + try: + await self._redis.xack( + stream_key, + stream_keys.consumer_group, + message_id, + ) - # Remove the message from the stream to keep memory usage low - await self._redis.xdel(stream_key, message_id) + # Remove the message from the stream to keep memory usage low + await self._redis.xdel(stream_key, message_id) + except ResponseError as e: + if "NOGROUP" in str(e): + # Consumer group no longer exists
(e.g. Redis FLUSHALL); the + # message is already gone from the PEL so there is nothing to + # acknowledge. Swallow the error to avoid masking a successful + # task execution as a failure. + logger.warning( + "Could not ACK message %s: consumer group '%s' not found (Redis flush?)", + message_id, + stream_keys.consumer_group, + ) + else: + raise def _find_task(self, group_name: str, task_name: str) -> Task | None: """Find a task by group and task name. From 9627ff3443fff29f0f7623578ef4279941400a76 Mon Sep 17 00:00:00 2001 From: Matteo Morando Date: Wed, 13 May 2026 17:40:59 +0200 Subject: [PATCH 3/5] docs: update changelog with FLUSHALL bug fixes Co-Authored-By: Claude Sonnet 4.6 --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 825ca3f..ac859bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ ## Latest +### Bug fixes + +* 🐛 **Consumer group loss after `FLUSHALL`**: `FLUSHALL` at runtime deletes all stream consumer groups. The Coordinator recreates streams via `XADD`, but `xreadgroup` kept throwing `NOGROUP` on every iteration. Catch `NOGROUP` in `_try_read_stream` and call `setup_consumer_groups()` to recover; the Reconciler republishes any tasks queued before the group was restored. + +* 🐛 **False "Task failed" after flush**: When a task finishes while the consumer group is gone, `xack` throws `NOGROUP`. The generic handler in `_execute_and_ack` was catching this, logging "Task failed", and applying backoff on a task that actually succeeded. Catch `NOGROUP` in `_ack_message` and log a warning instead — the message is already gone from the PEL. 
+ ## 1.1.0 - 2026-04-15 ### Features From 8caccbfc510f1ffcebb640db636db1f8d897ab13 Mon Sep 17 00:00:00 2001 From: Matteo Morando Date: Wed, 13 May 2026 18:17:10 +0200 Subject: [PATCH 4/5] chore: bump version to 1.1.1a1 Co-Authored-By: Claude Sonnet 4.6 --- pyproject.toml | 2 +- uv.lock | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 7dc035f..c8a64d1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "fastapi-task-manager" -version = "1.1.0" +version = "1.1.1a1" description = "A task manager for FastAPI. Robust Scheduling, Distributed Safety" readme = "README.md" authors = [ diff --git a/uv.lock b/uv.lock index 7fc504d..555d134 100644 --- a/uv.lock +++ b/uv.lock @@ -379,7 +379,7 @@ wheels = [ [[package]] name = "fastapi-task-manager" -version = "1.1.0" +version = "1.1.1a1" source = { editable = "." } dependencies = [ { name = "cronsim" }, From d2b24b65497fb301372c7440aabec64619d07b0a Mon Sep 17 00:00:00 2001 From: Matteo Morando Date: Thu, 14 May 2026 11:31:26 +0200 Subject: [PATCH 5/5] bump version --- CHANGELOG.md | 2 +- pyproject.toml | 2 +- uv.lock | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ac859bf..4d37db7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Release Notes -## Latest +## 1.1.1 - 2026-05-14 ### Bug fixes diff --git a/pyproject.toml b/pyproject.toml index c8a64d1..758ecce 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "fastapi-task-manager" -version = "1.1.1a1" +version = "1.1.1" description = "A task manager for FastAPI. Robust Scheduling, Distributed Safety" readme = "README.md" authors = [ diff --git a/uv.lock b/uv.lock index 555d134..183b7a4 100644 --- a/uv.lock +++ b/uv.lock @@ -379,7 +379,7 @@ wheels = [ [[package]] name = "fastapi-task-manager" -version = "1.1.1a1" +version = "1.1.1" source = { editable = "." } dependencies = [ { name = "cronsim" },