Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class Stream
##
# @private exactly_once_delivery_enabled.
attr_reader :exactly_once_delivery_enabled
attr_accessor :keepalive_interval, :pong_deadline

##
# @private Create an empty Subscriber::Stream object.
Expand All @@ -68,24 +69,58 @@ def initialize subscriber

@callback_thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @subscriber.callback_threads

@keepalive_interval = Float(ENV["PUBSUB_TEST_KEEPALIVE_INTERVAL"] || 30)
@pong_deadline = Float(ENV["PUBSUB_TEST_PONG_DEADLINE"] || 15)
@last_ping_at = nil
@last_pong_at = nil
@stream_opened = false
@reconnect_delay = nil

@stream_keepalive_task = Concurrent::TimerTask.new(
execution_interval: 30
execution_interval: @keepalive_interval
) do
# push empty request every 30 seconds to keep stream alive
unless inventory.empty?
subscriber.service.logger.log :info, "subscriber-streams" do
"sending keepAlive to stream for subscription #{@subscriber.subscription_name}"
synchronize do
# @request_queue feeds client requests (initial pull request and keep-alive pings) into gRPC.
# Note: ACKs are sent via unary RPCs (TimedUnaryBuffer), not over this stream.
# Check that @request_queue is initialized (not nil) before pushing unconditional keep-alive pings.
if @stream_opened && !@stopped && @request_queue
Comment thread
torreypayne marked this conversation as resolved.
subscriber.service.logger.log :info, "subscriber-streams" do
"sending keepAlive to stream for subscription #{@subscriber.subscription_name}"
end
@last_ping_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) if @last_pong_at >= @last_ping_at
push Google::Cloud::PubSub::V1::StreamingPullRequest.new
end
push Google::Cloud::PubSub::V1::StreamingPullRequest.new
end
end.execute
end

@pong_monitor_task = Concurrent::TimerTask.new(
execution_interval: [@keepalive_interval / 5.0, 0.01].max
) do
synchronize do
# Do not check pong deadline if @paused (client flow control inventory full).
# When @paused, background_run waits on condition variable and stops calling enum.next,
# so incoming server pongs sit buffered in gRPC and @last_pong_at stays un-updated.
if @stream_opened && @last_ping_at && @last_pong_at && !@stopped && !@paused
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
if now - @last_ping_at >= @pong_deadline && @last_pong_at < @last_ping_at
subscriber.service.logger.log :error, "subscriber-streams" do
"Keep-alive pong not received within #{@pong_deadline}s; restarting stream."
end
@stream_opened = false
@background_thread&.raise RestartStream
end
end
end
end
end

def start
synchronize do
break if @background_thread

@inventory.start
@stream_keepalive_task.execute
@pong_monitor_task.execute

start_streaming!
end
Expand All @@ -108,6 +143,9 @@ def stop
@stopped = true
@pause_cond.broadcast

@stream_keepalive_task.shutdown
@pong_monitor_task.shutdown

# Now that the reception thread is stopped, immediately stop the
# callback thread pool. All queued callbacks will see the stream
# is stopped and perform a noop.
Expand Down Expand Up @@ -219,6 +257,13 @@ class RestartStream < StandardError; end

# rubocop:disable all

##
# @private Wait before the stream is retried, growing the delay on each call.
#
# The base delay starts at 1.0 second, is multiplied by 1.5 on every
# subsequent call and is capped at 60.0 seconds. Up to 0.5 seconds of random
# jitter is added to each wait. The progression starts over once
# @reconnect_delay has been reset to nil.
#
# The wait is performed on @pause_cond, so a broadcast on that condition
# (as #stop does) ends the wait early. No wait happens when the stream is
# already stopped.
def backoff_and_wait!
  synchronize do
    # The stream being retried is no longer usable. Clear the flag under the
    # lock so the keep-alive and pong-monitor timers, which test it under the
    # same lock, leave this thread alone while it waits. Otherwise the pong
    # monitor could raise RestartStream into this thread while it is inside a
    # rescue handler, where no sibling rescue clause would catch it.
    @stream_opened = false

    # Update the delay under the lock as well; it is reset under the lock.
    @reconnect_delay = @reconnect_delay ? [@reconnect_delay * 1.5, 60.0].min : 1.0

    @pause_cond.wait(@reconnect_delay + rand(0.0..0.5)) unless @stopped
  end
end

def background_run
synchronize do
# Don't allow a stream to restart if already stopped
Expand All @@ -245,11 +290,21 @@ def background_run

# Call the StreamingPull API to get the response enumerator
options = { :"metadata" => { :"x-goog-request-params" => @subscriber.subscription_name } }
synchronize do
@stream_opened = false
end
enum = @subscriber.service.streaming_pull @request_queue.each, options
subscriber.service.logger.log :info, "subscriber-streams" do
"rpc: streamingPull, subscription: #{@subscriber.subscription_name}, stream opened"
end

