Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions lib/queue_classic/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ def app_name
def wait_time
@wait_time ||= (ENV['QC_LISTEN_TIME'] || 5).to_i
end

# Whether workers should use PostgreSQL LISTEN/NOTIFY while waiting for jobs.
def listen_notify?
return @listen_notify unless @listen_notify.nil?

@listen_notify = ENV.fetch('QC_LISTEN_NOTIFY', 'true') != 'false'
end

# Why do you want to change the table name?
# Just deal with the default OK?
Expand Down Expand Up @@ -78,6 +85,7 @@ def reset_config
# TODO: we might want to think about storing these in a Hash.
@app_name = nil
@wait_time = nil
@listen_notify = nil
@table_name = nil
@queue = nil
@default_queue = nil
Expand Down
2 changes: 2 additions & 0 deletions lib/queue_classic/conn_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ def execute(stmt, *params)
end

def wait(time, *channels)
return Kernel.sleep(time) unless QC.listen_notify?

@mutex.synchronize do
listen_cmds = channels.map { |c| "LISTEN \"#{c}\"" }
connection.exec(listen_cmds.join(';'))
Expand Down
13 changes: 13 additions & 0 deletions test/config_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ def test_configure_wait_time_with_env_var
end
end

def test_listen_notify_default
assert QC.listen_notify?
end

def test_configure_listen_notify_with_env_var
with_env 'QC_LISTEN_NOTIFY' => 'false' do
QC.reset_config
refute QC.listen_notify?
end
end



def test_table_name_default
assert_equal 'queue_classic_jobs', QC.table_name
end
Expand Down
69 changes: 69 additions & 0 deletions test/conn_adapter_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# frozen_string_literal: true

require_relative 'helper'

class ConnAdapterTest < QCTest
class FakeConnection
attr_reader :statements, :wait_time

def initialize
@statements = []
@wait_time = nil
end

def exec(statement)
@statements << statement
[]
end

def wait_for_notify(time)
@wait_time = time
end

def notifies
nil
end
end

def setup
QC.reset_config
end

def teardown
QC.reset_config
end

# This test ensures that the connection adapter
# correctly uses LISTEN/NOTIFY when waiting for
# jobs, and falls back to sleeping when
# LISTEN/NOTIFY is disabled.
def test_wait_uses_listen_notify_by_default
connection = FakeConnection.new
adapter = QC::ConnAdapter.new(connection: connection)

adapter.wait(0, 'default')

assert_includes connection.statements, 'LISTEN "default"'
assert_includes connection.statements, 'UNLISTEN "default"'
assert_equal 0, connection.wait_time
end

# This test ensures that the connection
# adapter falls back to polling when
# LISTEN/NOTIFY is disabled.
def test_wait_uses_polling_sleep_when_listen_notify_is_disabled
connection = FakeConnection.new
adapter = QC::ConnAdapter.new(connection: connection)

with_env 'QC_LISTEN_NOTIFY' => 'false' do
QC.reset_config
adapter.wait(0, 'default')
end

assert_empty connection.statements
assert_nil connection.wait_time
end
end