Skip to content

feat(pubsub): implement streaming keep-alive logic#34653

Open
torreypayne wants to merge 1 commit into
mainfrom
pubsub-streaming-keepalive
Open

feat(pubsub): implement streaming keep-alive logic#34653
torreypayne wants to merge 1 commit into
mainfrom
pubsub-streaming-keepalive

Conversation

@torreypayne

Copy link
Copy Markdown
Member

Overview

Implements proactive streaming keep-alive logic and connection health monitoring in Google::Cloud::PubSub::MessageListener::Stream, mirroring the design implemented in the .NET Pub/Sub client (dotnet#15649).

Long-running bi-directional gRPC streaming pull connections (StreamingPull) can experience silent TCP drops, intermediary network timeouts, or read deadlocks during periods of low message volume. This change introduces background timer tasks to push regular keep-alive requests and actively monitor server Pong timestamps.

Key Changes

  • Protocol Version Initialization: Explicitly initializes protocol_version = 1 on the initial StreamingPullRequest protobuf to enable bi-directional stream keep-alive support.
  • Unconditional Keep-Alive Pings: Configures a background timer task (@stream_keepalive_task) to dispatch empty StreamingPullRequest pings at regular intervals (default 30 seconds), regardless of current lease inventory volume.
  • Pong Monitoring & Automatic Reconnection: Introduces @pong_monitor_task to inspect timestamps (@last_ping_at, @last_pong_at). If a keep-alive response is overdue by more than pong_deadline seconds (default 15 seconds), the monitor raises RestartStream to safely recycle the connection and back off.
  • Concurrency Timestamp Guard: Guards ping timestamp updates (@last_ping_at = now if @last_pong_at >= @last_ping_at) to ensure consecutive un-ponged pings cannot overwrite the timestamp of an overdue request.

Testing & Validation

  • Unit Test Suite (keepalive_test.rb): Added targeted unit test coverage asserting protocol version flags, timer intervals, deadline timeouts, and non-disruptive Pong handling.
  • Resiliency & Robustness Suite: Validated against live GCP test instances (helical-zone-771) across simulated TCP socket hangs, sub-millisecond deadline starvation, and post-recovery downstream message delivery.

@torreypayne torreypayne force-pushed the pubsub-streaming-keepalive branch 2 times, most recently from cf5df9b to b1acc8a Compare June 23, 2026 04:52
@torreypayne torreypayne marked this pull request as ready for review June 23, 2026 15:54
@torreypayne torreypayne requested review from a team and yoshi-approver as code owners June 23, 2026 15:54
@torreypayne torreypayne force-pushed the pubsub-streaming-keepalive branch from b1acc8a to 94e9f14 Compare June 23, 2026 16:53
@torreypayne torreypayne force-pushed the pubsub-streaming-keepalive branch from 94e9f14 to ad59cd2 Compare June 23, 2026 17:29

@robertvoinescu-work robertvoinescu-work left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM from a functional perspective. Just a few minor comments.

# Reset backoff delay only after successfully reading a frame from enum.next.
# If the connection drops immediately upon reading, @reconnect_delay is preserved.
@reconnect_delay = nil
end
new_exactly_once_delivery_enabled = response&.subscription_properties&.exactly_once_delivery_enabled

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should add logic here to avoid setting fields off a pong response (an empty array of messages). The subscription properties on pongs are not valid.

execution_interval: @keepalive_interval
) do
synchronize do
if @stream_opened && !@stopped && @request_queue

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that the request_queue contains the ACK responses we send back to Pub/Sub. I believe we should send keep-alive pings regardless of whether there are items in this queue. Currently, in the case of a slow message stream (e.g., receiving a message only every 31 seconds), the service will close the stream due to inactivity. This would force us to establish a new streaming connection every time we want to read.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants