Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,29 @@ protected int getNumKafkaBrokers() {
return useKafkaTransaction() ? DEFAULT_TRANSACTION_NUM_KAFKA_BROKERS : DEFAULT_LLC_NUM_KAFKA_BROKERS;
}

/**
 * Maximum number of attempts to start the Kafka cluster. Subclasses may override to use more
 * attempts in resource-constrained environments (e.g. CI).
 *
 * @return the maximum number of Kafka cluster startup attempts
 */
protected int getKafkaStartMaxAttempts() {
return KAFKA_START_MAX_ATTEMPTS;
}

/**
 * Wait time in ms between Kafka startup retry attempts. Subclasses may override for CI.
 *
 * @return the wait time between Kafka startup retry attempts, in milliseconds
 */
protected long getKafkaStartRetryWaitMs() {
return KAFKA_START_RETRY_WAIT_MS;
}

/**
 * Timeout in ms for Kafka cluster to become ready (brokers + transaction coordinator).
 * Subclasses may override for resource-constrained environments (e.g. CI).
 *
 * @return the Kafka cluster readiness timeout, in milliseconds
 */
protected long getKafkaClusterReadyTimeoutMs() {
return KAFKA_CLUSTER_READY_TIMEOUT_MS;
}

protected int getKafkaPort() {
int idx = RANDOM.nextInt(_kafkaStarters.size());
return _kafkaStarters.get(idx).getPort();
Expand Down Expand Up @@ -818,7 +841,7 @@ protected void startKafkaWithoutTopic() {
int requestedBrokers = getNumKafkaBrokers();
List<KafkaBrokerConfig> brokerConfigs = getOrCreateKafkaBrokerConfigs(requestedBrokers);
Throwable lastFailure = null;
for (int attempt = 1; attempt <= KAFKA_START_MAX_ATTEMPTS; attempt++) {
for (int attempt = 1; attempt <= getKafkaStartMaxAttempts(); attempt++) {
String clusterId = UUID.randomUUID().toString().replace("-", "");
String networkName = "pinot-it-kafka-" + UUID.randomUUID().toString().replace("-", "");
String quorumVoters = brokerConfigs.stream()
Expand All @@ -836,7 +859,7 @@ protected void startKafkaWithoutTopic() {
_kafkaStarters = kafkaStarters;
waitForKafkaClusterReady(getKafkaBrokerList(), requestedBrokers, useKafkaTransaction());
if (attempt > 1) {
LOGGER.info("Kafka startup succeeded on retry attempt {}/{}", attempt, KAFKA_START_MAX_ATTEMPTS);
LOGGER.info("Kafka startup succeeded on retry attempt {}/{}", attempt, getKafkaStartMaxAttempts());
}
return;
} catch (Throwable t) {
Expand All @@ -846,19 +869,19 @@ protected void startKafkaWithoutTopic() {

lastFailure = t;
LOGGER.warn("Kafka startup attempt {}/{} failed; stopping started brokers before retry", attempt,
KAFKA_START_MAX_ATTEMPTS, t);
getKafkaStartMaxAttempts(), t);
_kafkaStarters = kafkaStarters;
try {
stopKafka();
} catch (RuntimeException stopException) {
LOGGER.warn("Kafka cleanup failed after startup attempt {}/{}", attempt, KAFKA_START_MAX_ATTEMPTS,
LOGGER.warn("Kafka cleanup failed after startup attempt {}/{}", attempt, getKafkaStartMaxAttempts(),
stopException);
t.addSuppressed(stopException);
}

if (attempt < KAFKA_START_MAX_ATTEMPTS) {
if (attempt < getKafkaStartMaxAttempts()) {
try {
Thread.sleep(KAFKA_START_RETRY_WAIT_MS);
Thread.sleep(getKafkaStartRetryWaitMs());
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting to retry Kafka startup", interruptedException);
Expand All @@ -868,7 +891,7 @@ protected void startKafkaWithoutTopic() {
}

_kafkaBrokerConfigs = null;
throw new RuntimeException("Failed to start Kafka cluster after " + KAFKA_START_MAX_ATTEMPTS + " attempts",
throw new RuntimeException("Failed to start Kafka cluster after " + getKafkaStartMaxAttempts() + " attempts",
lastFailure);
}

Expand Down Expand Up @@ -921,11 +944,12 @@ private static int getAvailablePort() {
}

private void waitForKafkaClusterReady(String brokerList, int brokerCount, boolean requireTransactions) {
long clusterReadyTimeoutMs = getKafkaClusterReadyTimeoutMs();
TestUtils.waitForCondition(aVoid -> isKafkaClusterReady(brokerList, brokerCount), 200L,
KAFKA_CLUSTER_READY_TIMEOUT_MS,
clusterReadyTimeoutMs,
"Kafka brokers are not ready");
if (requireTransactions) {
TestUtils.waitForCondition(aVoid -> canInitTransactions(brokerList), 500L, KAFKA_CLUSTER_READY_TIMEOUT_MS,
TestUtils.waitForCondition(aVoid -> canInitTransactions(brokerList), 500L, clusterReadyTimeoutMs,
"Kafka transaction coordinator is not ready");
}
}
Expand Down
70 changes: 51 additions & 19 deletions pinot-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,25 +86,57 @@
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <executions>
    <!-- Disable the default execution so only the custom executions below run -->
    <execution>
      <id>default-test</id>
      <phase>none</phase>
    </execution>
    <!-- 1) Run the "must-run-first" test in isolation -->
    <execution>
      <id>run-first-exactly-once</id>
      <phase>test</phase>
      <goals>
        <goal>test</goal>
      </goals>
      <configuration>
        <skipTests>false</skipTests>
        <includes>
          <include>**/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java</include>
        </includes>
      </configuration>
    </execution>
    <!-- 2) Run the rest of set-1 (A*Test through N*Test), excluding the test already run above -->
    <execution>
      <id>run-rest-set-1</id>
      <phase>test</phase>
      <goals>
        <goal>test</goal>
      </goals>
      <configuration>
        <skipTests>false</skipTests>
        <includes>
          <include>**/org/apache/pinot/integration/tests/A*Test.java</include>
          <include>**/org/apache/pinot/integration/tests/B*Test.java</include>
          <include>**/org/apache/pinot/integration/tests/C*Test.java</include>
          <include>**/org/apache/pinot/integration/tests/D*Test.java</include>
          <include>**/org/apache/pinot/integration/tests/E*Test.java</include>
          <include>**/org/apache/pinot/integration/tests/F*Test.java</include>
          <include>**/org/apache/pinot/integration/tests/G*Test.java</include>
          <include>**/org/apache/pinot/integration/tests/H*Test.java</include>
          <include>**/org/apache/pinot/integration/tests/I*Test.java</include>
          <include>**/org/apache/pinot/integration/tests/J*Test.java</include>
          <include>**/org/apache/pinot/integration/tests/K*Test.java</include>
          <include>**/org/apache/pinot/integration/tests/L*Test.java</include>
          <include>**/org/apache/pinot/integration/tests/M*Test.java</include>
          <include>**/org/apache/pinot/integration/tests/N*Test.java</include>
        </includes>
        <excludes>
          <exclude>**/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java</exclude>
        </excludes>
      </configuration>
    </execution>
  </executions>
</plugin>
</plugins>
</build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,40 @@ protected boolean useKafkaTransaction() {

/**
 * Uses 2 Kafka brokers for this transactional test: fewer resources than the default 3, while
 * the transaction coordinator still handles aborts better than it would with a single broker.
 */
@Override
protected int getNumKafkaBrokers() {
  // 2 brokers: fewer resources than 3, but transaction coordinator handles abort better than 1
  return 2;
}

/**
 * Uses more startup attempts than the base default: the transactional Kafka quorum is
 * resource-intensive and needs extra retries in both CI and local runs.
 */
@Override
protected int getKafkaStartMaxAttempts() {
// Transactional quorum is resource-intensive; use more retries for both CI and local
return 5;
}
Comment thread
xiangfu0 marked this conversation as resolved.

/** Waits 5 seconds between Kafka startup retry attempts (see {@link #getKafkaStartMaxAttempts()}). */
@Override
protected long getKafkaStartRetryWaitMs() {
return 5_000L;
}

/**
 * Allows up to 2 minutes for the Kafka cluster to become ready, since the transaction
 * coordinator takes extra time to be available.
 */
@Override
protected long getKafkaClusterReadyTimeoutMs() {
// Transaction coordinator needs time to become ready
return 120_000L;
}

/**
 * Allows up to 5 minutes for realtime partitions to become ready: the transactional consumer
 * needs time to initialize with the transaction coordinator.
 */
@Override
protected long getRealtimePartitionsReadyTimeoutMs() {
// Transactional consumer needs time to initialize with transaction coordinator
return 300_000L;
}

/** Allows up to 20 minutes for all documents to be loaded in this transactional test. */
@Override
protected long getDocsLoadedTimeoutMs() {
return 1_200_000L;
}

// Delay after a transactional push so the coordinator can propagate commit state before consumers fetch.
private static final long POST_COMMIT_PROPAGATION_DELAY_MS = 5_000L;

@Override
protected void pushAvroIntoKafka(List<File> avroFiles)
throws Exception {
Expand All @@ -88,6 +114,13 @@ protected void pushAvroIntoKafka(List<File> avroFiles)
ClusterIntegrationTestUtils
.pushAvroIntoKafkaWithTransaction(avroFiles, kafkaBrokerList, getKafkaTopic(),
getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn(), true);
// Allow transaction coordinator to propagate commit state before consumer fetches
try {
Thread.sleep(POST_COMMIT_PROPAGATION_DELAY_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting for transaction commit propagation", e);
}
}

private boolean isRetryableRealtimePartitionMetadataError(Throwable throwable) {
Expand Down
4 changes: 2 additions & 2 deletions pinot-integration-tests/src/test/resources/log4j2-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<Console name="console" target="SYSTEM_OUT">
  <PatternLayout pattern="%d{HH:mm:ss.SSS} %p [%c{1}] [%t] %m%n"/>
  <Filters>
    <!-- Rate-limit INFO-and-below log events to avoid flooding test output -->
    <BurstFilter level="INFO" rate="5" maxBurst="10"/>
  </Filters>
</Console>
<Console name="spammy" target="SYSTEM_OUT">
Expand All @@ -42,7 +42,7 @@
<Logger name="org.apache.pinot.core.accounting" level="warn" additivity="false">
<AppenderRef ref="spammy"/>
</Logger>
<!-- Root at info so test runs capture startup/retry diagnostics on the console appender -->
<Root level="info">
  <AppenderRef ref="console"/>
</Root>
</Loggers>
Expand Down
Loading