Skip to content

Exq.subscribe() causes unnecessary re-enqueues #310

@robobakery

Description

@robobakery

When I try to unsubscribe one queue, and re-subscribe it
Exq automatically re-enqueue jobs from backup for currently running processes in that queue.

So I get duplicated processes as result if i check through Exq.Api.process(Exq.Api).

excerpt from Exq.Manager.Server

  defp add_queue(state, queue, concurrency \\ Config.get(:concurrency)) do
    ...
    GenServer.cast(self(), {:re_enqueue_backup, queue})
    ...
  end

I guess it is because Exq assumes subscribe() is called to recover from errors.
my usecase is to control the flow depending on the situations.

here are some snippets to reproduce

config :exq,
  name: Exq,
  ...
  queues: [{"tick", 5}],
  max_retries: 1
def Example.TickWorker do
  def perform() do
    tick(60_000)
  end

  defp tick(t) do
    cond do
      t > 0 ->
        IO.puts "tick: #{t}"
        :timer.sleep(5_000)
        tick(t - 5_000)
      true ->
        IO.puts "-- tick end"
    end
  end
end

and here are my console

> Exq.enqueue(Exq, "tick", Example.TickWorker, [])
{:ok, "bd29a6f3-92c2-46dc-96b8-52362a8ad808"}
tick: 60000
tick: 55000
> Exq.unsubscribe(Exq, "tick")
:ok
tick: 50000
> Exq.subscribe(Exq, "tick")
:ok
[info] Re-enqueueing job from backup for node_id [Username-MBP] and queue [tick]
tick: 60000
tick: 45000
tick: 55000
tick: 40000
...

thank you.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions