When I try to unsubscribe one queue, and re-subscribe it
Exq automatically re-enqueue jobs from backup for currently running processes in that queue.
So I get duplicated processes as result if i check through Exq.Api.process(Exq.Api).
excerpt from Exq.Manager.Server
defp add_queue(state, queue, concurrency \\ Config.get(:concurrency)) do
...
GenServer.cast(self(), {:re_enqueue_backup, queue})
...
end
I guess it is because Exq assumes subscribe() is called to recover from errors.
my usecase is to control the flow depending on the situations.
here are some snippets to reproduce
config :exq,
name: Exq,
...
queues: [{"tick", 5}],
max_retries: 1
def Example.TickWorker do
def perform() do
tick(60_000)
end
defp tick(t) do
cond do
t > 0 ->
IO.puts "tick: #{t}"
:timer.sleep(5_000)
tick(t - 5_000)
true ->
IO.puts "-- tick end"
end
end
end
and here are my console
> Exq.enqueue(Exq, "tick", Example.TickWorker, [])
{:ok, "bd29a6f3-92c2-46dc-96b8-52362a8ad808"}
tick: 60000
tick: 55000
> Exq.unsubscribe(Exq, "tick")
:ok
tick: 50000
> Exq.subscribe(Exq, "tick")
:ok
[info] Re-enqueueing job from backup for node_id [Username-MBP] and queue [tick]
tick: 60000
tick: 45000
tick: 55000
tick: 40000
...
thank you.
When I try to unsubscribe one queue, and re-subscribe it
Exq automatically re-enqueue jobs from backup for currently running processes in that queue.
So I get duplicated processes as result if i check through
Exq.Api.process(Exq.Api).excerpt from Exq.Manager.Server
I guess it is because Exq assumes subscribe() is called to recover from errors.
my usecase is to control the flow depending on the situations.
here are some snippets to reproduce
and here are my console
thank you.