[feat][broker] PIP-468: scalable-topic seek + clear-backlog admin API #25696
Open
merlimat wants to merge 1 commit into apache:master
Two new operational primitives on a scalable-topic subscription, exposed
as admin REST endpoints, admin client methods, and pulsar-admin CLI
subcommands:
- seek by wall-clock timestamp: reset every per-segment cursor to a
point in time. The controller uses each segment's recorded
[createdAtMs, sealedAtMs) window to dispatch the cheapest per-segment
op:
    - sealed entirely before t -> skip-all (cursor to end)
    - created entirely after t -> seek to timestamp 0 (earliest)
    - alive at t -> seek to timestamp t
- clear backlog: dispatch skip-all on the subscription across every
segment in the DAG.
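The per-segment choice described above can be sketched as a small pure function. This is only an illustration, not the controller's code: the class, enum, and method names are hypothetical, an unsealed segment is modelled as an empty `OptionalLong`, and the boundary handling (`sealedAtMs <= t` counts as "sealed before t", following the half-open window) is an assumption.

```java
import java.util.OptionalLong;

public class SeekDispatchSketch {
    /** Hypothetical names for the three per-segment operations. */
    enum Op { SKIP_ALL, SEEK_EARLIEST, SEEK_TO_TIMESTAMP }

    /**
     * Picks the per-segment op for a seek to wall-clock time t.
     * The segment's window is [createdAtMs, sealedAtMs); sealedAtMs is
     * empty while the segment is still active (assumption of this sketch).
     */
    static Op chooseOp(long createdAtMs, OptionalLong sealedAtMs, long t) {
        if (sealedAtMs.isPresent() && sealedAtMs.getAsLong() <= t) {
            return Op.SKIP_ALL;            // sealed entirely before t
        }
        if (createdAtMs > t) {
            return Op.SEEK_EARLIEST;       // created entirely after t
        }
        return Op.SEEK_TO_TIMESTAMP;       // alive at t
    }

    public static void main(String[] args) {
        long t = 2_000L;
        System.out.println(chooseOp(0L, OptionalLong.of(1_000L), t));     // SKIP_ALL
        System.out.println(chooseOp(1_500L, OptionalLong.of(3_000L), t)); // SEEK_TO_TIMESTAMP
        System.out.println(chooseOp(2_500L, OptionalLong.empty(), t));    // SEEK_EARLIEST
    }
}
```

Only the "alive at t" case needs a real timestamp lookup inside the segment; the other two resolve to the end or the start of the segment without one.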
Plumbing:
- Per-segment endpoints in /segments/.../subscription/{sub}/seek and
.../skip-all (super-user, routed to the segment owner). They call the
standard Subscription.resetCursor / clearBacklog under the hood.
- Parent-topic endpoints in /scalable/.../subscriptions/{sub}/seek
and .../skip-all, gated on RESET_CURSOR / SKIP authz, routed to
the controller leader.
- Admin client interface + impl pairs (seekSegmentSubscription /
clearSegmentSubscriptionBacklog and seekSubscription / clearBacklog).
- CLI: `pulsar-admin scalable-topics seek <topic> --subscription <s>
--time 1h` (relative offset, computed as now - offset) and
`pulsar-admin scalable-topics clear-backlog <topic> -s <s>`.
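To make the `--time` semantics concrete, here is a minimal sketch of turning a relative offset into the absolute timestamp sent to the broker (`now - offset`). The parser below is a hypothetical stand-in written for this example; it is not the converter the CLI actually uses, and it only handles the `s`, `m`, `h`, and `d` suffixes.

```java
import java.time.Duration;
import java.time.Instant;

public class RelativeSeekTimeSketch {
    /** Parses offsets such as "30s", "5m", "1h", "5d" (illustrative parser only). */
    static Duration parseOffset(String text) {
        String v = text.trim();
        if (v.length() < 2) {
            throw new IllegalArgumentException("expected <number><unit>, got: " + text);
        }
        long amount = Long.parseLong(v.substring(0, v.length() - 1));
        char unit = v.charAt(v.length() - 1);
        switch (unit) {
            case 's': return Duration.ofSeconds(amount);
            case 'm': return Duration.ofMinutes(amount);
            case 'h': return Duration.ofHours(amount);
            case 'd': return Duration.ofDays(amount);
            default: throw new IllegalArgumentException("unsupported unit: " + unit);
        }
    }

    /** Absolute seek timestamp in epoch milliseconds: now - offset. */
    static long seekTimestampMs(Instant now, String offset) {
        return now.minus(parseOffset(offset)).toEpochMilli();
    }

    public static void main(String[] args) {
        // "--time 1h" issued right now resolves to one hour ago.
        System.out.println(seekTimestampMs(Instant.now(), "1h"));
    }
}
```

Because the offset is resolved on the client side at invocation time, running the same command twice seeks to two different absolute timestamps.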
Removals (subscription seek is now an admin operation, not a consumer
operation):
- StreamConsumerBuilder.seek(MessageId) and seek(Instant): these were
placeholder no-ops and are removed. Initial position uses
subscriptionInitialPosition; timestamp seek uses the new admin API.
- CheckpointConsumer.seek(Checkpoint) and the async counterpart.
Frameworks restore from a saved checkpoint via
CheckpointConsumerBuilder.startPosition(Checkpoint).
- Checkpoint.atTimestamp(Instant) factory and the underlying
TimestampCheckpoint type — timestamp positioning is the admin
surface, not a checkpoint kind.
- Checkpoint.creationTime() — was just metadata, not part of the
position vector. Connector frameworks that need timing record it
themselves. Wire format simplifies accordingly.
Tests:
- ScalableTopicControllerTest:
    - testSeekSubscriptionDispatchesPerSegmentByTimestamp: three
      segments at hand-picked timestamps (one fully before t, one
      straddling, one fully after); asserts the right per-segment
      admin call is issued for each.
    - testClearBacklogDispatchesSkipAllToEverySegment: N skip-all
      calls for N segments.
- V5CheckpointConsumerBasicTest.testSeekRewindsToEarlierCheckpoint —
removed; corresponding example in Examples.java removed.
- V5AsyncApisTest.testAsyncCheckpointConsumerCheckpointAndSeek —
slimmed to testAsyncCheckpointConsumerCheckpoint.
- CheckpointV5Test — drop timestamp-roundtrip + creationTime
assertions; constructor calls updated to single-arg.
Summary
Two new operational primitives on a scalable-topic subscription, exposed as admin REST endpoints, admin client methods, and `pulsar-admin` CLI subcommands.
`seek` by wall-clock timestamp
Reset every per-segment cursor to a point in time. The controller uses each segment's recorded `[createdAtMs, sealedAtMs)` window to dispatch the cheapest per-segment op:
- sealed entirely before `t` -> skip-all (cursor to end)
- created entirely after `t` -> seek to `timestamp=0` (earliest)
- alive at `t` -> seek to `timestamp=t`
`clear-backlog`
Dispatch skip-all on the subscription across every segment in the DAG.
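The clear-backlog fan-out amounts to "issue one skip-all per segment and complete when all of them complete". A minimal sketch of that shape follows, assuming segments are identified by plain strings and the per-segment call is passed in as a function; both are placeholders for this example and do not reflect the actual controller or admin-client types.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class ClearBacklogFanOutSketch {
    /**
     * Calls skipAll once for every segment and returns a future that
     * completes when all per-segment calls have completed. If any call
     * fails, the returned future completes exceptionally.
     */
    static CompletableFuture<Void> clearBacklog(
            List<String> segments,
            Function<String, CompletableFuture<Void>> skipAll) {
        CompletableFuture<?>[] calls = segments.stream()
                .map(skipAll)
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(calls);
    }

    public static void main(String[] args) {
        // Stand-in for the per-segment admin call: just logs the segment name.
        Function<String, CompletableFuture<Void>> fakeSkipAll = segment -> {
            System.out.println("skip-all on " + segment);
            return CompletableFuture.completedFuture(null);
        };
        clearBacklog(List.of("segment-0", "segment-1", "segment-2"), fakeSkipAll).join();
    }
}
```

This matches what the controller test checks at a high level: N segments produce N skip-all calls.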
Plumbing
- Per-segment endpoints: `/segments/.../subscription/{sub}/seek` and `.../skip-all` (super-user, routed to segment owner). They call `Subscription.resetCursor` / `clearBacklog` under the hood.
- Parent-topic endpoints: `/scalable/.../subscriptions/{sub}/seek` and `.../skip-all`, gated on `RESET_CURSOR` / `SKIP` authz, routed to the controller leader.
- Admin client methods: per-segment (`seekSegmentSubscription`, `clearSegmentSubscriptionBacklog`) and parent-level (`seekSubscription`, `clearBacklog`).
- CLI: `pulsar-admin scalable-topics seek <topic> --subscription <s> --time 1h`. `--time` is a relative offset; the absolute timestamp passed to the broker is `now - offset`. It uses the standard time-unit converter (`1s`, `5m`, `1h`, `5d`, …).
- CLI: `pulsar-admin scalable-topics clear-backlog <topic> --subscription <s>`.
Removals
Subscription seek is now an admin operation, not a consumer operation. The following V5 client surface goes away:
- `StreamConsumerBuilder.seek(MessageId)` and `seek(Instant)`: these were placeholder no-ops. Initial position is set via `subscriptionInitialPosition` (`EARLIEST` / `LATEST`); timestamp seek is the new admin call.
- `CheckpointConsumer.seek(Checkpoint)` and the async counterpart. Connector frameworks restore from a saved checkpoint via `CheckpointConsumerBuilder.startPosition(Checkpoint)`.
- `Checkpoint.atTimestamp(Instant)` factory and the underlying `TimestampCheckpoint` type: timestamp positioning is the admin surface, not a checkpoint kind.
- `Checkpoint.creationTime()`: this was just metadata, not part of the position vector. Wire format simplifies accordingly. Connector frameworks that need timing can record it themselves.
Test plan
- `ScalableTopicControllerTest`:
    - `testSeekSubscriptionDispatchesPerSegmentByTimestamp`: three segments at hand-picked timestamps (one fully before `t`, one straddling, one fully after); asserts the right per-segment admin call is issued for each.
    - `testClearBacklogDispatchesSkipAllToEverySegment`: N skip-all calls for N segments.
- Test classes: `V5CheckpointConsumerBasicTest`, `V5CheckpointConsumerDagReplayTest`, `V5CheckpointConsumerGroupTest`, `V5AsyncApisTest`, `CheckpointV5Test`.
- Modules: `pulsar-broker`, `pulsar-client-admin-api`, `pulsar-client-admin`, `pulsar-client-api-v5`, `pulsar-client-v5`, `pulsar-client-tools`.