Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,40 @@
*/
package org.apache.pinot.integration.tests;

import com.google.common.primitives.Longs;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ExactlyOnceKafkaRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegrationTest {

private static final Logger LOGGER =
LoggerFactory.getLogger(ExactlyOnceKafkaRealtimeClusterIntegrationTest.class);
private static final int REALTIME_TABLE_CONFIG_RETRY_COUNT = 5;
private static final long REALTIME_TABLE_CONFIG_RETRY_WAIT_MS = 1_000L;
private static final long KAFKA_TOPIC_METADATA_READY_TIMEOUT_MS = 30_000L;
Expand Down Expand Up @@ -80,14 +98,183 @@ protected long getDocsLoadedTimeoutMs() {
/**
 * Pushes the Avro records into Kafka twice using a SINGLE transactional producer:
 * the first batch is sent inside an aborted transaction and the second inside a
 * committed one, so exactly one copy of the data should be visible to
 * read_committed consumers.
 *
 * NOTE: the legacy ClusterIntegrationTestUtils.pushAvroIntoKafkaWithTransaction
 * calls were removed — invoking them in addition to the single-producer push
 * below would push the committed data more than once and break the expected
 * document count.
 *
 * @param avroFiles Avro files whose records are pushed into the Kafka topic
 * @throws Exception if producing to Kafka fails
 */
protected void pushAvroIntoKafka(List<File> avroFiles)
    throws Exception {
  String kafkaBrokerList = getKafkaBrokerList();
  LOGGER.info("Pushing transactional data to Kafka at: {}", kafkaBrokerList);
  LOGGER.info("Avro files count: {}", avroFiles.size());

  Properties producerProps = new Properties();
  producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerList);
  producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
  producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
  producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
  producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
  producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
  producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
  // Generous transaction timeout so slow CI brokers do not expire the transaction mid-push
  producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "600000");
  // Unique transactional id per test run to avoid fencing against stale coordinator state
  producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transaction-" + UUID.randomUUID());

  // Use a SINGLE producer for both abort and commit transactions.
  // With a single producer, the coordinator's state machine ensures that after
  // abortTransaction() returns, it returns CONCURRENT_TRANSACTIONS for any new
  // transaction operations until the abort is fully done (markers written).
  try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
    producer.initTransactions();
    LOGGER.info("initTransactions() succeeded");

    // Transaction 1: aborted batch
    long abortedCount = pushAvroRecords(producer, avroFiles, false);
    LOGGER.info("Aborted batch: {} records", abortedCount);

    // Transaction 2: committed batch
    long committedCount = pushAvroRecords(producer, avroFiles, true);
    LOGGER.info("Committed batch: {} records", committedCount);
  }

  // After producer is closed, verify data visibility with independent consumers
  LOGGER.info("Producer closed. Verifying data visibility...");
  waitForCommittedRecordsVisible(kafkaBrokerList);
}

/**
 * Blocks until at least one record is visible to a read_committed consumer, proving
 * that the commit markers of the second transaction have propagated.
 * Periodically also counts read_uncommitted records as a diagnostic of whether the
 * data reached Kafka at all.
 *
 * @param brokerList Kafka broker list to connect the diagnostic consumers to
 * @throws AssertionError if no committed record becomes visible within 60 seconds
 */
private void waitForCommittedRecordsVisible(String brokerList) {
  final long startMs = System.currentTimeMillis();
  final long timeoutMs = 60_000L;
  int committedSeen = 0;
  int uncommittedSeen = 0;

  for (int attempt = 1; System.currentTimeMillis() - startMs < timeoutMs; attempt++) {
    committedSeen = countRecords(brokerList, "read_committed");
    if (committedSeen > 0) {
      LOGGER.info("Verification OK: read_committed={} after {} iterations", committedSeen, attempt);
      return;
    }
    // Diagnostic: on the first attempt and every 5th one, check whether the data
    // reached Kafka at all (visible without transactional isolation).
    if (attempt == 1 || attempt % 5 == 0) {
      uncommittedSeen = countRecords(brokerList, "read_uncommitted");
      LOGGER.info("Verification iteration {}: read_committed={}, read_uncommitted={}",
          attempt, committedSeen, uncommittedSeen);
    }
    try {
      Thread.sleep(2_000L);
    } catch (InterruptedException e) {
      // Restore interrupt status and fall through to the failure path
      Thread.currentThread().interrupt();
      break;
    }
  }

  // Final diagnostic dump before failing the test
  uncommittedSeen = countRecords(brokerList, "read_uncommitted");
  LOGGER.error("VERIFICATION FAILED after 60s: read_committed={}, read_uncommitted={}",
      committedSeen, uncommittedSeen);
  throw new AssertionError("[ExactlyOnce] Transaction markers were not propagated within 60s; "
      + "committed records are not visible to read_committed consumers. "
      + "read_committed=" + committedSeen + ", read_uncommitted=" + uncommittedSeen);
}

/**
 * Pushes Avro records to Kafka inside one or more transactions. Does NOT call
 * initTransactions() — the caller owns the producer lifecycle.
 *
 * A new transaction is begun lazily on the first record and finished (committed or
 * aborted, per {@code commit}) every {@code getMaxNumKafkaMessagesPerBatch()} records
 * and once more for the trailing partial batch.
 *
 * Fix: if serialization or send fails mid-batch, the open transaction is now aborted
 * before rethrowing. Previously it was leaked, which blocks read_committed consumers
 * on that partition until the transaction times out.
 *
 * @param producer  transactional producer with initTransactions() already called
 * @param avroFiles Avro files to read records from
 * @param commit    true to commit each transaction, false to abort it
 * @return the number of records sent
 * @throws Exception if reading the Avro files or producing to Kafka fails
 */
private long pushAvroRecords(KafkaProducer<byte[], byte[]> producer, List<File> avroFiles, boolean commit)
    throws Exception {
  int maxMessagesPerTransaction =
      getMaxNumKafkaMessagesPerBatch() > 0 ? getMaxNumKafkaMessagesPerBatch() : Integer.MAX_VALUE;
  long counter = 0;
  int recordsInTransaction = 0;
  boolean hasOpenTransaction = false;
  byte[] header = getKafkaMessageHeader();
  String partitionColumn = getPartitionColumn();

  try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(65536)) {
    for (File avroFile : avroFiles) {
      try (DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(avroFile)) {
        // Shared EncoderFactory instance; directBinaryEncoder writes straight to outputStream
        BinaryEncoder binaryEncoder = EncoderFactory.get().directBinaryEncoder(outputStream, null);
        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(reader.getSchema());
        for (GenericRecord genericRecord : reader) {
          if (!hasOpenTransaction) {
            producer.beginTransaction();
            hasOpenTransaction = true;
            recordsInTransaction = 0;
          }

          outputStream.reset();
          if (header != null && header.length > 0) {
            outputStream.write(header);
          }
          datumWriter.write(genericRecord, binaryEncoder);
          binaryEncoder.flush();

          // Key by the partition column when configured (assumes the column is non-null
          // in the test data — TODO confirm), otherwise by the running counter.
          byte[] keyBytes = (partitionColumn == null) ? Longs.toByteArray(counter)
              : genericRecord.get(partitionColumn).toString().getBytes(StandardCharsets.UTF_8);
          byte[] bytes = outputStream.toByteArray();
          producer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, bytes));
          counter++;

          recordsInTransaction++;
          if (recordsInTransaction >= maxMessagesPerTransaction) {
            if (commit) {
              producer.commitTransaction();
            } else {
              producer.abortTransaction();
            }
            hasOpenTransaction = false;
          }
        }
      }
    }
  } catch (Exception e) {
    // Don't leak an open transaction on failure — abort it, preserving the original cause
    if (hasOpenTransaction) {
      try {
        producer.abortTransaction();
      } catch (Exception abortException) {
        e.addSuppressed(abortException);
      }
    }
    throw e;
  }
  // Finish the trailing partial batch, if any
  if (hasOpenTransaction) {
    if (commit) {
      producer.commitTransaction();
    } else {
      producer.abortTransaction();
    }
  }
  return counter;
}

/**
 * Counts the records visible in the topic under the given isolation level
 * ("read_committed" or "read_uncommitted") using a fresh diagnostic consumer.
 *
 * Fix: polls each partition until the consumer position reaches the partition's
 * end offset instead of stopping on the first empty poll. Under read_committed,
 * transaction markers and aborted batches create offset gaps that can legitimately
 * yield empty polls before the log end, so the old early-break undercounted.
 *
 * @param brokerList     Kafka broker list to connect to
 * @param isolationLevel consumer isolation.level value
 * @return total visible record count across all partitions (0 on error)
 */
