diff --git a/CHANGELOG.md b/CHANGELOG.md index 825ca3f..4d37db7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,12 @@ # Release Notes -## Latest +## 1.1.1 - 2026-05-14 + +### Bug fixes + +* 🐛 **Consumer group loss after `FLUSHALL`**: `FLUSHALL` at runtime deletes all stream consumer groups. The Coordinator recreates streams via `XADD`, but `xreadgroup` kept throwing `NOGROUP` on every iteration. Catch `NOGROUP` in `_try_read_stream` and call `setup_consumer_groups()` to recover; the Reconciler republishes any tasks queued before the group was restored. + +* 🐛 **False "Task failed" after flush**: When a task finishes while the consumer group is gone, `xack` throws `NOGROUP`. The generic handler in `_execute_and_ack` was catching this, logging "Task failed", and applying backoff on a task that actually succeeded. Catch `NOGROUP` in `_ack_message` and log a warning instead — the message is already gone from the PEL. ## 1.1.0 - 2026-04-15 diff --git a/pyproject.toml b/pyproject.toml index 7dc035f..758ecce 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "fastapi-task-manager" -version = "1.1.0" +version = "1.1.1" description = "A task manager for FastAPI. Robust Scheduling, Distributed Safety" readme = "README.md" authors = [ diff --git a/src/fastapi_task_manager/stream_consumer.py b/src/fastapi_task_manager/stream_consumer.py index c05c72d..2f0d6b2 100644 --- a/src/fastapi_task_manager/stream_consumer.py +++ b/src/fastapi_task_manager/stream_consumer.py @@ -288,13 +288,27 @@ async def _try_read_stream( Returns: Tuple of (message_id, data) if a message was read, None otherwise. """ - messages = await self._redis.xreadgroup( - groupname=group_name, - consumername=self._worker.redis_safe_id, - streams={stream_key: ">"}, - count=1, # Read one message at a time - block=block_ms, - ) + try: + messages = await self._redis.xreadgroup( + groupname=group_name, + consumername=self._worker.redis_safe_id, + streams={stream_key: ">"}, + count=1, # Read one message at a time + block=block_ms, + ) + except ResponseError as e: + if "NOGROUP" in str(e): + # Consumer group was deleted (e.g. Redis FLUSHALL). Recreate it so + # the consume loop can resume; the Reconciler will republish any + # tasks that were published to the stream before the group existed. + logger.warning( + "Consumer group '%s' not found on stream '%s' (Redis flush?), recreating groups", + group_name, + stream_key, + ) + await self.setup_consumer_groups() + return None + raise if not messages: return None @@ -546,14 +560,28 @@ async def _ack_message(self, message_id: str, is_high_priority: bool) -> None: stream_keys = self._keys.get_stream_keys() stream_key = stream_keys.task_stream_high if is_high_priority else stream_keys.task_stream_low - await self._redis.xack( - stream_key, - stream_keys.consumer_group, - message_id, - ) + try: + await self._redis.xack( + stream_key, + stream_keys.consumer_group, + message_id, + ) - # Remove the message from the stream to keep memory usage low - await self._redis.xdel(stream_key, message_id) + # Remove the message from the stream to keep memory usage low + await self._redis.xdel(stream_key, message_id) + except ResponseError as e: + if "NOGROUP" in str(e): + # Consumer group no longer exists (e.g. Redis FLUSHALL); the + # message is already gone from the PEL so there is nothing to + # acknowledge. Swallow the error to avoid masking a successful + # task execution as a failure. + logger.warning( + "Could not ACK message %s: consumer group '%s' not found (Redis flush?)", + message_id, + stream_keys.consumer_group, + ) + else: + raise def _find_task(self, group_name: str, task_name: str) -> Task | None: """Find a task by group and task name. diff --git a/uv.lock b/uv.lock index 7fc504d..183b7a4 100644 --- a/uv.lock +++ b/uv.lock @@ -379,7 +379,7 @@ wheels = [ [[package]] name = "fastapi-task-manager" -version = "1.1.0" +version = "1.1.1" source = { editable = "." } dependencies = [ { name = "cronsim" },