pulsar: make max-message-bytes configurable#5055
Conversation
Add MaxMessageBytes field to PulsarConfig so users can tune the maximum message size via the sink URI (e.g. ?max-message-bytes=5242880), matching the behavior already available for the Kafka sink. Defaults to 10MB.
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
📝 WalkthroughWalkthroughThis PR enables the Pulsar sink to accept a ChangesMake Pulsar max-message-bytes configurable
Estimated code review effort🎯 2 (Simple) | ⏱️ ~10 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
Hi @HGHNice. Thanks for your PR. I'm waiting for a pingcap member to verify that this patch is reasonable to test. If it is, they should reply with Once the patch is verified, the new status will be reflected by the I understand the commands that are listed here. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
There was a problem hiding this comment.
Code Review
This pull request introduces a configurable MaxMessageBytes parameter for the Pulsar sink, allowing users to specify the maximum message size via the sink URI or the configuration file. The review feedback highlights that this parameter should also be applied to the Pulsar producer configuration to avoid potential "message too large" errors. Additionally, there are suggestions to simplify the logic for assigning default values and to ensure that URI parameters correctly override TOML configurations to maintain consistency with other TiCDC sinks.
| if pulsarConfig.MaxMessageBytes == nil { | ||
| pulsarConfig.MaxMessageBytes = c.MaxMessageBytes | ||
| } |
There was a problem hiding this comment.
In TiCDC, parameters provided in the sink URI typically take precedence over those in the configuration file (TOML). The current implementation gives precedence to the TOML configuration because it only applies the value from c (which contains the URI parameter) if pulsarConfig.MaxMessageBytes is nil. Consider reversing this logic to ensure URI parameters can override TOML settings, or explicitly document this behavior if it is intentional for the Pulsar sink.
Fix URI parameter precedence: URI max-message-bytes now correctly overrides TOML configuration, consistent with other TiCDC sinks.
There was a problem hiding this comment.
🧹 Nitpick comments (1)
pkg/sink/pulsar/config.go (1)
57-82: ⚡ Quick winURI parameter parsing validates correctly, but consider adding validation for TOML values.
The URI parameter validation (positive integer check at lines 64-66) is good. However, if a user sets an invalid value in TOML (e.g.,
max-message-bytes = 0or negative), it won't be caught sincePulsarConfig.validate()doesn't checkMaxMessageBytes.Consider adding validation in
PulsarConfig.validate()(line 682) to ensure consistency:🛡️ Suggested validation enhancement
In
pkg/config/sink.go, enhance thevalidate()method:// Check get broker url func (c *PulsarConfig) validate() (err error) { + if c.MaxMessageBytes != nil && *c.MaxMessageBytes <= 0 { + return fmt.Errorf("max-message-bytes must be positive, got %d", *c.MaxMessageBytes) + } if c.OAuth2 != nil { if err = c.OAuth2.validate(); err != nil { return err } } return nil }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/sink/pulsar/config.go` around lines 57 - 82, Add validation in PulsarConfig.validate() to mirror the URI parsing rules: check the MaxMessageBytes field (config.PulsarConfig.MaxMessageBytes) and if it's non-nil ensure the pointed value is > 0, returning a descriptive error if it's zero or negative; keep the existing behavior for nil (meaning use defaults) so TOML-specified invalid values like 0 or negative are caught the same way as URI parameters.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@pkg/sink/pulsar/config.go`:
- Around line 57-82: Add validation in PulsarConfig.validate() to mirror the URI
parsing rules: check the MaxMessageBytes field
(config.PulsarConfig.MaxMessageBytes) and if it's non-nil ensure the pointed
value is > 0, returning a descriptive error if it's zero or negative; keep the
existing behavior for nil (meaning use defaults) so TOML-specified invalid
values like 0 or negative are caught the same way as URI parameters.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 9765a104-9f9b-4ae9-a40c-e597fac4557c
📒 Files selected for processing (3)
downstreamadapter/sink/pulsar/helper.gopkg/config/sink.gopkg/sink/pulsar/config.go
What problem does this PR solve?
Issue Number: close #5058
The Pulsar sink hardcodes
max-message-bytestoDefaultMaxMessageBytes(10MB), while the Kafka sink allows this to be tuned via the sink URI.
Users with different Pulsar broker configurations have no way to adjust
the message size limit.
What is changed and how it works?
MaxMessageBytes *intfield toPulsarConfiginpkg/config/sink.go.max-message-bytesfrom the sink URI inpkg/sink/pulsar/config.go,defaulting to 10MB when not set.
GetEncoderConfigindownstreamadapter/sink/pulsar/helper.goinstead of the hardcoded constant.Usage:
pulsar://host/topic?max-message-bytes=5242880Check List
Tests
Questions
Will it cause performance regression or break compatibility?
No. The default value is unchanged (10MB). This only adds a new optional
URI parameter.
Do you need to update user documentation, design documentation or monitoring documentation?
The Pulsar sink URI parameter documentation should list
max-message-bytes.Release note
Summary by CodeRabbit
max-message-bytesvia TOML, JSON, or URI parameters with a default of 10MB, providing consistent behavior with Kafka sink configuration.