synchronize do
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
@last_ping_at = now
@last_pong_at = now
@stream_opened = true
end

loop do
synchronize do
if @paused && !@stopped
Expand All @@ -264,8 +319,17 @@ def background_run
begin
# Cannot synchronize the enumerator, causes deadlock
response = enum.next
new_exactly_once_delivery_enabled = response&.subscription_properties&.exactly_once_delivery_enabled
synchronize do
@last_pong_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
# Reset backoff delay only after successfully reading a frame from enum.next.
# If the connection drops immediately upon reading, @reconnect_delay is preserved.
@reconnect_delay = nil
end
received_messages = response.received_messages
# Skip processing properties and inventory on Pong frames (empty received_messages).
# Subscription properties on keep-alive Pongs are not valid.
next if received_messages.empty?
new_exactly_once_delivery_enabled = response&.subscription_properties&.exactly_once_delivery_enabled

# Use synchronize so changes happen atomically
synchronize do
Expand Down Expand Up @@ -310,18 +374,21 @@ def background_run
"#{status_code}; will be retried."
end
# Restart the stream with an incremental backoff for a retriable error.
backoff_and_wait!
retry
rescue RestartStream
subscriber.service.logger.log :info, "subscriber-streams" do
"Subscriber stream for subscription #{@subscriber.subscription_name} has ended; will be retried."
end
backoff_and_wait!
retry
rescue StandardError => e
subscriber.service.logger.log :error, "subscriber-streams" do
"error on stream for subscription #{@subscriber.subscription_name}: #{e.inspect}"
end
@subscriber.error! e

backoff_and_wait!
retry
end

Expand Down Expand Up @@ -443,6 +510,7 @@ def initial_input_request
req.client_id = @subscriber.service.client_id
req.max_outstanding_messages = @inventory.limit
req.max_outstanding_bytes = @inventory.bytesize
req.protocol_version = 1
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@
subscription: sub_path,
stream_ack_deadline_seconds: 60,
max_outstanding_messages: 1000,
max_outstanding_bytes: 100 * 1000 * 1000
max_outstanding_bytes: 100 * 1000 * 1000,
protocol_version: 1
)]
]

Expand Down Expand Up @@ -132,7 +133,8 @@
subscription: sub_path,
stream_ack_deadline_seconds: 60,
max_outstanding_messages: 1000,
max_outstanding_bytes: 100 * 1000 * 1000
max_outstanding_bytes: 100 * 1000 * 1000,
protocol_version: 1
)]
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@
subscription: sub_path,
stream_ack_deadline_seconds: 60,
max_outstanding_messages: 1000,
max_outstanding_bytes: 100 * 1000 * 1000
max_outstanding_bytes: 100 * 1000 * 1000,
protocol_version: 1
)]
]

Expand Down Expand Up @@ -141,7 +142,8 @@
subscription: sub_path,
stream_ack_deadline_seconds: 60,
max_outstanding_messages: 1000,
max_outstanding_bytes: 100 * 1000 * 1000
max_outstanding_bytes: 100 * 1000 * 1000,
protocol_version: 1
)]
]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require "helper"

# Specs for the keep-alive behaviour of a streaming-pull listener: the
# protocol_version sent in the initial request, periodic keep-alive pings, and
# restarting the stream when no pong arrives within the deadline.
describe Google::Cloud::PubSub::MessageListener, :keepalive, :mock_pubsub do
let(:topic_name) { "topic-name-goes-here" }
let(:sub_name) { "subscription-name-goes-here" }
let(:sub_hash) { subscription_hash topic_name, sub_name }
let(:sub_grpc) { Google::Cloud::PubSub::V1::Subscription.new(sub_hash) }
let(:subscriber) { Google::Cloud::PubSub::Subscriber.from_grpc sub_grpc, pubsub.service }
let(:rec_msg1_grpc) { Google::Cloud::PubSub::V1::ReceivedMessage.new \
rec_message_hash("rec_message1-msg-goes-here", 1111) }

# Use a 0.05 second keep-alive interval and pong deadline so that the
# timing-based examples below complete quickly.
before do
ENV["PUBSUB_TEST_KEEPALIVE_INTERVAL"] = "0.05"
ENV["PUBSUB_TEST_PONG_DEADLINE"] = "0.05"
end

# Remove the overrides again so they do not leak into other specs.
after do
ENV.delete "PUBSUB_TEST_KEEPALIVE_INTERVAL"
ENV.delete "PUBSUB_TEST_PONG_DEADLINE"
end

it "sends protocol_version = 1 in initial streaming pull request" do
pull_res1 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc]
stub = StreamingPullStub.new [[pull_res1]]
subscriber.service.mocked_subscription_admin = stub

