feat(pubsub): implement streaming keep-alive logic #34653
robertvoinescu-work
left a comment
LGTM from a functional perspective. Just a few minor comments.
  # Reset backoff delay only after successfully reading a frame from enum.next.
  # If the connection drops immediately upon reading, @reconnect_delay is preserved.
  @reconnect_delay = nil
end
new_exactly_once_delivery_enabled = response&.subscription_properties&.exactly_once_delivery_enabled
You should add logic here to avoid setting fields off a pong response (an empty array of messages). The subscription properties on pongs are not valid.
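A minimal sketch of the guard suggested here, assuming a pong can be recognised as a response with an empty message list. The `Response` and `SubscriptionProperties` structs and the `next_exactly_once_flag` helper are hypothetical stand-ins for illustration, not the library's classes.

```ruby
# Hypothetical stand-ins for the StreamingPullResponse protobuf types.
SubscriptionProperties = Struct.new(:exactly_once_delivery_enabled)
Response = Struct.new(:received_messages, :subscription_properties)

# Assumption: a response that carries no messages is a keep-alive pong.
def pong?(response)
  response.received_messages.nil? || response.received_messages.empty?
end

# Returns the exactly-once flag to use after handling `response`.
# Pongs leave the current value untouched, because their subscription
# properties are not valid.
def next_exactly_once_flag(current, response)
  return current if response.nil? || pong?(response)

  props = response.subscription_properties
  props.nil? ? current : props.exactly_once_delivery_enabled
end

pong = Response.new([], SubscriptionProperties.new(false))
data = Response.new([:msg], SubscriptionProperties.new(true))

flag_after_pong = next_exactly_once_flag(true, pong)   # pong is ignored
flag_after_data = next_exactly_once_flag(false, data)  # real response applies
```

Keeping the check in one small predicate means the same test can also protect any other field that is read from the response.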
  execution_interval: @keepalive_interval
) do
  synchronize do
    if @stream_opened && !@stopped && @request_queue
My understanding is that the request_queue contains the ACK responses we send back to Pub/Sub. I believe we should send keep-alive pings regardless of whether there are items in this queue. Currently, in the case of a slow message stream (e.g., receiving a message only every 31 seconds), the service will close the stream due to inactivity. This would force us to establish a new streaming connection every time we want to read.
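The behaviour asked for here could look roughly like the sketch below, where each timer tick enqueues a ping whenever the stream is live, whatever the queue currently holds. `KeepaliveSketch`, `keepalive_tick`, and the `:ping` / `:ack` symbols are hypothetical and only illustrate the condition; they are not the library's `Stream` implementation.

```ruby
# Hypothetical, simplified stream state; not the library's Stream class.
class KeepaliveSketch
  attr_reader :request_queue

  def initialize
    @request_queue = Queue.new # outgoing requests (acks, pings)
    @stream_opened = true
    @stopped = false
    @mutex = Mutex.new
  end

  def stop
    @mutex.synchronize { @stopped = true }
  end

  # One timer tick: enqueue a ping whenever the stream is live.
  # Whether the queue already holds other requests plays no role.
  def keepalive_tick
    @mutex.synchronize do
      return false unless @stream_opened && !@stopped

      @request_queue << :ping # stands in for an empty StreamingPullRequest
      true
    end
  end
end

stream = KeepaliveSketch.new
stream.keepalive_tick         # queue was empty; a ping is still enqueued
stream.request_queue << :ack
stream.keepalive_tick         # queue is non-empty; a ping is enqueued as well
```

In the real code a timer would call the tick at `@keepalive_interval`; pulling the body into a plain method keeps the condition easy to unit test.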
Overview
Implements proactive streaming keep-alive logic and connection health monitoring in Google::Cloud::PubSub::MessageListener::Stream, mirroring the design implemented in the .NET Pub/Sub client (dotnet#15649).

Long-running bi-directional gRPC streaming pull connections (StreamingPull) can experience silent TCP drops, intermediary network timeouts, or read deadlocks during periods of low message volume. This change introduces background timer tasks to push regular keep-alive requests and actively monitor server Pong timestamps.

Key Changes
- Sets protocol_version = 1 on the initial StreamingPullRequest protobuf to enable bi-directional stream keep-alive support.
- Adds a background timer task (@stream_keepalive_task) to dispatch empty StreamingPullRequest pings at regular intervals (default 30 seconds), regardless of current lease inventory volume.
- Adds @pong_monitor_task to inspect timestamps (@last_ping_at, @last_pong_at). If a keep-alive response is overdue by more than pong_deadline seconds (default 15 seconds), the monitor raises RestartStream to safely recycle the connection and back off.
- Guards the ping timestamp update (@last_ping_at = now if @last_pong_at >= @last_ping_at) to ensure consecutive un-ponged pings cannot overwrite the timestamp of an overdue request.

Testing & Validation
- Unit tests (keepalive_test.rb): Added targeted unit test coverage asserting protocol version flags, timer intervals, deadline timeouts, and non-disruptive Pong handling.
- helical-zone-771) across simulated TCP socket hangs, sub-millisecond deadline starvation, and post-recovery downstream message delivery.
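The ping/pong bookkeeping described under Key Changes can be sketched as below. This is an illustration under stated assumptions, not the PR's code: `PongTracker` and its methods are hypothetical, timestamps are plain numbers of seconds, and treating an unset timestamp as "no ping outstanding" is an assumption the description does not confirm.

```ruby
# Hypothetical tracker for keep-alive timestamps. Names echo the PR's
# instance variables, but this is not the library's implementation.
class PongTracker
  RestartStream = Class.new(StandardError)

  attr_reader :last_ping_at

  def initialize(pong_deadline: 15)
    @pong_deadline = pong_deadline
    @last_ping_at = nil
    @last_pong_at = nil
  end

  # Record a ping only if the previous one was answered, so an overdue
  # ping keeps its original timestamp.
  def record_ping(now)
    answered = @last_pong_at && @last_pong_at >= @last_ping_at
    @last_ping_at = now if @last_ping_at.nil? || answered
  end

  def record_pong(now)
    @last_pong_at = now
  end

  # Raise when the outstanding ping has waited longer than the deadline.
  def check!(now)
    return if @last_ping_at.nil?
    return if @last_pong_at && @last_pong_at >= @last_ping_at

    raise RestartStream, "pong overdue" if now - @last_ping_at > @pong_deadline
  end
end

tracker = PongTracker.new(pong_deadline: 15)
tracker.record_ping(0)   # first ping at t=0
tracker.record_ping(30)  # still unanswered, so the timestamp stays at 0
overdue = begin
  tracker.check!(30)
  false
rescue PongTracker::RestartStream
  true
end
```

Without the guard in `record_ping`, the second ping would move the timestamp to 30 and the monitor would measure the deadline from the newer ping, hiding the one that never received a pong.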