Skip to content

[improve][cli] Add client side looping in "pulsar-admin topics analyze-backlog" cli to avoid potential HTTP call timeout#25126

Open
oneby-wang wants to merge 9 commits intoapache:masterfrom
oneby-wang:pulsar_cli_client_side_analyze_backlog
Open

[improve][cli] Add client side looping in "pulsar-admin topics analyze-backlog" cli to avoid potential HTTP call timeout#25126
oneby-wang wants to merge 9 commits intoapache:masterfrom
oneby-wang:pulsar_cli_client_side_analyze_backlog

Conversation

@oneby-wang
Copy link
Contributor

Fixes #25083

Motivation

Use client-side looping instead of increasing broker settings to avoid potential HTTP call timeout in "pulsar-admin topics analyze-backlog" cli.

Modifications

Add client-side looping, add test.

Verifying this change

  • Make sure that the change passes the CI checks.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: oneby-wang#21

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jan 6, 2026
mergedResult.setLastMessageId(currentResult.getLastMessageId());
}

if (!mergedResult.isAborted() || mergedResult.getEntries() >= backlogScanMaxEntries) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the !mergedResult.isAborted() part isn't correct.

if (remainingEntries.get() <= 0) {
log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
return;
}
if (System.currentTimeMillis() - startTime > timeOutMs) {
log.warn("[{}] Scan abort after hitting the deadline", OpScan.this.cursor);
callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
return;
}

The purpose of the CLI loop is to keep on iterating until the result is COMPLETED or the number of scanned entries exceeds backlogScanMaxEntries.

Comment on lines +3076 to +3079
String[] messageIdSplits = mergedResult.getLastMessageId().split(":");
MessageIdImpl nextScanMessageId =
new MessageIdImpl(Long.parseLong(messageIdSplits[0]), Long.parseLong(messageIdSplits[1]) + 1,
partitionIndex);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of parsing the message Id String, it would be better to use org.apache.pulsar.client.api.MessageIdAdv interface to get the ledgerId and entryId.

Copy link
Contributor Author

@oneby-wang oneby-wang Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you provide me more information, I couldn't find one method that could parse ledgerId:entryId string to a MessageIdAdv instance.

Comment on lines +3072 to +3073
print("Analyze backlog progress, scanned entries: " + mergedResult.getEntries()
+ ", scan max entries: " + backlogScanMaxEntries);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be useful to print the current value of mergedResult in json format without linefeeds so that the CLI output can be parsed as NDJSON.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two ObjectMapper in CliCommand, so we should use the one without pretty printer to print json without linefeeds, and then pass the json string to print() method. Is my understanding correct?

private static final ObjectMapper MAPPER = ObjectMapperFactory.create();
private static final ObjectWriter WRITER = MAPPER.writerWithDefaultPrettyPrinter();

<T> void print(T item) {
try {
if (item instanceof String) {
commandSpec.commandLine().getOut().println(item);
} else {
prettyPrint(item);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

partitionIndex);
startPosition = Optional.of(nextScanMessageId);
}
print(mergedResult);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

output would have to be json format without linefeeds to make the CLI output parseable as NDJSON. The last line in the output would be the final result. Perhaps there could be a command line option to configure whether ndjson should be used since it's not pretty printed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps there could be a command line option to configure whether ndjson should be used since it's not pretty printed.

A little bit confused. Does this option only take effect on the final result, or does it also apply to the intermediate results?

}

@Test
public void topicsAnalyzeBacklog() throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A test without mocks, similar to the tests in #25127 would be useful in addition since it would serve as an integration test.

Copy link
Contributor Author

@oneby-wang oneby-wang Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't found similar examples for reference. Could you provide me some examples.

I don't know how to invoke admin CLI in integration tests and how to receive the outputs(since they are printed to the console output).

Copy link
Contributor Author

@oneby-wang oneby-wang Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we need to handle the test in integration module like CLITest?

https://github.com/apache/pulsar/blob/85625e0f100479dd95fb1311aeb52411b6b0a25d/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java

Is there a better approach? Running the tests in integration module requires local image building.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we need to handle the test in integration module like CLITest?

https://github.com/apache/pulsar/blob/85625e0f100479dd95fb1311aeb52411b6b0a25d/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java

Is there a better approach? Running the tests in integration module requires local image building.

Yes it's a bit clumbersome.

To build an image quickly, use this script:

./build/build_java_test_image.sh

That requires setting export PULSAR_TEST_IMAGE_NAME=apachepulsar/java-test-image:latest environment variable so that the apachepulsar/java-test-image:latest docker image is used instead of the default apachepulsar/pulsar-test-latest-version:latest which takes much longer to build.
If you are running the test in IntelliJ, you can edit the default value at

For running integration tests on command line, this is the sequence I have used:

# compile integration test dependencies
mvn -am -pl tests/integration -Dcheckstyle.skip=true -Dlicense.skip=true -Dspotbugs.skip=true -DskipTests install
./build/build_java_test_image.sh
export PULSAR_TEST_IMAGE_NAME=apachepulsar/java-test-image:latest
# run the test
mvn -DintegrationTests -pl tests/integration -DtestRetryCount=0 -DredirectTestOutputToFile=false test \
-Dtest=TestClassNameHere

If you are working on the test alone, you obviously don't have to rebuild the image after each change.

@oneby-wang
Copy link
Contributor Author

oneby-wang commented Feb 5, 2026

@lhotari Could help me solve the above questions when you have a moment? Especially about how to write integration tests easily in admin CLI module.

I'll refactor this PR using the API that PR #25127 provided once I'm back.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs

Projects

None yet

2 participants