called = false
listener = subscriber.listen streams: 1 do |msg|
called = true
end
listener.start

# Poll in 0.01s steps (about one second at most) until the callback has run.
listener_retries = 0
until called
fail "callback was not called" if listener_retries > 100
listener_retries += 1
sleep 0.01
end

listener.stop
listener.wait!

# NOTE(review): presumably stub.requests.first is the request enumerator of
# the first stream, so its first element is the initial pull request.
initial_req = stub.requests.first.to_a.first
_(initial_req.protocol_version).must_equal 1
end

it "sends keep-alive pings periodically even when inventory is empty" do
q = StreamingPullStub::RaisableEnumeratorQueue.new
stub = StreamingPullStub.new [[]]
# Override the stub on this instance so that responses are read from the
# queue controlled by this example instead of the canned response lists.
def stub.streaming_pull_internal req, opt = nil
@requests << req
@my_q.each
end
stub.instance_variable_set(:@my_q, q)
subscriber.service.mocked_subscription_admin = stub

# The callback is intentionally empty: no messages are delivered here.
listener = subscriber.listen streams: 1 do |msg|
end
listener.start

# Feed ten responses without messages (pongs), one every 0.02s.
pong_thread = Thread.new do
10.times do
sleep 0.02
q.push Google::Cloud::PubSub::V1::StreamingPullResponse.new(received_messages: [])
end
end

# Let several keep-alive intervals elapse before stopping.
sleep 0.18
pong_thread.join

listener.stop
listener.wait!

# At least two requests on the stream means at least one request was sent
# after the first one.
reqs = stub.requests.first.to_a
_(reqs.count).must_be :>=, 2
end

it "restarts stream when keep-alive pong deadline is exceeded" do
pull_res2 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc]
# NOTE(review): presumably each inner array holds the responses of one
# stream: the first stream yields nothing, the second delivers a message.
stub = StreamingPullStub.new [[], [pull_res2]]
subscriber.service.mocked_subscription_admin = stub

called = false
listener = subscriber.listen streams: 1 do |msg|
called = true
end
listener.start

# Poll in 0.01s steps (about two seconds at most) until the message that is
# only available on the second stream has been delivered.
listener_retries = 0
until called
fail "stream did not restart and deliver message" if listener_retries > 200
listener_retries += 1
sleep 0.01
end

listener.stop
listener.wait!

# NOTE(review): presumably one entry is recorded per opened stream, so two
# entries mean the original stream plus exactly one restart.
_(stub.requests.count).must_equal 2
end

it "does not restart stream when actively receiving keep-alive pongs" do
q = StreamingPullStub::RaisableEnumeratorQueue.new
stub = StreamingPullStub.new [[]]
# Override the stub on this instance so that responses are read from the
# queue controlled by this example instead of the canned response lists.
def stub.streaming_pull_internal req, opt = nil
@requests << req
@my_q.each
end
stub.instance_variable_set(:@my_q, q)
subscriber.service.mocked_subscription_admin = stub

# The callback is intentionally empty: no messages are delivered here.
listener = subscriber.listen streams: 1 do |msg|
end
listener.start

# Feed eight responses without messages (pongs), one every 0.02s, which is
# more frequent than the 0.05s pong deadline configured above.
pong_sender = Thread.new do
8.times do
sleep 0.02
empty_pong = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: []
q.push empty_pong
end
end

sleep 0.15
pong_sender.join

listener.stop
listener.wait!

# A single recorded entry means streaming_pull_internal was called only once,
# i.e. the stream was never reopened.
_(stub.requests.count).must_equal 1
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@
subscription: sub_path,
stream_ack_deadline_seconds: 60,
max_outstanding_messages: 1000,
max_outstanding_bytes: 100 * 1000 * 1000
max_outstanding_bytes: 100 * 1000 * 1000,
protocol_version: 1
)]
]

Expand Down Expand Up @@ -126,7 +127,8 @@
subscription: sub_path,
stream_ack_deadline_seconds: 60,
max_outstanding_messages: 1000,
max_outstanding_bytes: 100 * 1000 * 1000
max_outstanding_bytes: 100 * 1000 * 1000,
protocol_version: 1
)]
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@
subscription: sub_path,
stream_ack_deadline_seconds: 60,
max_outstanding_messages: 1000,
max_outstanding_bytes: 100 * 1000 * 1000
max_outstanding_bytes: 100 * 1000 * 1000,
protocol_version: 1
)]
]

Expand Down Expand Up @@ -126,7 +127,8 @@
subscription: sub_path,
stream_ack_deadline_seconds: 60,
max_outstanding_messages: 1000,
max_outstanding_bytes: 100 * 1000 * 1000
max_outstanding_bytes: 100 * 1000 * 1000,
protocol_version: 1
)]
]

Expand Down
Loading