diff --git a/lib/queue_classic/config.rb b/lib/queue_classic/config.rb index 68b6c2a..1bbe8f4 100644 --- a/lib/queue_classic/config.rb +++ b/lib/queue_classic/config.rb @@ -14,6 +14,13 @@ def app_name def wait_time @wait_time ||= (ENV['QC_LISTEN_TIME'] || 5).to_i end + + # Whether workers should use PostgreSQL LISTEN/NOTIFY while waiting for jobs. + def listen_notify? + return @listen_notify unless @listen_notify.nil? + + @listen_notify = ENV.fetch('QC_LISTEN_NOTIFY', 'true') != 'false' + end # Why do you want to change the table name? # Just deal with the default OK? @@ -78,6 +85,7 @@ def reset_config # TODO: we might want to think about storing these in a Hash. @app_name = nil @wait_time = nil + @listen_notify = nil @table_name = nil @queue = nil @default_queue = nil diff --git a/lib/queue_classic/conn_adapter.rb b/lib/queue_classic/conn_adapter.rb index 422b5e7..70a1551 100644 --- a/lib/queue_classic/conn_adapter.rb +++ b/lib/queue_classic/conn_adapter.rb @@ -38,6 +38,8 @@ def execute(stmt, *params) end def wait(time, *channels) + return Kernel.sleep(time) unless QC.listen_notify? + @mutex.synchronize do listen_cmds = channels.map { |c| "LISTEN \"#{c}\"" } connection.exec(listen_cmds.join(';')) diff --git a/test/config_test.rb b/test/config_test.rb index 40babcf..5807034 100644 --- a/test/config_test.rb +++ b/test/config_test.rb @@ -31,6 +31,19 @@ def test_configure_wait_time_with_env_var end end + def test_listen_notify_default + assert QC.listen_notify? + end + + def test_configure_listen_notify_with_env_var + with_env 'QC_LISTEN_NOTIFY' => 'false' do + QC.reset_config + refute QC.listen_notify? + end + end + + + def test_table_name_default assert_equal 'queue_classic_jobs', QC.table_name end diff --git a/test/conn_adapter_test.rb b/test/conn_adapter_test.rb new file mode 100644 index 0000000..02fdf55 --- /dev/null +++ b/test/conn_adapter_test.rb @@ -0,0 +1,69 @@ +# frozen_string_literal: true + +require_relative 'helper' + +class ConnAdapterTest < QCTest + class FakeConnection + attr_reader :statements, :wait_time + + def initialize + @statements = [] + @wait_time = nil + end + + def exec(statement) + @statements << statement + [] + end + + def wait_for_notify(time) + @wait_time = time + end + + def notifies + nil + end + end + + def setup + QC.reset_config + end + + def teardown + QC.reset_config + end + + # This test ensures that the connection adapter + # correctly uses LISTEN/NOTIFY when waiting for + # jobs, and falls back to sleeping when + # LISTEN/NOTIFY is disabled. + def test_wait_uses_listen_notify_by_default + connection = FakeConnection.new + adapter = QC::ConnAdapter.new(connection: connection) + + adapter.wait(0, 'default') + + assert_includes connection.statements, 'LISTEN "default"' + assert_includes connection.statements, 'UNLISTEN "default"' + assert_equal 0, connection.wait_time + end + + # This test ensures that the connection + # adapter falls back to polling when + # LISTEN/NOTIFY is disabled. + def test_wait_uses_polling_sleep_when_listen_notify_is_disabled + connection = FakeConnection.new + adapter = QC::ConnAdapter.new(connection: connection) + + with_env 'QC_LISTEN_NOTIFY' => 'false' do + QC.reset_config + adapter.wait(0, 'default') + end + + assert_empty connection.statements + assert_nil connection.wait_time + end +end + + +