Skip to content

feat(taskbroker): Implement retry support for raw topics#630

Draft
untitaker wants to merge 2 commits into
mainfrom
feat/retry-support-raw-topics
Draft

feat(taskbroker): Implement retry support for raw topics#630
untitaker wants to merge 2 commits into
mainfrom
feat/retry-support-raw-topics

Conversation

@untitaker
Copy link
Copy Markdown
Member

@untitaker untitaker commented May 11, 2026

Summary

Implement two independent features in this PR:

  • kafka_retry_topic so that one can send activations from retried tasks into a separate topic. That topic never contains raw messages.
  • Ability to get max_retries from the worker. Previously the producer would configure max_retries and send them as part of the activation, but that cannot work with raw topics, so an architecture change is needed.

See Architecture doc → Stage 4 for full context.

Dependencies

Depends on: getsentry/sentry-protos#251 (adds max_retries field to SetTaskStatusRequest)

ref STREAM-981

Add retry support for raw/passthrough topics (e.g. `ingest-events`) where
tasks don't have retry_state embedded in the message.

Changes:
- Config: Add `kafka_retry_topic` option for dedicated retry topic
- Store: Add `update_retry_state` method to update activation's retry_state
- gRPC: Handle `max_retries` in SetTaskStatus, call store.update_retry_state
- Upkeep: Route retries to dedicated retry topic when configured
- Consumer: Subscribe to both main and retry topics
- Deserializer: Topic-aware routing (retry topic always uses activation deserializer)
- Python client: Extract max_retries from Retry config, send in SetTaskStatusRequest

When a worker reports RETRY status with max_retries, the broker updates
the activation's retry_state and routes the retry to the dedicated retry
topic. This prevents retries from polluting the main topic where other
consumers (like SBC) can't parse activations.

See https://www.notion.so/3448b10e4b5d80e7a1efee6145d504c2 → Stage 4

Depends on: getsentry/sentry-protos#251

ref STREAM-981

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@linear-code
Copy link
Copy Markdown

linear-code Bot commented May 11, 2026

STREAM-981

max_retries (from Python's @task decorator) excludes the initial attempt,
while max_attempts includes it. Add 1 when storing to retry_state.

Example: @task(max_retries=3) means 4 total attempts (1 initial + 3 retries)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Comment thread src/grpc/server.rs
// This allows workers to communicate retry policy for tasks from raw topics.
if status == InflightActivationStatus::Retry {
if let Some(max_retries) = request.get_ref().max_retries {
if let Err(e) = self.store.update_retry_state(&id, max_retries).await {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How often will this happen? If it happens on every request, throughput will drop significantly. If it happens only occasionally, it should be fine.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on every retry, we need to update the task activation. but i think i'll redesign this so that the task activation is updated in one go (max_retries and retry_count are updated using the same update stmt)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants