Skip to content

Commit a53312b

Browse files
committed
trying to give more wait time and retry for kafka setup for ExactlyOnceKafkaRealtimeClusterIntegrationTest
1 parent 325ac2a commit a53312b

3 files changed

Lines changed: 102 additions & 28 deletions

File tree

pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,29 @@ protected int getNumKafkaBrokers() {
194194
return useKafkaTransaction() ? DEFAULT_TRANSACTION_NUM_KAFKA_BROKERS : DEFAULT_LLC_NUM_KAFKA_BROKERS;
195195
}
196196

197+
/**
198+
* Maximum number of attempts to start the Kafka cluster. Subclasses may override to use more
199+
* attempts in resource-constrained environments (e.g. CI).
200+
*/
201+
protected int getKafkaStartMaxAttempts() {
202+
return KAFKA_START_MAX_ATTEMPTS;
203+
}
204+
205+
/**
206+
* Wait time in ms between Kafka startup retry attempts. Subclasses may override for CI.
207+
*/
208+
protected long getKafkaStartRetryWaitMs() {
209+
return KAFKA_START_RETRY_WAIT_MS;
210+
}
211+
212+
/**
213+
* Timeout in ms for Kafka cluster to become ready (brokers + transaction coordinator).
214+
* Subclasses may override for resource-constrained environments (e.g. CI).
215+
*/
216+
protected long getKafkaClusterReadyTimeoutMs() {
217+
return KAFKA_CLUSTER_READY_TIMEOUT_MS;
218+
}
219+
197220
protected int getKafkaPort() {
198221
int idx = RANDOM.nextInt(_kafkaStarters.size());
199222
return _kafkaStarters.get(idx).getPort();
@@ -818,7 +841,7 @@ protected void startKafkaWithoutTopic() {
818841
int requestedBrokers = getNumKafkaBrokers();
819842
List<KafkaBrokerConfig> brokerConfigs = getOrCreateKafkaBrokerConfigs(requestedBrokers);
820843
Throwable lastFailure = null;
821-
for (int attempt = 1; attempt <= KAFKA_START_MAX_ATTEMPTS; attempt++) {
844+
for (int attempt = 1; attempt <= getKafkaStartMaxAttempts(); attempt++) {
822845
String clusterId = UUID.randomUUID().toString().replace("-", "");
823846
String networkName = "pinot-it-kafka-" + UUID.randomUUID().toString().replace("-", "");
824847
String quorumVoters = brokerConfigs.stream()
@@ -836,7 +859,7 @@ protected void startKafkaWithoutTopic() {
836859
_kafkaStarters = kafkaStarters;
837860
waitForKafkaClusterReady(getKafkaBrokerList(), requestedBrokers, useKafkaTransaction());
838861
if (attempt > 1) {
839-
LOGGER.info("Kafka startup succeeded on retry attempt {}/{}", attempt, KAFKA_START_MAX_ATTEMPTS);
862+
LOGGER.info("Kafka startup succeeded on retry attempt {}/{}", attempt, getKafkaStartMaxAttempts());
840863
}
841864
return;
842865
} catch (Throwable t) {
@@ -846,19 +869,19 @@ protected void startKafkaWithoutTopic() {
846869

847870
lastFailure = t;
848871
LOGGER.warn("Kafka startup attempt {}/{} failed; stopping started brokers before retry", attempt,
849-
KAFKA_START_MAX_ATTEMPTS, t);
872+
getKafkaStartMaxAttempts(), t);
850873
_kafkaStarters = kafkaStarters;
851874
try {
852875
stopKafka();
853876
} catch (RuntimeException stopException) {
854-
LOGGER.warn("Kafka cleanup failed after startup attempt {}/{}", attempt, KAFKA_START_MAX_ATTEMPTS,
877+
LOGGER.warn("Kafka cleanup failed after startup attempt {}/{}", attempt, getKafkaStartMaxAttempts(),
855878
stopException);
856879
t.addSuppressed(stopException);
857880
}
858881

859-
if (attempt < KAFKA_START_MAX_ATTEMPTS) {
882+
if (attempt < getKafkaStartMaxAttempts()) {
860883
try {
861-
Thread.sleep(KAFKA_START_RETRY_WAIT_MS);
884+
Thread.sleep(getKafkaStartRetryWaitMs());
862885
} catch (InterruptedException interruptedException) {
863886
Thread.currentThread().interrupt();
864887
throw new RuntimeException("Interrupted while waiting to retry Kafka startup", interruptedException);
@@ -868,7 +891,7 @@ protected void startKafkaWithoutTopic() {
868891
}
869892

870893
_kafkaBrokerConfigs = null;
871-
throw new RuntimeException("Failed to start Kafka cluster after " + KAFKA_START_MAX_ATTEMPTS + " attempts",
894+
throw new RuntimeException("Failed to start Kafka cluster after " + getKafkaStartMaxAttempts() + " attempts",
872895
lastFailure);
873896
}
874897

@@ -921,11 +944,12 @@ private static int getAvailablePort() {
921944
}
922945

923946
private void waitForKafkaClusterReady(String brokerList, int brokerCount, boolean requireTransactions) {
947+
long clusterReadyTimeoutMs = getKafkaClusterReadyTimeoutMs();
924948
TestUtils.waitForCondition(aVoid -> isKafkaClusterReady(brokerList, brokerCount), 200L,
925-
KAFKA_CLUSTER_READY_TIMEOUT_MS,
949+
clusterReadyTimeoutMs,
926950
"Kafka brokers are not ready");
927951
if (requireTransactions) {
928-
TestUtils.waitForCondition(aVoid -> canInitTransactions(brokerList), 500L, KAFKA_CLUSTER_READY_TIMEOUT_MS,
952+
TestUtils.waitForCondition(aVoid -> canInitTransactions(brokerList), 500L, clusterReadyTimeoutMs,
929953
"Kafka transaction coordinator is not ready");
930954
}
931955
}

pinot-integration-tests/pom.xml

Lines changed: 51 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -86,25 +86,57 @@
8686
<plugin>
8787
<groupId>org.apache.maven.plugins</groupId>
8888
<artifactId>maven-surefire-plugin</artifactId>
89-
<configuration>
90-
<skipTests>false</skipTests>
91-
<includes>
92-
<include>**/org/apache/pinot/integration/tests/A*Test.java</include>
93-
<include>**/org/apache/pinot/integration/tests/B*Test.java</include>
94-
<include>**/org/apache/pinot/integration/tests/C*Test.java</include>
95-
<include>**/org/apache/pinot/integration/tests/D*Test.java</include>
96-
<include>**/org/apache/pinot/integration/tests/E*Test.java</include>
97-
<include>**/org/apache/pinot/integration/tests/F*Test.java</include>
98-
<include>**/org/apache/pinot/integration/tests/G*Test.java</include>
99-
<include>**/org/apache/pinot/integration/tests/H*Test.java</include>
100-
<include>**/org/apache/pinot/integration/tests/I*Test.java</include>
101-
<include>**/org/apache/pinot/integration/tests/J*Test.java</include>
102-
<include>**/org/apache/pinot/integration/tests/K*Test.java</include>
103-
<include>**/org/apache/pinot/integration/tests/L*Test.java</include>
104-
<include>**/org/apache/pinot/integration/tests/M*Test.java</include>
105-
<include>**/org/apache/pinot/integration/tests/N*Test.java</include>
106-
</includes>
107-
</configuration>
89+
<executions>
90+
<!-- Disable default execution so only our custom executions run -->
91+
<execution>
92+
<id>default-test</id>
93+
<phase>none</phase>
94+
</execution>
95+
<!-- 1) Run the "must-run-first" test -->
96+
<execution>
97+
<id>run-first-exactly-once</id>
98+
<phase>test</phase>
99+
<goals>
100+
<goal>test</goal>
101+
</goals>
102+
<configuration>
103+
<skipTests>false</skipTests>
104+
<includes>
105+
<include>**/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java</include>
106+
</includes>
107+
</configuration>
108+
</execution>
109+
<!-- 2) Run the rest of set-1 -->
110+
<execution>
111+
<id>run-rest-set-1</id>
112+
<phase>test</phase>
113+
<goals>
114+
<goal>test</goal>
115+
</goals>
116+
<configuration>
117+
<skipTests>false</skipTests>
118+
<includes>
119+
<include>**/org/apache/pinot/integration/tests/A*Test.java</include>
120+
<include>**/org/apache/pinot/integration/tests/B*Test.java</include>
121+
<include>**/org/apache/pinot/integration/tests/C*Test.java</include>
122+
<include>**/org/apache/pinot/integration/tests/D*Test.java</include>
123+
<include>**/org/apache/pinot/integration/tests/E*Test.java</include>
124+
<include>**/org/apache/pinot/integration/tests/F*Test.java</include>
125+
<include>**/org/apache/pinot/integration/tests/G*Test.java</include>
126+
<include>**/org/apache/pinot/integration/tests/H*Test.java</include>
127+
<include>**/org/apache/pinot/integration/tests/I*Test.java</include>
128+
<include>**/org/apache/pinot/integration/tests/J*Test.java</include>
129+
<include>**/org/apache/pinot/integration/tests/K*Test.java</include>
130+
<include>**/org/apache/pinot/integration/tests/L*Test.java</include>
131+
<include>**/org/apache/pinot/integration/tests/M*Test.java</include>
132+
<include>**/org/apache/pinot/integration/tests/N*Test.java</include>
133+
</includes>
134+
<excludes>
135+
<exclude>**/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java</exclude>
136+
</excludes>
137+
</configuration>
138+
</execution>
139+
</executions>
108140
</plugin>
109141
</plugins>
110142
</build>

pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,24 @@ protected int getNumKafkaBrokers() {
7171
return DEFAULT_TRANSACTION_NUM_KAFKA_BROKERS;
7272
}
7373

74+
@Override
75+
protected int getKafkaStartMaxAttempts() {
76+
// CI environments are resource-constrained; allow more retries for 3-broker transactional quorum
77+
return Boolean.parseBoolean(System.getenv("GITHUB_ACTIONS")) ? 5 : super.getKafkaStartMaxAttempts();
78+
}
79+
80+
@Override
81+
protected long getKafkaStartRetryWaitMs() {
82+
// Give CI more time between retries for broker startup
83+
return Boolean.parseBoolean(System.getenv("GITHUB_ACTIONS")) ? 5_000L : super.getKafkaStartRetryWaitMs();
84+
}
85+
86+
@Override
87+
protected long getKafkaClusterReadyTimeoutMs() {
88+
// CI needs longer timeout for 3-broker quorum + transaction coordinator
89+
return Boolean.parseBoolean(System.getenv("GITHUB_ACTIONS")) ? 180_000L : super.getKafkaClusterReadyTimeoutMs();
90+
}
91+
7492
@Override
7593
protected long getDocsLoadedTimeoutMs() {
7694
return 1_200_000L;

0 commit comments

Comments
 (0)