diff --git a/solr/cross-dc-manager/build.gradle b/solr/cross-dc-manager/build.gradle
index 4ce538f67f7f..14ed70a5dbc3 100644
--- a/solr/cross-dc-manager/build.gradle
+++ b/solr/cross-dc-manager/build.gradle
@@ -35,6 +35,7 @@ dependencies {
   implementation libs.opentelemetry.sdk.metrics
   implementation libs.eclipse.jetty.server
   implementation libs.eclipse.jetty.ee10.servlet
+  implementation libs.google.guava
   implementation libs.jakarta.servlet.api
   implementation libs.slf4j.api
   runtimeOnly libs.google.protobuf.javautils
diff --git a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
index c3b5bdb3a707..d368151c6afa 100644
--- a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
+++ b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java
@@ -75,6 +75,8 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  public static final String PROP_TOPIC_DEBUG = "solr.crossdc.consumer.topic.debug";
+
   private final KafkaConsumer<String, MirroredSolrRequest<?>> kafkaConsumer;
   private final AdminClient adminClient;
   private final CountDownLatch startLatch;
@@ -101,6 +103,8 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
 
   private volatile boolean running = false;
 
+  private boolean topicDebug = Boolean.parseBoolean(System.getProperty(PROP_TOPIC_DEBUG, "false"));
+
   /**
    * Supplier for creating and managing a working CloudSolrClient instance. This class ensures that
    * the CloudSolrClient instance doesn't try to use its {@link
@@ -175,6 +179,7 @@ public KafkaCrossDcConsumer(
             conf.get(CrossDcConf.COLLAPSE_UPDATES), CrossDcConf.CollapseUpdates.PARTIAL);
     this.maxCollapseRecords = conf.getInt(KafkaCrossDcConf.MAX_COLLAPSE_RECORDS);
     this.startLatch = startLatch;
+
     final Properties kafkaConsumerProps = new Properties();
 
     kafkaConsumerProps.put(
@@ -375,6 +380,9 @@ boolean pollAndProcessRequests() {
         ConsumerRecord<String, MirroredSolrRequest<?>> lastRecord = null;
 
         for (TopicPartition partition : records.partitions()) {
+          if (log.isTraceEnabled()) {
+            log.trace("Checking partition {}", partition.partition());
+          }
           List<ConsumerRecord<String, MirroredSolrRequest<?>>> partitionRecords =
               records.records(partition);
@@ -396,19 +404,31 @@ boolean pollAndProcessRequests() {
             metrics.incrementInputMsgCounter();
             lastRecord = requestRecord;
-            MirroredSolrRequest<?> req = requestRecord.value();
-            SolrRequest<?> solrReq = req.getSolrRequest();
-            MirroredSolrRequest.Type type = req.getType();
+            final MirroredSolrRequest<?> req = requestRecord.value();
+            final SolrRequest<?> solrReq = req.getSolrRequest();
+            final MirroredSolrRequest.Type type = req.getType();
             if (type != MirroredSolrRequest.Type.UPDATE) {
               String action = solrReq.getParams().get("action", "unknown");
               metrics.incrementInputReqCounter(type.name(), action);
             }
-            ModifiableSolrParams params = new ModifiableSolrParams(solrReq.getParams());
+            final ModifiableSolrParams params = new ModifiableSolrParams(solrReq.getParams());
             if (log.isTraceEnabled()) {
               log.trace("-- picked type={}, params={}", req.getType(), params);
             }
+            if (topicDebug) {
+              solrReq.addHeader("topic.debug", "true");
+              solrReq.addHeader("record.topic", requestRecord.topic());
+              solrReq.addHeader("record.partition", String.valueOf(requestRecord.partition()));
+              solrReq.addHeader("record.offset", String.valueOf(requestRecord.offset()));
+              solrReq.addHeader("record.timestamp", String.valueOf(requestRecord.timestamp()));
+              solrReq.addHeader("record.key", requestRecord.key());
+              solrReq.addHeader("workUnit.nextOffset", String.valueOf(workUnit.nextOffset));
+              solrReq.addHeader("workUnit.partition", String.valueOf(workUnit.partition));
+              solrReq.addHeader("workUnit.topic", workUnit.topic);
+              solrReq.addHeader("workUnit.items", String.valueOf(workUnit.workItems.size()));
+            }
 
             // determine if it's an UPDATE with deletes, or if the existing batch has deletes
             boolean hasDeletes = false;
@@ -450,6 +470,7 @@ boolean pollAndProcessRequests() {
               if (updateReqBatch == null) {
                 // just initialize
                 updateReqBatch = new UpdateRequest();
+                updateReqBatch.addHeaders(solrReq.getHeaders());
               } else {
                 if (collapseUpdates == CrossDcConf.CollapseUpdates.NONE) {
                   throw new RuntimeException("Can't collapse requests.");
@@ -490,6 +511,7 @@ boolean pollAndProcessRequests() {
 
         if (updateReqBatch != null) {
           sendBatch(updateReqBatch, MirroredSolrRequest.Type.UPDATE, lastRecord, workUnit);
+          updateReqBatch = null;
         }
         try {
           partitionManager.checkForOffsetUpdates(partition);
diff --git a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/PartitionManager.java b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/PartitionManager.java
index c1004528ab69..c93740f25ea0 100644
--- a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/PartitionManager.java
+++ b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/PartitionManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.crossdc.manager.consumer;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayDeque;
 import java.util.HashSet;
@@ -44,13 +45,16 @@ static class PartitionWork {
     final Queue<WorkUnit> partitionQueue = new ArrayDeque<>();
   }
 
-  static class WorkUnit {
-    final TopicPartition partition;
-    Set<ConsumerRecord<String, MirroredSolrRequest<?>>> workItems = new HashSet<>();
+  @VisibleForTesting
+  public static class WorkUnit {
+    final int partition;
+    final String topic;
+    final Set<ConsumerRecord<String, MirroredSolrRequest<?>>> workItems = new HashSet<>();
     long nextOffset;
 
-    public WorkUnit(TopicPartition partition) {
-      this.partition = partition;
+    WorkUnit(TopicPartition partition) {
+      this.partition = partition.partition();
+      this.topic = partition.topic();
     }
   }
diff --git a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
index e837be5fec37..73b6bd8abd5a 100644
--- a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
+++ b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java
@@ -16,8 +16,12 @@
  */
 package org.apache.solr.crossdc.manager;
 
+import static org.apache.solr.crossdc.common.CrossDcConf.COLLAPSE_UPDATES;
+import static org.apache.solr.crossdc.common.KafkaCrossDcConf.BATCH_SIZE_BYTES;
+import static org.apache.solr.crossdc.common.KafkaCrossDcConf.BOOTSTRAP_SERVERS;
 import static org.apache.solr.crossdc.common.KafkaCrossDcConf.DEFAULT_MAX_REQUEST_SIZE;
 import static org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE_DOCS;
+import static org.apache.solr.crossdc.common.KafkaCrossDcConf.TOPIC_NAME;
 
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
@@ -28,14 +32,18 @@
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.stream.IntStream;
 import org.apache.commons.io.IOUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -45,6 +53,7 @@
 import org.apache.solr.SolrIgnoredThreadsFilter;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.jetty.HttpJettySolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -56,14 +65,19 @@
 import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.common.util.Utils;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
 import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
 import org.apache.solr.crossdc.manager.consumer.Consumer;
+import org.apache.solr.crossdc.manager.consumer.ConsumerMetrics;
+import org.apache.solr.crossdc.manager.consumer.KafkaCrossDcConsumer;
+import org.apache.solr.crossdc.manager.consumer.PartitionManager;
 import org.apache.solr.util.SolrKafkaTestsIgnoredThreadsFilter;
 import org.junit.After;
 import org.junit.Before;
@@ -90,11 +104,64 @@ public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
   private static final int NUM_BROKERS = 1;
   public EmbeddedKafkaCluster kafkaCluster;
 
+  private static class ConsumerBatch {
+    final String kafkaTopic;
+    final int partitionId;
+    final MirroredSolrRequest.Type type;
+    final String collection;
+    final Map<String, String> headers;
+    final Set<String> addIds = new HashSet<>();
+    final String json;
+
+    public ConsumerBatch(final MirroredSolrRequest.Type type, final SolrRequest<?> solrRequest) {
+      this.kafkaTopic = solrRequest.getHeaders().get("record.topic");
+      this.partitionId = Integer.parseInt(solrRequest.getHeaders().get("record.partition"));
+      this.type = type;
+      this.collection = solrRequest.getCollection();
+      this.headers = solrRequest.getHeaders();
+      if (solrRequest instanceof UpdateRequest) {
+        UpdateRequest updateReq = (UpdateRequest) solrRequest;
+        json =
+            Utils.toJSONString(
+                Map.of("params", updateReq.getParams(), "add", updateReq.getDocuments()));
+        updateReq.getDocuments().forEach(doc -> addIds.add(doc.getFieldValue("id").toString()));
+      } else {
+        json =
+            Utils.toJSONString(
+                Map.of("params", solrRequest.getParams(), "class", solrRequest.getClass()));
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "ConsumerBatch{"
+          + "kafkaTopic='"
+          + kafkaTopic
+          + '\''
+          + ", partitionId="
+          + partitionId
+          + ", type="
+          + type
+          + ", collection='"
+          + collection
+          + '\''
+          + ", headers="
+          + headers
+          + '\''
+          + ", json='"
+          + json
+          + '\''
+          + '}';
+    }
+  }
+
   protected volatile MiniSolrCloudCluster solrCluster1;
   protected volatile MiniSolrCloudCluster solrCluster2;
   protected volatile Consumer consumer;
+  private List<ConsumerBatch> consumerBatches;
+
   private static final String TOPIC = "topic1";
 
   private static final String COLLECTION = "collection1";
@@ -112,7 +179,28 @@ public void beforeSolrAndKafkaIntegrationTest() throws Exception {
     Thread.setDefaultUncaughtExceptionHandler(
         (t, e) -> log.error("Uncaught exception in thread {}", t, e));
     System.setProperty("otel.metrics.exporter", "prometheus");
-    consumer = new Consumer();
+    System.setProperty(KafkaCrossDcConsumer.PROP_TOPIC_DEBUG, "true");
+    consumerBatches = new ArrayList<>();
+    consumer =
+        new Consumer() {
+          @Override
+          protected CrossDcConsumer getCrossDcConsumer(
+              final KafkaCrossDcConf conf,
+              final ConsumerMetrics metrics,
+              final CountDownLatch startLatch) {
+            return new KafkaCrossDcConsumer(conf, metrics, startLatch) {
+              @Override
+              public void sendBatch(
+                  final SolrRequest<? extends SolrResponse> solrReqBatch,
+                  final MirroredSolrRequest.Type type,
+                  final ConsumerRecord<String, MirroredSolrRequest<?>> lastRecord,
+                  final PartitionManager.WorkUnit workUnit) {
+                consumerBatches.add(new ConsumerBatch(type, solrReqBatch));
+                super.sendBatch(solrReqBatch, type, lastRecord, workUnit);
+              }
+            };
+          }
+        };
 
     Properties config = new Properties();
     kafkaCluster =
@@ -124,13 +212,15 @@ public String bootstrapServers() {
         };
     kafkaCluster.start();
 
-    kafkaCluster.createTopic(TOPIC, 10, 1);
+    // create many partitions to test for re-ordered reads
+    kafkaCluster.createTopic(TOPIC, 3, 1);
 
     // ensure small batches to test multi-partition ordering
-    System.setProperty("batchSizeBytes", "128");
-    System.setProperty("solr.crossdc.topicName", TOPIC);
-    System.setProperty("solr.crossdc.bootstrapServers", kafkaCluster.bootstrapServers());
+    System.setProperty(BATCH_SIZE_BYTES, "100");
+    System.setProperty(TOPIC_NAME, TOPIC);
+    System.setProperty(BOOTSTRAP_SERVERS, kafkaCluster.bootstrapServers());
     System.setProperty(INDEX_UNMIRRORABLE_DOCS, "false");
+    System.setProperty(COLLAPSE_UPDATES, "none");
 
     solrCluster1 =
         configureCluster(1).addConfig("conf", getFile("configs/cloud-minimal/conf")).configure();
@@ -238,10 +328,62 @@ public void testProducerToCloud() throws Exception {
           "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.";
 
   @Test
-  @Ignore("SOLR-18077")
+  public void testPartitioning() throws Exception {
+    CollectionAdminRequest.Create create =
+        CollectionAdminRequest.createCollection(ALT_COLLECTION, "conf", 1, 1);
+    create.process(solrCluster1.getSolrClient());
+    create.process(solrCluster2.getSolrClient());
+    solrCluster1.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+    solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+
+    CloudSolrClient client = solrCluster1.getSolrClient();
+    int NUM_DOCS = 200;
+    for (int i = 0; i < NUM_DOCS; i++) {
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", "id-" + i);
+      doc.addField("id_i", i);
+      doc.addField("text", "some test with a relatively long field. " + LOREM_IPSUM);
+      doc.addField("collection_t", COLLECTION);
+
+      client.add(COLLECTION, doc);
+
+      doc = new SolrInputDocument();
+      doc.addField("id", "id-" + i);
+      doc.addField("id_i", i);
+      doc.addField("text", "some test with a relatively long field. " + LOREM_IPSUM);
+      doc.addField("collection_t", ALT_COLLECTION);
+
+      client.add(ALT_COLLECTION, doc);
+    }
+    client.commit(COLLECTION);
+    client.commit(ALT_COLLECTION);
+    // check that updates to different collections were always sent to the same partition
+    Map<Integer, String> partitionsPerCol = new HashMap<>();
+    Map<String, Set<String>> docsPerCol = new HashMap<>();
+    for (ConsumerBatch batch : consumerBatches) {
+      String collection =
+          partitionsPerCol.computeIfAbsent(batch.partitionId, k -> batch.collection);
+      docsPerCol.computeIfAbsent(collection, col -> new HashSet<>()).addAll(batch.addIds);
+      assertEquals(
+          "request in partition "
+              + batch.partitionId
+              + " has wrong collection "
+              + batch.collection
+              + ": "
+              + batch
+              + "\npartitions: "
+              + partitionsPerCol,
+          collection,
+          batch.collection);
+    }
+    docsPerCol.forEach(
+        (col, ids) -> assertEquals("incorrect count in collection " + col, NUM_DOCS, ids.size()));
+  }
+
+  @Test
   public void testStrictOrdering() throws Exception {
     CloudSolrClient client = solrCluster1.getSolrClient();
-    int NUM_DOCS = 5000;
+    int NUM_DOCS = 1000;
     // delay deletes by this many docs
     int DELTA = 100;
     for (int i = 0; i < NUM_DOCS; i++) {
@@ -454,11 +596,12 @@ private void assertClusterEventuallyHasDocs(
     boolean foundUpdates = false;
     for (int i = 0; i < 100; i++) {
       client.commit(collection);
-      results = client.query(collection, new SolrQuery(query));
+      results =
+          client.query(collection, new SolrQuery(CommonParams.Q, query, CommonParams.FL, "*"));
       if (results.getResults().getNumFound() == expectedNumDocs) {
         foundUpdates = true;
       } else {
-        Thread.sleep(200);
+        Thread.sleep(300);
       }
     }