8 changes: 7 additions & 1 deletion CHANGELOG.md
@@ -1,6 +1,12 @@
# Release Notes

## Latest
## 1.1.1 - 2026-05-14

### Bug fixes

* 🐛 **Consumer group loss after `FLUSHALL`**: `FLUSHALL` at runtime deletes all stream consumer groups. The Coordinator recreates streams via `XADD`, but `xreadgroup` kept throwing `NOGROUP` on every iteration. Catch `NOGROUP` in `_try_read_stream` and call `setup_consumer_groups()` to recover; the Reconciler republishes any tasks queued before the group was restored.

* 🐛 **False "Task failed" after flush**: When a task finishes while the consumer group is gone, `xack` throws `NOGROUP`. The generic handler in `_execute_and_ack` was catching this, logging "Task failed", and applying backoff on a task that actually succeeded. Catch `NOGROUP` in `_ack_message` and log a warning instead — the message is already gone from the PEL.

## 1.1.0 - 2026-04-15

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "fastapi-task-manager"
version = "1.1.0"
version = "1.1.1"
description = "A task manager for FastAPI. Robust Scheduling, Distributed Safety"
readme = "README.md"
authors = [
56 changes: 42 additions & 14 deletions src/fastapi_task_manager/stream_consumer.py
@@ -288,13 +288,27 @@ async def _try_read_stream(
Returns:
Tuple of (message_id, data) if a message was read, None otherwise.
"""
messages = await self._redis.xreadgroup(
groupname=group_name,
consumername=self._worker.redis_safe_id,
streams={stream_key: ">"},
count=1, # Read one message at a time
block=block_ms,
)
try:
messages = await self._redis.xreadgroup(
groupname=group_name,
consumername=self._worker.redis_safe_id,
streams={stream_key: ">"},
count=1, # Read one message at a time
block=block_ms,
)
except ResponseError as e:
if "NOGROUP" in str(e):
# Consumer group was deleted (e.g. Redis FLUSHALL). Recreate it so
# the consume loop can resume; the Reconciler will republish any
# tasks that were published to the stream before the group existed.
logger.warning(
"Consumer group '%s' not found on stream '%s' (Redis flush?), recreating groups",
group_name,
stream_key,
)
await self.setup_consumer_groups()
return None
raise

if not messages:
return None
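The recovery path above can be illustrated with a minimal, dependency-free sketch. `ResponseError` here is a stand-in for `redis.exceptions.ResponseError` (redis-py surfaces the raw server error text, e.g. `NOGROUP No such consumer group ...`, as the exception message), and `read`/`recreate_groups` are hypothetical callables standing in for the `xreadgroup` call and `setup_consumer_groups()`:

```python
class ResponseError(Exception):
    """Stand-in for redis.exceptions.ResponseError."""


def read_with_recovery(read, recreate_groups):
    """Call `read()`; on NOGROUP, recreate the consumer groups and skip this cycle.

    `read` and `recreate_groups` are hypothetical callables standing in for
    `xreadgroup` and `setup_consumer_groups()` in the real consumer.
    """
    try:
        return read()
    except ResponseError as e:
        if "NOGROUP" in str(e):
            recreate_groups()
            return None  # consume loop simply retries on the next iteration
        raise


# Simulated flow: the first read fails with NOGROUP (the group was wiped by
# FLUSHALL), the groups are recreated, and the second read succeeds.
calls = []


def flaky_read():
    if not calls:  # no recreation has happened yet -> group is missing
        raise ResponseError("NOGROUP No such consumer group 'cg' in key 'stream'")
    return ("1-0", {"task": "demo"})


assert read_with_recovery(flaky_read, lambda: calls.append("recreated")) is None
assert calls == ["recreated"]
assert read_with_recovery(flaky_read, lambda: calls.append("x")) == ("1-0", {"task": "demo"})
```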
@@ -546,14 +560,28 @@ async def _ack_message(self, message_id: str, is_high_priority: bool) -> None:
stream_keys = self._keys.get_stream_keys()
stream_key = stream_keys.task_stream_high if is_high_priority else stream_keys.task_stream_low

await self._redis.xack(
stream_key,
stream_keys.consumer_group,
message_id,
)
try:
await self._redis.xack(
stream_key,
stream_keys.consumer_group,
message_id,
)

# Remove the message from the stream to keep memory usage low
await self._redis.xdel(stream_key, message_id)
# Remove the message from the stream to keep memory usage low
await self._redis.xdel(stream_key, message_id)
except ResponseError as e:
if "NOGROUP" in str(e):
# Consumer group no longer exists (e.g. Redis FLUSHALL); the
# message is already gone from the PEL so there is nothing to
# acknowledge. Swallow the error to avoid masking a successful
# task execution as a failure.
logger.warning(
"Could not ACK message %s: consumer group '%s' not found (Redis flush?)",
message_id,
stream_keys.consumer_group,
)
else:
raise

def _find_task(self, group_name: str, task_name: str) -> Task | None:
"""Find a task by group and task name.
Expand Down
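The ACK-side fix follows the same matching pattern but swallows the error instead of recovering, since there is nothing left to acknowledge. A dependency-free sketch, where `ResponseError` again stands in for `redis.exceptions.ResponseError` and `ack`/`warn` are hypothetical callables standing in for the `xack`/`xdel` pair and `logger.warning`:

```python
class ResponseError(Exception):
    """Stand-in for redis.exceptions.ResponseError."""


def ack_with_nogroup_tolerance(ack, warn):
    """Run `ack()`; if the consumer group is gone (NOGROUP), warn instead of raising.

    `ack` stands in for the xack + xdel pair; `warn` for logger.warning. Any
    other ResponseError still propagates so real failures are not masked.
    """
    try:
        ack()
    except ResponseError as e:
        if "NOGROUP" in str(e):
            warn("group gone; message already absent from the PEL")
        else:
            raise


warnings = []


def failing_ack():
    raise ResponseError("NOGROUP No such consumer group 'cg' in key 'stream'")


# NOGROUP is swallowed with a warning -- the task is still counted as a success.
ack_with_nogroup_tolerance(failing_ack, warnings.append)
assert warnings == ["group gone; message already absent from the PEL"]


def other_error():
    raise ResponseError("WRONGTYPE Operation against a key holding the wrong kind of value")


# Any other Redis error still propagates.
try:
    ack_with_nogroup_tolerance(other_error, warnings.append)
    raised = False
except ResponseError:
    raised = True
assert raised
```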
2 changes: 1 addition & 1 deletion uv.lock
