# Bug: race condition in `RedisStreamBroker.listen` causes duplicate task execution via `xautoclaim`
## Environment
- taskiq-redis: 1.0.9
- Python: 3.11
- Redis: 7.2.5
## Description
The autoclaim section in `listen()` uses a two-step lock check that is not atomic, which allows multiple workers to call `XAUTOCLAIM` concurrently on the same pending message, which should not happen:

```python
# The following check-then-acquire sequence is not atomic:
if await lock.locked():
    continue
async with lock:
    pending = await redis_conn.xautoclaim(...)
```
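To illustrate why check-then-acquire fails, here is a toy model using only `asyncio.Lock` (no Redis involved; the names and the `pending` list are illustrative stand-ins for the stream's pending entries list):

```python
import asyncio

# Toy model of the race. Both workers observe `lock.locked()` as False
# before either acquires, so neither skips the critical section; the
# blocking `async with lock` then just serializes them, and each one
# "claims" the same still-pending message in turn.
pending = ["msg-1"]
claimed_by = []


async def worker(name: str, lock: asyncio.Lock):
    if lock.locked():       # non-atomic check ...
        return
    await asyncio.sleep(0)  # yield control: the other worker checks too
    async with lock:        # ... followed by a blocking acquire
        if pending:         # in Redis terms: the message is still in the PEL
            claimed_by.append(name)


async def main():
    lock = asyncio.Lock()
    await asyncio.gather(worker("w0", lock), worker("w1", lock))
    return claimed_by


result = asyncio.run(main())
print(result)  # both workers ran the claim step
```

In the real broker the message likewise stays in the pending entries list until it is acked, so the second worker's `XAUTOCLAIM` can re-claim what the first worker already yielded.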
## Summary
Expected behavior:
Concurrent XAUTOCLAIM calls should not result in the same message being yielded to multiple workers simultaneously.
Actual behavior:
Multiple workers can execute XAUTOCLAIM concurrently, leading to duplicate task execution.
This increases the probability of duplicate task execution beyond the natural at-least-once guarantees of Redis Streams.
## Debugging
Look at how it was caught: the two log lines have the same timestamp. I masked the request_id, but both values are identical. In our system this happens more than once per minute, so it is not a rare case.

```
[2026-04-08 17:16:00,342][root][INFO ][worker-1][d2161072-00d2-4923-b7f9-5ed2e7be32d5] Starting taskiq task with request_id = 'X'
[2026-04-08 17:16:00,342][root][INFO ][worker-0][0a4bc39a-6594-45e4-b0d0-f7ef7b4f199c] Starting taskiq task with request_id = 'X'
```
## Reproduction
Here is a minimal standalone script that demonstrates the race:
```python
import asyncio

import redis.asyncio as aioredis

STREAM, GROUP = "taskiq_bug", "taskiq"


async def setup():
    r = await aioredis.from_url("redis://localhost:6379")
    await r.delete(STREAM)
    await r.delete(f"autoclaim:{GROUP}:{STREAM}")
    await r.xgroup_create(STREAM, GROUP, id="0", mkstream=True)
    msg_id = await r.xadd(STREAM, {"data": b"test"})
    # Read without acking so the message stays in the pending entries list.
    await r.xreadgroup(GROUP, "worker-seed", {STREAM: ">"}, count=1)
    await r.aclose()
    return msg_id


async def worker(name: str):
    conn = await aioredis.from_url("redis://localhost:6379")
    lock = conn.lock(f"autoclaim:{GROUP}:{STREAM}")
    if await lock.locked():  # HERE: the window for the race opens
        await conn.aclose()
        return
    async with lock:  # HERE: another worker can enter right after release
        result = await conn.xautoclaim(
            STREAM,
            GROUP,
            name,
            min_idle_time=100,
            count=10,
        )
        if result[1]:
            print(f"!!! {name} claimed: {result[1]}")
    await conn.aclose()


async def run_once(i: int):
    print(f"\n--- RUN {i} ---")
    await setup()
    await asyncio.sleep(0.2)
    await asyncio.gather(
        worker("worker-0"),
        worker("worker-1"),
    )


async def main():
    for i in range(25):
        await run_once(i)
        await asyncio.sleep(0.1)


if __name__ == "__main__":
    asyncio.run(main())
```
In the output you will see that different workers claim the same messages.
## Proposed fix

Replace the two-step check+acquire with an atomic `SET NX PX`:
```python
lock_key = f"autoclaim:{self.consumer_group_name}:{stream}"
lock_value = f"{self.consumer_name}-{uuid.uuid4()}"
acquired = await redis_conn.set(lock_key, lock_value, nx=True, px=30_000)
if not acquired:
    continue
try:
    pending = await redis_conn.xautoclaim(...)
    for msg_id, msg in pending[1]:
        yield AckableMessage(...)
finally:
    # Release only if we still own the lock (compare-and-delete).
    await redis_conn.eval(
        """
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        else
            return 0
        end
        """,
        1,
        lock_key,
        lock_value,
    )
```
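For reference, the semantics of the `SET NX` + compare-and-delete pair can be sketched with an in-memory model (no Redis needed; `store`, `set_nx`, and `compare_and_delete` are illustrative stand-ins, and key expiry is omitted):

```python
import uuid

# `store` stands in for the Redis keyspace.
store: dict[str, str] = {}


def set_nx(key: str, value: str) -> bool:
    # Single atomic step, like SET key value NX: acquire only if absent.
    if key in store:
        return False
    store[key] = value
    return True


def compare_and_delete(key: str, value: str) -> int:
    # Mirrors the Lua script: delete only if we still own the lock,
    # so a worker whose lock expired cannot release another's lock.
    if store.get(key) == value:
        del store[key]
        return 1
    return 0


token_a = f"worker-0-{uuid.uuid4()}"
token_b = f"worker-1-{uuid.uuid4()}"

acquired_a = set_nx("autoclaim:taskiq:stream", token_a)  # True
acquired_b = set_nx("autoclaim:taskiq:stream", token_b)  # False: A holds it
released_b = compare_and_delete("autoclaim:taskiq:stream", token_b)  # 0
released_a = compare_and_delete("autoclaim:taskiq:stream", token_a)  # 1
```

Because the acquire is a single Redis command, there is no window between the check and the acquisition, unlike the `locked()` + `async with lock` sequence.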
I'd like to discuss this and can prepare a PR afterwards.