private int countRecords(String brokerList, String isolationLevel) {
  Properties props = new Properties();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
  // Unique group id so each diagnostic pass starts from the beginning
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "txn-diag-" + isolationLevel + "-" + UUID.randomUUID());
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
  props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel);
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, Integer.toString(10 * 1024 * 1024));

  int totalRecords = 0;
  try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
    List<PartitionInfo> partitions = consumer.partitionsFor(getKafkaTopic(), Duration.ofSeconds(10));
    if (partitions == null || partitions.isEmpty()) {
      LOGGER.warn("No partitions found for topic {}", getKafkaTopic());
      return 0;
    }
    for (PartitionInfo pi : partitions) {
      TopicPartition tp = new TopicPartition(pi.topic(), pi.partition());
      consumer.assign(Collections.singletonList(tp));
      consumer.seekToBeginning(Collections.singletonList(tp));
      // End offset is the log-end (high watermark) for this partition; under
      // read_committed the position still advances past markers/aborted batches,
      // so position >= endOffset is the correct termination condition.
      long endOffset = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
      long deadline = System.currentTimeMillis() + 30_000L;
      int partitionRecords = 0;
      while (System.currentTimeMillis() < deadline && consumer.position(tp) < endOffset) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
        // Empty polls are expected across transaction markers — keep going until endOffset
        partitionRecords += records.count();
      }
      totalRecords += partitionRecords;
    }
  } catch (Exception e) {
    // Best-effort diagnostic counter: log (with stack trace) and report 0 rather than fail
    LOGGER.warn("Error counting records with {}: {}", isolationLevel, e.getMessage(), e);
  }
  return totalRecords;
}

private boolean isRetryableRealtimePartitionMetadataError(Throwable throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.controller.api.dto.PinotTableReloadStatusResponse;
import org.apache.pinot.controller.api.resources.PauseStatusDetails;
import org.apache.pinot.controller.api.resources.ServerRebalanceJobStatusResponse;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
import org.apache.pinot.server.starter.helix.BaseServerStarter;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
Expand Down Expand Up @@ -154,6 +154,8 @@ public void testRebalance()

serverStarter1.stop();
serverStarter2.stop();
// Re-init the static executor because stopping servers shuts it down; required for subsequent operations.
SegmentBuildTimeLeaseExtender.initExecutor();
TestUtils.waitForCondition(aVoid -> _resourceManager.dropInstance(serverStarter1.getInstanceId()).isSuccessful()
&& _resourceManager.dropInstance(serverStarter2.getInstanceId()).isSuccessful(), 60_000L,
"Failed to drop servers");
Expand Down Expand Up @@ -218,6 +220,8 @@ public void testRebalanceWithBatching()

serverStarter1.stop();
serverStarter2.stop();
// Re-init the static executor because stopping servers shuts it down; required for subsequent operations.
SegmentBuildTimeLeaseExtender.initExecutor();
TestUtils.waitForCondition(aVoid -> _resourceManager.dropInstance(serverStarter1.getInstanceId()).isSuccessful()
&& _resourceManager.dropInstance(serverStarter2.getInstanceId()).isSuccessful(), 60_000L,
"Failed to drop servers");
Expand Down Expand Up @@ -249,35 +253,20 @@ public void testReload()
/**
 * Per-method cleanup: drops the realtime table, recreates the Kafka topic, and
 * re-adds the table config so the next test starts from a clean stream and a fresh
 * consuming segment.
 *
 * Fix: the span previously contained BOTH the superseded cleanup flow
 * (pause consumption, drop segments one by one, stop/restart Kafka and servers,
 * resume consumption) AND its replacement, so cleanup ran twice with conflicting
 * steps. Only the replacement flow is kept.
 *
 * @throws Exception if any cleanup step fails
 */
public void afterMethod()
    throws Exception {
  String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());

  // Drop the table entirely to clean up all segments and server-side upsert state.
  // This is more reliable than the pause/drop-segments/restart cycle because it uses
  // the standard table lifecycle and avoids issues with stale controller/server state.
  dropRealtimeTable(getTableName());
  waitForTableDataManagerRemoved(realtimeTableName);
  waitForEVToDisappear(realtimeTableName);

  // Delete and recreate the Kafka topic for a clean stream
  deleteKafkaTopic(getKafkaTopic());
  createKafkaTopic(getKafkaTopic());

  // Recreate the table — this triggers fresh consuming segment creation
  addTableConfig(_tableConfig);
}

protected void verifySegmentAssignment(Map<String, Map<String, String>> segmentAssignment, int numSegmentsExpected,
Expand Down Expand Up @@ -363,6 +352,7 @@ protected void createSchemaAndTable()
@AfterClass
public void tearDown()
throws IOException {
dropRealtimeTable(getTableName());
stopServer();
stopBroker();
stopController();
Expand Down
Loading