
[BUG] Race condition inside RedisStreamBroker #123

@AivazianArtur

Description

Bug: race condition in RedisStreamBroker.listen causes duplicate task execution via xautoclaim

Environment

  • taskiq-redis: 1.0.9
  • Python: 3.11
  • Redis: 7.2.5

Description

The autoclaim section in listen() uses a two-step lock check that is not atomic, which allows multiple workers to call XAUTOCLAIM concurrently on the same pending message, which should not be possible.

# The following two steps are not atomic:
if await lock.locked():
    continue
async with lock:
    pending = await redis_conn.xautoclaim(...)

Summary

Expected behavior:
Concurrent XAUTOCLAIM calls should not result in the same message being yielded to multiple workers simultaneously.

Actual behavior:
Multiple workers can execute XAUTOCLAIM concurrently, leading to duplicate task execution.

This increases the probability of duplicate task execution beyond the natural at-least-once guarantees of Redis Streams.
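The window can be demonstrated without Redis at all: any check-then-acquire sequence with an await point in between is racy. A minimal sketch using a plain asyncio.Lock (names are illustrative, not taskiq-redis code):

```python
import asyncio


async def racy_worker(name: str, lock: asyncio.Lock, claims: list) -> None:
    # Step 1: the non-atomic check -- both workers can observe "unlocked" here.
    if lock.locked():
        return
    # Any await between the check and the acquire opens the race window;
    # in the real broker this is the time spent reaching `async with lock`.
    await asyncio.sleep(0)
    # Step 2: the acquire -- the second worker just queues behind the first
    # and enters afterwards, so both end up "claiming" the same message.
    async with lock:
        claims.append(name)


async def main() -> list:
    lock = asyncio.Lock()
    claims: list = []
    await asyncio.gather(
        racy_worker("worker-0", lock, claims),
        racy_worker("worker-1", lock, claims),
    )
    return claims


claims = asyncio.run(main())
print(claims)  # both workers got past the locked() check
```

The check only tells you the lock was free at some point in the past; it never prevents a second worker from entering once the first releases.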

Debugging

Here is how it was caught: both workers started the same task at literally the same moment. I masked the request_id, but the values are identical. In our system this happens more than once per minute, so it is not a rare case:

[2026-04-08 17:16:00,342][root][INFO   ][worker-1][d2161072-00d2-4923-b7f9-5ed2e7be32d5] Starting taskiq task with request_id = 'X'
[2026-04-08 17:16:00,342][root][INFO   ][worker-0][0a4bc39a-6594-45e4-b0d0-f7ef7b4f199c] Starting taskiq task with request_id = 'X'

Reproduction

Here is a standalone script that demonstrates how the race can occur:

import asyncio
import redis.asyncio as aioredis

STREAM, GROUP = "taskiq_bug", "taskiq"


async def setup():
    r = await aioredis.from_url("redis://localhost:6379")

    await r.delete(STREAM)
    await r.delete(f"autoclaim:{GROUP}:{STREAM}")

    await r.xgroup_create(STREAM, GROUP, id="0", mkstream=True)
    msg_id = await r.xadd(STREAM, {"data": b"test"})

    await r.xreadgroup(GROUP, "worker-seed", {STREAM: ">"}, count=1)

    await r.aclose()
    return msg_id


async def worker(name: str):
    conn = await aioredis.from_url("redis://localhost:6379")

    lock = conn.lock(f"autoclaim:{GROUP}:{STREAM}")

    if await lock.locked():  # HERE the race window opens
        await conn.aclose()
        return

    async with lock:  # HERE another worker can enter right after the first releases
        result = await conn.xautoclaim(
            STREAM,
            GROUP,
            name,
            min_idle_time=100,
            count=10,
        )
        if result[1]:
            print(f"!!! {name} claimed: {result[1]}")

    await conn.aclose()


async def run_once(i: int):
    print(f"\n--- RUN {i} ---")

    await setup()
    await asyncio.sleep(0.2)

    await asyncio.gather(
        worker("worker-0"),
        worker("worker-1"),
    )


async def main():
    for i in range(25):
        await run_once(i)
        await asyncio.sleep(0.1) 


if __name__ == "__main__":
    asyncio.run(main())

In the output you will see different workers claiming the same message.

Proposed fix

Replace the two-step check+acquire with an atomic SET NX PX:

lock_key   = f"autoclaim:{self.consumer_group_name}:{stream}"
lock_value = f"{self.consumer_name}-{uuid.uuid4()}"

acquired = await redis_conn.set(lock_key, lock_value, nx=True, px=30_000)
if not acquired:
    continue

try:
    pending = await redis_conn.xautoclaim(...)
    for msg_id, msg in pending[1]:
        yield AckableMessage(...)
finally:
    await redis_conn.eval(
        """
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        else
            return 0
        end
        """,
        1, lock_key, lock_value,
    )
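The semantics this relies on can be sketched against an in-memory dict (FakeRedis and its method names are hypothetical stand-ins for the two Redis operations used above, SET NX and the Lua compare-and-delete):

```python
import uuid


class FakeRedis:
    """In-memory stand-in for the two Redis operations the fix relies on."""

    def __init__(self) -> None:
        self._data: dict = {}

    def set_nx(self, key: str, value: str) -> bool:
        # Mirrors SET key value NX: succeeds only if the key is absent,
        # so check and acquire happen as one atomic step.
        if key in self._data:
            return False
        self._data[key] = value
        return True

    def compare_and_delete(self, key: str, value: str) -> int:
        # Mirrors the Lua script: delete only if we still own the lock,
        # so a worker whose lock already expired cannot release someone else's.
        if self._data.get(key) == value:
            del self._data[key]
            return 1
        return 0


r = FakeRedis()
lock_key = "autoclaim:taskiq:taskiq_bug"
token_a = f"worker-0-{uuid.uuid4()}"
token_b = f"worker-1-{uuid.uuid4()}"

assert r.set_nx(lock_key, token_a) is True    # worker-0 acquires
assert r.set_nx(lock_key, token_b) is False   # worker-1 is rejected atomically
assert r.compare_and_delete(lock_key, token_b) == 0  # wrong owner: no-op
assert r.compare_and_delete(lock_key, token_a) == 1  # owner releases
assert r.set_nx(lock_key, token_b) is True    # only now can worker-1 acquire
```

Unlike the locked()/acquire pair, there is no observable state between "free" and "held by me", which is exactly what closes the window.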

I'd like to discuss this and prepare a PR afterwards.
