[feat][broker] PIP-468: scalable-topic seek + clear-backlog admin API #25696

Open
merlimat wants to merge 1 commit into apache:master from merlimat:st-seek-clear-backlog

@merlimat merlimat commented May 6, 2026

Summary

Two new operational primitives on a scalable-topic subscription, exposed as admin REST endpoints, admin client methods, and pulsar-admin CLI subcommands.

seek by wall-clock timestamp

Reset every per-segment cursor to a point in time. The controller uses each segment's recorded [createdAtMs, sealedAtMs) window to dispatch the cheapest per-segment op:

| Segment relative to t | Per-segment op |
| --- | --- |
| Sealed entirely before t | skip-all (cursor → end) |
| Created entirely after t | seek to timestamp=0 (earliest) |
| Alive at t | seek to timestamp=t |
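
The three-way decision above can be sketched in a few lines of Java. This is only an illustration: `Segment`, `SegmentOp`, and `chooseOp` are hypothetical names, not the classes in this PR, and the boundary handling (a segment sealed exactly at `t` counts as "before") is an assumption derived from reading the window as half-open.

```java
import java.util.List;

public class SeekDispatchSketch {
    // Hypothetical op names for illustration; the PR's real types may differ.
    public enum SegmentOp { SKIP_ALL, SEEK_EARLIEST, SEEK_TO_TIMESTAMP }

    // Assumption: an active (unsealed) segment is modelled with sealedAtMs = Long.MAX_VALUE.
    public record Segment(long id, long createdAtMs, long sealedAtMs) {}

    // Picks the per-segment op for target time t, treating the segment's
    // lifetime as the half-open window [createdAtMs, sealedAtMs).
    public static SegmentOp chooseOp(Segment s, long t) {
        if (s.sealedAtMs() <= t) {
            return SegmentOp.SKIP_ALL;           // whole window lies before t
        }
        if (s.createdAtMs() > t) {
            return SegmentOp.SEEK_EARLIEST;      // whole window lies after t
        }
        return SegmentOp.SEEK_TO_TIMESTAMP;      // createdAtMs <= t < sealedAtMs
    }

    public static void main(String[] args) {
        long t = 1_000L;
        List<Segment> segments = List.of(
                new Segment(1, 0L, 500L),                // sealed before t
                new Segment(2, 500L, 1_500L),            // alive at t
                new Segment(3, 1_500L, Long.MAX_VALUE)); // created after t
        for (Segment s : segments) {
            System.out.println("segment " + s.id() + " -> " + chooseOp(s, t));
        }
    }
}
```

Only the "alive at t" case needs a real timestamp lookup inside the segment; the other two cases resolve to the end or the start of the segment without scanning.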

clear-backlog

Dispatch skip-all on the subscription across every segment in the DAG.
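
A minimal sketch of that fan-out, assuming the per-segment skip-all call is asynchronous and returns a `CompletableFuture<Void>`. The `skipAll` function parameter is a stand-in for the real per-segment admin call, and `ClearBacklogSketch` is a hypothetical name; error handling and retries are left out.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class ClearBacklogSketch {
    // Issues one skip-all per segment and returns a future that completes
    // once every per-segment call has completed.
    public static CompletableFuture<Void> clearBacklog(
            List<Long> segmentIds,
            Function<Long, CompletableFuture<Void>> skipAll) {
        CompletableFuture<?>[] calls = segmentIds.stream()
                .map(skipAll)
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(calls);
    }

    public static void main(String[] args) {
        // Stand-in for the per-segment admin call: just logs the segment id.
        clearBacklog(List.of(1L, 2L, 3L), id -> {
            System.out.println("skip-all on segment " + id);
            return CompletableFuture.completedFuture(null);
        }).join();
    }
}
```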

Plumbing

  • Per-segment endpoints under /segments/.../subscription/{sub}/seek and .../skip-all (super-user, routed to segment owner). They call Subscription.resetCursor / clearBacklog under the hood.
  • Parent-topic endpoints under /scalable/.../subscriptions/{sub}/seek and .../skip-all, gated on RESET_CURSOR / SKIP authz, routed to the controller leader.
  • Admin client interface + impl pairs (sync/async): segment-level (seekSegmentSubscription, clearSegmentSubscriptionBacklog) and parent-level (seekSubscription, clearBacklog).
  • CLI:
    • pulsar-admin scalable-topics seek <topic> --subscription <s> --time 1h: --time is a relative offset; the absolute timestamp passed to the broker is now - offset. Standard time-unit converter (1s, 5m, 1h, 5d, …).
    • pulsar-admin scalable-topics clear-backlog <topic> --subscription <s>.
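
The `now - offset` conversion for `--time` can be illustrated as follows. This is a sketch under assumptions: `RelativeTimeSketch` and its parser are hypothetical and handle only the `s`, `m`, `h`, and `d` suffixes listed above; the CLI uses Pulsar's own time-unit converter, which may accept more forms.

```java
import java.util.concurrent.TimeUnit;

public class RelativeTimeSketch {
    // Parses a value such as "1h" or "5m" into milliseconds.
    // Hypothetical parser, not Pulsar's converter.
    public static long parseOffsetMillis(String s) {
        if (s == null || s.length() < 2) {
            throw new IllegalArgumentException("expected <number><unit>, got: " + s);
        }
        char unit = s.charAt(s.length() - 1);
        long n = Long.parseLong(s.substring(0, s.length() - 1));
        if (n < 0) {
            throw new IllegalArgumentException("offset must not be negative: " + s);
        }
        return switch (unit) {
            case 's' -> TimeUnit.SECONDS.toMillis(n);
            case 'm' -> TimeUnit.MINUTES.toMillis(n);
            case 'h' -> TimeUnit.HOURS.toMillis(n);
            case 'd' -> TimeUnit.DAYS.toMillis(n);
            default -> throw new IllegalArgumentException("unsupported unit: " + unit);
        };
    }

    // Absolute timestamp sent to the broker: now minus the relative offset.
    public static long absoluteTimestamp(long nowMs, String offset) {
        return nowMs - parseOffsetMillis(offset);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println("seek target (epoch ms): " + absoluteTimestamp(now, "1h"));
    }
}
```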

Removals

Subscription seek is now an admin operation, not a consumer operation. The following V5 client surface goes away:

  • StreamConsumerBuilder.seek(MessageId) and seek(Instant) — were placeholder no-ops. Initial position is set via subscriptionInitialPosition(EARLIEST/LATEST); timestamp seek is the new admin call.
  • CheckpointConsumer.seek(Checkpoint) and the async counterpart. Connector frameworks restore from a saved checkpoint via CheckpointConsumerBuilder.startPosition(Checkpoint).
  • Checkpoint.atTimestamp(Instant) factory and the underlying TimestampCheckpoint type — timestamp positioning is the admin surface, not a checkpoint kind.
  • Checkpoint.creationTime() — was just metadata, not part of the position vector. Wire format simplifies accordingly. Connector frameworks that need timing can record it themselves.

Test plan

  • ScalableTopicControllerTest:
    • testSeekSubscriptionDispatchesPerSegmentByTimestamp — three segments at hand-picked timestamps (one fully before t, one straddling, one fully after); asserts the right per-segment admin call is issued for each.
    • testClearBacklogDispatchesSkipAllToEverySegment — N skip-all calls for N segments.
  • V5 checkpoint suites updated and green: V5CheckpointConsumerBasicTest, V5CheckpointConsumerDagReplayTest, V5CheckpointConsumerGroupTest, V5AsyncApisTest, CheckpointV5Test.
  • Checkstyle clean (pulsar-broker, pulsar-client-admin-api, pulsar-client-admin, pulsar-client-api-v5, pulsar-client-v5, pulsar-client-tools).

Two new operational primitives on a scalable-topic subscription, exposed
as admin REST endpoints, admin client methods, and pulsar-admin CLI
subcommands:

- seek by wall-clock timestamp: reset every per-segment cursor to a
  point in time. The controller uses each segment's recorded
  [createdAtMs, sealedAtMs) window to dispatch the cheapest per-segment
  op:
    - sealed entirely before t  -> skip-all (cursor to end)
    - created entirely after t  -> seek to timestamp 0 (earliest)
    - alive at t                -> seek to timestamp t
- clear backlog: dispatch skip-all on the subscription across every
  segment in the DAG.

Plumbing:
  - Per-segment endpoints in /segments/.../subscription/{sub}/seek and
    .../skip-all (super-user, routed to segment owner). They call the
    standard Subscription.resetCursor / clearBacklog under the hood.
  - Parent-topic endpoints in /scalable/.../subscriptions/{sub}/seek
    and .../skip-all, gated on RESET_CURSOR / SKIP authz, routed to
    the controller leader.
  - Admin client interface + impl pairs (seekSegmentSubscription /
    clearSegmentSubscriptionBacklog and seekSubscription / clearBacklog).
  - CLI: `pulsar-admin scalable-topics seek <topic> --subscription <s>
    --time 1h` (relative offset, computed as now - offset) and
    `pulsar-admin scalable-topics clear-backlog <topic> -s <s>`.

Removals (subscription seek is now an admin operation, not a consumer
operation):
  - StreamConsumerBuilder.seek(MessageId / Instant) — were placeholder
    no-ops; gone. Initial position uses subscriptionInitialPosition;
    timestamp seek uses the new admin API.
  - CheckpointConsumer.seek(Checkpoint) and the async counterpart.
    Frameworks restore from a saved checkpoint via
    CheckpointConsumerBuilder.startPosition(Checkpoint).
  - Checkpoint.atTimestamp(Instant) factory and the underlying
    TimestampCheckpoint type — timestamp positioning is the admin
    surface, not a checkpoint kind.
  - Checkpoint.creationTime() — was just metadata, not part of the
    position vector. Connector frameworks that need timing record it
    themselves. Wire format simplifies accordingly.

Tests:
  - ScalableTopicControllerTest:
      testSeekSubscriptionDispatchesPerSegmentByTimestamp — three
      segments at hand-picked timestamps (one fully before t, one
      straddling, one fully after); asserts the right per-segment
      admin call is issued for each.
      testClearBacklogDispatchesSkipAllToEverySegment — N skip-all
      calls for N segments.
  - V5CheckpointConsumerBasicTest.testSeekRewindsToEarlierCheckpoint —
    removed; corresponding example in Examples.java removed.
  - V5AsyncApisTest.testAsyncCheckpointConsumerCheckpointAndSeek —
    slimmed to testAsyncCheckpointConsumerCheckpoint.
  - CheckpointV5Test — drop timestamp-roundtrip + creationTime
    assertions; constructor calls updated to single-arg.
@lhotari lhotari changed the title PIP-468: scalable-topic seek + clear-backlog admin API [feat][broker] PIP-468: scalable-topic seek + clear-backlog admin API May 6, 2026