pulsar: make max-message-bytes configurable#5055

Open
HGHNice wants to merge 2 commits into pingcap:master from HGHNice:pulsar/configurable-max-message-bytes

Conversation

@HGHNice HGHNice commented May 15, 2026

What problem does this PR solve?

Issue Number: close #5058

The Pulsar sink hardcodes max-message-bytes to DefaultMaxMessageBytes
(10MB), while the Kafka sink allows this to be tuned via the sink URI.
Users with different Pulsar broker configurations have no way to adjust
the message size limit.

What is changed and how it works?

  • Add MaxMessageBytes *int field to PulsarConfig in pkg/config/sink.go.
  • Parse max-message-bytes from the sink URI in pkg/sink/pulsar/config.go,
    defaulting to 10MB when not set.
  • Pass the configured value to GetEncoderConfig in
    downstreamadapter/sink/pulsar/helper.go instead of the hardcoded constant.

Usage: pulsar://host/topic?max-message-bytes=5242880
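
For reviewers who want to see the intended parsing behavior in isolation, here is a minimal sketch. It is not the code in this PR: `parseMaxMessageBytes` and `defaultMaxMessageBytes` are hypothetical names, and the 10MB default is assumed to mean 10 * 1024 * 1024 bytes.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// defaultMaxMessageBytes is an assumed stand-in for the 10MB default
// described above (taken here as 10 * 1024 * 1024 bytes).
const defaultMaxMessageBytes = 10 * 1024 * 1024

// parseMaxMessageBytes is a hypothetical helper: it reads the
// max-message-bytes query parameter from a sink URI and falls back
// to the default when the parameter is absent.
func parseMaxMessageBytes(sinkURI string) (int, error) {
	u, err := url.Parse(sinkURI)
	if err != nil {
		return 0, err
	}
	raw := u.Query().Get("max-message-bytes")
	if raw == "" {
		return defaultMaxMessageBytes, nil
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return 0, fmt.Errorf("invalid max-message-bytes %q: %w", raw, err)
	}
	if n <= 0 {
		return 0, fmt.Errorf("max-message-bytes must be positive, got %d", n)
	}
	return n, nil
}

func main() {
	n, err := parseMaxMessageBytes("pulsar://host/topic?max-message-bytes=5242880")
	fmt.Println(n, err) // prints "5242880 <nil>"
}
```

Rejecting zero and negative values at parse time keeps a malformed URI from silently producing an encoder that can never emit a message.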

Check List

Tests

  • Unit test

Questions

Will it cause performance regression or break compatibility?

No. The default value is unchanged (10MB). This only adds a new optional
URI parameter.

Do you need to update user documentation, design documentation or monitoring documentation?

The Pulsar sink URI parameter documentation should list max-message-bytes.

Release note

The Pulsar sink now supports the `max-message-bytes` URI parameter to
configure the maximum message size (default: 10MB), matching the existing
Kafka sink behavior.

Summary by CodeRabbit

  • New Features
    • Added configurable maximum message size for Pulsar sinks. Users can now set max-message-bytes via TOML, JSON, or URI parameters with a default of 10MB, providing consistent behavior with Kafka sink configuration.

Review Change Stack

Add MaxMessageBytes field to PulsarConfig so users can tune the maximum
message size via the sink URI (e.g. ?max-message-bytes=5242880), matching
the behavior already available for the Kafka sink. Defaults to 10MB.
@ti-chi-bot ti-chi-bot Bot added the do-not-merge/needs-linked-issue and do-not-merge/release-note-label-needed (indicates that a PR should not merge because it's missing one of the release note labels) labels May 15, 2026
ti-chi-bot Bot commented May 15, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign flowbehappy for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

Contributor

coderabbitai Bot commented May 15, 2026

Walkthrough

This PR enables the Pulsar sink to accept a max-message-bytes URI parameter, allowing users to configure the maximum message size to match their broker settings. The implementation adds a configuration field, parses the parameter from the URI with validation, and uses the configured value in encoder setup.

Changes

Make Pulsar max-message-bytes configurable

| Layer / File(s) | Summary |
|---|---|
| PulsarConfig schema definition (pkg/config/sink.go) | Add MaxMessageBytes *int field to PulsarConfig struct with TOML (max-message-bytes) and JSON tags, documented as the maximum single-message size (10MB default). |
| URI parameter parsing and configuration initialization (pkg/sink/pulsar/config.go) | Add strconv import, parse and validate the max-message-bytes query parameter in NewPulsarConfig, populate MaxMessageBytes during config initialization, and update merge logic so URI-provided values take precedence with fallback to TOML configuration. |
| Encoder integration (downstreamadapter/sink/pulsar/helper.go) | Use the configured MaxMessageBytes value from pulsarComponent.config when setting up the encoder, defaulting to config.DefaultMaxMessageBytes only when the config value is unset. |
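
The encoder fallback described in the last row can be illustrated with a short sketch. The type and function names below (`pulsarSettings`, `effectiveMaxMessageBytes`, `fallbackMaxMessageBytes`) are hypothetical and are not taken from helper.go; the point is only that a nil pointer means "unset", so the default applies in that case alone.

```go
package main

import "fmt"

// fallbackMaxMessageBytes is an assumed stand-in for
// config.DefaultMaxMessageBytes (10MB taken as 10 * 1024 * 1024).
const fallbackMaxMessageBytes = 10 * 1024 * 1024

// pulsarSettings is a hypothetical, trimmed-down stand-in for PulsarConfig.
type pulsarSettings struct {
	MaxMessageBytes *int // nil means the user did not set a value
}

// effectiveMaxMessageBytes returns the configured value when present
// and the default when the config or the field is unset.
func effectiveMaxMessageBytes(cfg *pulsarSettings) int {
	if cfg == nil || cfg.MaxMessageBytes == nil {
		return fallbackMaxMessageBytes
	}
	return *cfg.MaxMessageBytes
}

func main() {
	custom := 5242880
	fmt.Println(effectiveMaxMessageBytes(&pulsarSettings{MaxMessageBytes: &custom})) // prints 5242880
	fmt.Println(effectiveMaxMessageBytes(&pulsarSettings{}))                         // prints 10485760
}
```

Using a pointer rather than a plain int lets the code tell "not configured" apart from an explicit value, which a zero-valued int could not do.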

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~10 minutes

Poem

A rabbit hops through config files,
Parsing bytes with careful wiles,
"Ten megabytes, or more, or less?"
The Pulsar sink now passes this test! 🐰✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title directly describes the main change: making the max-message-bytes parameter configurable for the Pulsar sink. |
| Description check | ✅ Passed | The description fully covers the template requirements with issue reference, clear explanation of changes, test confirmation, compatibility assessment, and release notes. |
| Linked Issues check | ✅ Passed | The PR implementation directly addresses all requirements from issue #5058: adds MaxMessageBytes field, parses max-message-bytes URI parameter, and passes configured value to GetEncoderConfig. |
| Out of Scope Changes check | ✅ Passed | All changes are tightly scoped to the stated objectives: adding configurable max-message-bytes support to the Pulsar sink with no extraneous modifications. |

@ti-chi-bot ti-chi-bot Bot added the contribution (this PR is from a community contributor), first-time-contributor (the PR was contributed by an external member who is a first-time contributor), and needs-ok-to-test (a PR created by a contributor that needs an ORG member to send '/ok-to-test' to start testing) labels May 15, 2026
ti-chi-bot Bot commented May 15, 2026

Hi @HGHNice. Thanks for your PR.

I'm waiting for a pingcap member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work. Regular contributors should join the org to skip this step.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

I understand the commands that are listed here.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

pingcap-cla-assistant Bot commented May 15, 2026

CLA assistant check
All committers have signed the CLA.

@ti-chi-bot ti-chi-bot Bot added the size/S Denotes a PR that changes 10-29 lines, ignoring generated files. label May 15, 2026

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces a configurable MaxMessageBytes parameter for the Pulsar sink, allowing users to specify the maximum message size via the sink URI or the configuration file. The review feedback highlights that this parameter should also be applied to the Pulsar producer configuration to avoid potential "message too large" errors. Additionally, there are suggestions to simplify the logic for assigning default values and to ensure that URI parameters correctly override TOML configurations to maintain consistency with other TiCDC sinks.

Comment thread downstreamadapter/sink/pulsar/helper.go
Comment thread downstreamadapter/sink/pulsar/helper.go
Comment thread pkg/sink/pulsar/config.go Outdated
Comment on lines +124 to +126
if pulsarConfig.MaxMessageBytes == nil {
	pulsarConfig.MaxMessageBytes = c.MaxMessageBytes
}

medium

In TiCDC, parameters provided in the sink URI typically take precedence over those in the configuration file (TOML). The current implementation gives precedence to the TOML configuration because it only applies the value from c (which contains the URI parameter) if pulsarConfig.MaxMessageBytes is nil. Consider reversing this logic to ensure URI parameters can override TOML settings, or explicitly document this behavior if it is intentional for the Pulsar sink.
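
To make the requested precedence concrete, here is a minimal sketch of a merge in which the URI value wins and the TOML value is used only as a fallback. `pulsarOptions` and `mergeMaxMessageBytes` are hypothetical names for illustration and do not reflect the actual merge code in pkg/sink/pulsar/config.go.

```go
package main

import "fmt"

// pulsarOptions is a hypothetical stand-in holding only the field under discussion.
type pulsarOptions struct {
	MaxMessageBytes *int // nil means "not set in this source"
}

// mergeMaxMessageBytes prefers the URI-provided value and falls back
// to the value from the configuration file when the URI omits it.
func mergeMaxMessageBytes(fromURI, fromFile *pulsarOptions) *int {
	if fromURI != nil && fromURI.MaxMessageBytes != nil {
		return fromURI.MaxMessageBytes
	}
	if fromFile != nil {
		return fromFile.MaxMessageBytes
	}
	return nil
}

func main() {
	uriVal, fileVal := 5242880, 1048576
	merged := mergeMaxMessageBytes(
		&pulsarOptions{MaxMessageBytes: &uriVal},
		&pulsarOptions{MaxMessageBytes: &fileVal},
	)
	fmt.Println(*merged) // prints 5242880: the URI value overrides the file value
}
```

The snippet under review does the opposite: it copies the URI value only when the TOML field is nil, so a TOML setting would mask the URI parameter.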

Fix URI parameter precedence: URI max-message-bytes now correctly
overrides TOML configuration, consistent with other TiCDC sinks.
@ti-chi-bot ti-chi-bot Bot added the size/M (a PR that changes 30-99 lines, ignoring generated files) and release-note (a PR that will be considered when it comes time to generate release notes) labels, and removed the do-not-merge/needs-triage-completed, size/S (a PR that changes 10-29 lines, ignoring generated files), and do-not-merge/release-note-label-needed (a PR that should not merge because it's missing one of the release note labels) labels May 21, 2026
Contributor

@coderabbitai coderabbitai Bot left a comment

🧹 Nitpick comments (1)
pkg/sink/pulsar/config.go (1)

57-82: ⚡ Quick win

URI parameter parsing validates correctly, but consider adding validation for TOML values.

The URI parameter validation (positive integer check at lines 64-66) is good. However, if a user sets an invalid value in TOML (e.g., max-message-bytes = 0 or negative), it won't be caught since PulsarConfig.validate() doesn't check MaxMessageBytes.

Consider adding validation in PulsarConfig.validate() (line 682) to ensure consistency:

🛡️ Suggested validation enhancement

In pkg/config/sink.go, enhance the validate() method:

 // Check get broker url
 func (c *PulsarConfig) validate() (err error) {
+	if c.MaxMessageBytes != nil && *c.MaxMessageBytes <= 0 {
+		return fmt.Errorf("max-message-bytes must be positive, got %d", *c.MaxMessageBytes)
+	}
 	if c.OAuth2 != nil {
 		if err = c.OAuth2.validate(); err != nil {
 			return err
 		}
 	}
 	return nil
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/sink/pulsar/config.go` around lines 57 - 82, Add validation in
PulsarConfig.validate() to mirror the URI parsing rules: check the
MaxMessageBytes field (config.PulsarConfig.MaxMessageBytes) and if it's non-nil
ensure the pointed value is > 0, returning a descriptive error if it's zero or
negative; keep the existing behavior for nil (meaning use defaults) so
TOML-specified invalid values like 0 or negative are caught the same way as URI
parameters.

📥 Commits

Reviewing files that changed from the base of the PR and between 7323df2 and d420d85.

📒 Files selected for processing (3)
  • downstreamadapter/sink/pulsar/helper.go
  • pkg/config/sink.go
  • pkg/sink/pulsar/config.go

@wk989898 wk989898 removed the needs-ok-to-test Indicates a PR created by contributors and need ORG member send '/ok-to-test' to start testing. label May 29, 2026

Labels

  • contribution: This PR is from a community contributor.
  • first-time-contributor: Indicates that the PR was contributed by an external member and is a first-time contributor.
  • release-note: Denotes a PR that will be considered when it comes time to generate release notes.
  • size/M: Denotes a PR that changes 30-99 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

pulsar: make max-message-bytes configurable

2 participants