From 5fa1ed48c553d9d382326e25d8a057392d686aee Mon Sep 17 00:00:00 2001 From: Sergey Chugunov Date: Fri, 29 May 2026 14:20:46 +0300 Subject: [PATCH 01/11] IGNITE-28692 WIP --- .../processors/cache/CacheMetricsImpl.java | 45 ++++ .../processors/cache/GridCacheProcessor.java | 2 + .../GridDhtPartitionsExchangeFuture.java | 13 +- .../MdcAffinityBackupFilterSelfTest.java | 2 + .../cache/metric/MdcCacheMetricsTest.java | 245 ++++++++++++++++++ 5 files changed, 305 insertions(+), 2 deletions(-) create mode 100644 modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/MdcCacheMetricsTest.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index a67d265dbf79c..9bf5f73b0ff73 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -22,7 +22,13 @@ import java.util.function.Supplier; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheMetrics; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeColocatedBackupFilter; +import org.apache.ignite.cache.affinity.rendezvous.MdcAffinityBackupFilter; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; @@ -43,6 +49,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; import org.jetbrains.annotations.Nullable; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -246,6 +253,9 @@ public class CacheMetricsImpl implements CacheMetrics { /** Conflict resolver merged entries count. */ private LongAdderMetric rslvrMergedCnt; + /** */ + private Boolean mdcReadyAff; + /** * Creates cache metrics. * @@ -1679,6 +1689,11 @@ public void onOffHeapEvict() { return fut != null && !fut.isDone(); } + /** */ + private boolean isMdcReadyAffinity() { + return mdcReadyAff != null && mdcReadyAff; + } + /** {@inheritDoc} */ @Override public long getIndexRebuildKeysProcessed() { return idxRebuildKeyProcessed.value(); @@ -1745,6 +1760,36 @@ public void registerResolverMetrics() { "Conflict resolver merged entries count"); } + /** */ + public void updateMdcMetrics() { + // Calculate this metric only once. + if (mdcReadyAff == null) { + GridKernalContext kCtx = cctx.kernalContext(); + + if (kCtx.clientNode()) + return; + + if (kCtx.discovery().localNode() == null || kCtx.discovery().localNode().dataCenterId() == null) + return; + + mdcReadyAff = Boolean.TRUE; + + mreg.register("IsCacheAffinityMdcReady", this::isMdcReadyAffinity, + "True if cache affinity guarantees having a copy of partition in each data center."); + + AffinityFunction affFunc = cctx.config().getAffinity(); + + if (affFunc instanceof RendezvousAffinityFunction) { + IgniteBiPredicate> filter = ((RendezvousAffinityFunction)affFunc).getAffinityBackupFilter(); + + if (!(filter instanceof MdcAffinityBackupFilter) && !(filter instanceof ClusterNodeAttributeColocatedBackupFilter)) + mdcReadyAff = Boolean.FALSE; + } + } + } + + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheMetricsImpl.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 223dac5edaa5d..ed6685efd9697 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1137,6 +1137,8 @@ private void onKernalStart(GridCacheAdapter cache) throws IgniteCheckedExc cache.onKernalStart(); + cache.metrics0().updateMdcMetrics(); + if (ctx.events().isRecordable(EventType.EVT_CACHE_STARTED)) ctx.events().addEvent(EventType.EVT_CACHE_STARTED); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 733f9b0adc462..d21fae2f3f2f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -69,6 +69,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.pagemem.wal.record.ExchangeRecord; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; @@ -2469,8 +2470,16 @@ private String exchangeTimingsLogMessage(String header, List timings) { cleanIdxRebuildFutures = false; - for (CacheGroupContext grp : cctx.cache().cacheGroups()) - grp.topology().onExchangeDone(this, grp.affinity().readyAffinity(res), false); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + AffinityAssignment assignment = grp.affinity().readyAffinity(res); + + grp.topology().onExchangeDone(this, assignment, false); + + if (!grp.isReplicated()) { + System.out.println("-->>-->> [" + System.currentTimeMillis() + "][" + Thread.currentThread().getName() + "] " + + "exchange done for grp " + grp.cacheOrGroupName()); + } + } if (changedAffinity()) cctx.walState().disableGroupDurabilityForPreloading(this); diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilterSelfTest.java index a45f3ccd90124..97c1949458400 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilterSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilterSelfTest.java @@ -314,6 +314,8 @@ private IgniteEx startClusterAcrossDataCenters(String[] dcIds, int nodesPerDc) t lastNode = startGrid(nodeIdx++); } + System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID); + return lastNode; } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/MdcCacheMetricsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/MdcCacheMetricsTest.java new file mode 100644 index 0000000000000..2af15a423f151 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/MdcCacheMetricsTest.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.metric; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeColocatedBackupFilter; +import org.apache.ignite.cache.affinity.rendezvous.MdcAffinityBackupFilter; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.spi.metric.BooleanMetric; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.cacheMetricsRegistryName; + +/** */ +public class MdcCacheMetricsTest extends GridCommonAbstractTest { + /** */ + private static final int NODES_NUMBER = 5; + /** */ + private static final String MDC_SAFE_CACHE_0 = "mdcSafeCache0"; + /** */ + private static final String MDC_SAFE_CACHE_1 = "mdcSafeCache1"; + /** */ + private static final String MDC_UNSAFE_CACHE = "mdcUnsafeCache"; + /** */ + private static final String STRETCHED_CELL_ATTR_NAME = "DC_CELL_ATTR"; + /** */ + private static final String DC_ID_0 = "DC_0"; + /** */ + private static final String DC_ID_1 = "DC_1"; + /** */ + private String curNodeDcId; + /** */ + private boolean useStaticCaches; + /** */ + private Set allCaches = new HashSet<>(); + /** */ + private Set mdcSafeCaches = new HashSet<>(); + /** */ + private Set mdcUnsafeCaches = new HashSet<>(); + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + + System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID); + + allCaches.clear(); + mdcSafeCaches.clear(); + mdcUnsafeCaches.clear(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + + System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(true) + .setMaxSize(32 * 1024 * 1024) + )); + + if (useStaticCaches) { + CacheConfiguration mdcSafeCacheCfg0 = prepareCacheCfg(MDC_SAFE_CACHE_0, new MdcAffinityBackupFilter(2, 1)); + + CacheConfiguration mdcSafeCacheCfg1 = prepareCacheCfg( + MDC_SAFE_CACHE_1, + new ClusterNodeAttributeColocatedBackupFilter(STRETCHED_CELL_ATTR_NAME)); + + CacheConfiguration mdcUnsafeCacheCfg = prepareCacheCfg(MDC_UNSAFE_CACHE, null); + + cfg.setCacheConfiguration(mdcSafeCacheCfg0, mdcSafeCacheCfg1, mdcUnsafeCacheCfg); + } + + if (!cfg.isClientMode()) + cfg.setUserAttributes(F.asMap(STRETCHED_CELL_ATTR_NAME, curNodeDcId)); + + return cfg; + } + + /** */ + private CacheConfiguration prepareCacheCfg( + String cacheName, + IgniteBiPredicate> affinityBackupFilter) + { + CacheConfiguration cacheCfg = new CacheConfiguration(cacheName) + .setCacheMode(PARTITIONED) + .setBackups(1); + + if (affinityBackupFilter != null) { + cacheCfg.setAffinity( + new RendezvousAffinityFunction() + .setPartitions(32) + .setAffinityBackupFilter(affinityBackupFilter)); + + mdcSafeCaches.add(cacheName); + } + + allCaches.add(cacheName); + + return cacheCfg; + } + + /** */ + @Test + public void testMdcAffinityReadyMetricForDynamicCaches() throws Exception { + useStaticCaches = false; + + startClusterAcrossDataCenters(new String[] {DC_ID_0, DC_ID_1}, 2); + + IgniteEx client = startClientGrid(NODES_NUMBER - 1); + + client.cluster().state(ClusterState.ACTIVE); + + client.getOrCreateCache(prepareCacheCfg(MDC_SAFE_CACHE_0, new MdcAffinityBackupFilter(2, 1))); + + client.getOrCreateCache(prepareCacheCfg(MDC_SAFE_CACHE_1, new ClusterNodeAttributeColocatedBackupFilter(STRETCHED_CELL_ATTR_NAME))); + + client.getOrCreateCache(prepareCacheCfg(MDC_UNSAFE_CACHE, null)); + + checkMdcReadyMetric(); + } + + /** */ + @Test + public void testMdcAffinityReadyMetricForStaticCaches() throws Exception { + useStaticCaches = true; + + startClusterAcrossDataCenters(new String[] {DC_ID_0, DC_ID_1}, 2); + + IgniteEx client = startClientGrid(NODES_NUMBER - 1); + + client.cluster().state(ClusterState.ACTIVE); + + checkMdcReadyMetric(); + } + + @Test + public void testExchangeResearch() throws Exception { + useStaticCaches = false; + + startClusterAcrossDataCenters(new String[] {DC_ID_0, DC_ID_1}, 1); + + IgniteEx client = startClientGrid(NODES_NUMBER - 1); + + client.cluster().state(ClusterState.ACTIVE); + + client.getOrCreateCache(prepareCacheCfg(MDC_SAFE_CACHE_0, new MdcAffinityBackupFilter(2, 1))); + + stopGrid(0); + + Thread.sleep(1000); + } + + /** */ + private void checkMdcReadyMetric() { + for (int i = 0; i < NODES_NUMBER; i++) { + IgniteEx ig = grid(i); + + for (String cacheName : allCaches) { + GridCacheContext cctx = ig.cachex(cacheName).context(); + + MetricRegistryImpl mReg = cctx.kernalContext().metric().registry(cacheMetricsRegistryName(cctx.name(), cctx.cache().isNear())); + + BooleanMetric cacheMdcSafeMetric = mReg.findMetric("IsCacheAffinityMdcReady"); + + System.out.println("-->>-->> [" + System.currentTimeMillis() + "][" + Thread.currentThread().getName() + "] checking cache: " + cacheName); + + if (ig.localNode().isClient()) { + assertNull(cacheMdcSafeMetric); + + continue; + } + else + assertNotNull("Grid: " + i + ", cache: " + cacheName, cacheMdcSafeMetric); + + boolean cacheMdcSafe = cacheMdcSafeMetric.value(); + + assertEquals(mdcSafeCaches.contains(cacheName), cacheMdcSafe); + } + } + } + + /** */ + private IgniteEx startClusterAcrossDataCenters(String[] dcIds, int nodesPerDc) throws Exception { + int nodeIdx = 0; + IgniteEx lastNode = null; + + for (String dcId : dcIds) { + System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, dcId); + + curNodeDcId = dcId; + + for (int i = 0; i < nodesPerDc; i++) + lastNode = startGrid(nodeIdx++); + } + + System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID); + + return lastNode; + } +} From 378a4d4be61fba992fd4b6a1ba6306f51f468236 Mon Sep 17 00:00:00 2001 From: Sergey Chugunov Date: Thu, 11 Jun 2026 17:19:32 +0300 Subject: [PATCH 02/11] IGNITE-28692 Add metrics for MDC checks, implement tests for metrics --- .../processors/cache/CacheMetricsImpl.java | 59 ++++++--- .../processors/cache/GridCacheProcessor.java | 2 +- .../GridDhtPartitionsExchangeFuture.java | 19 ++- .../processors/cluster/BaselineTopology.java | 34 +++-- .../cache/metric/MdcCacheMetricsTest.java | 119 ++++++++++++++---- 5 files changed, 176 insertions(+), 57 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index 9bf5f73b0ff73..3356760b250cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -256,6 +256,9 @@ public class CacheMetricsImpl implements CacheMetrics { /** */ private Boolean mdcReadyAff; + /** */ + private Boolean mdcSafePartDistrib; + /** * Creates cache metrics. * @@ -1694,6 +1697,11 @@ private boolean isMdcReadyAffinity() { return mdcReadyAff != null && mdcReadyAff; } + /** */ + private boolean isMdcSafePartitionDistribution() { + return mdcSafePartDistrib == null || mdcSafePartDistrib; + } + /** {@inheritDoc} */ @Override public long getIndexRebuildKeysProcessed() { return idxRebuildKeyProcessed.value(); @@ -1761,34 +1769,49 @@ public void registerResolverMetrics() { } /** */ - public void updateMdcMetrics() { - // Calculate this metric only once. - if (mdcReadyAff == null) { - GridKernalContext kCtx = cctx.kernalContext(); + public void registerMdcMetrics() { + GridKernalContext kCtx = cctx.kernalContext(); + + if (kCtx.clientNode()) + return; + + if (kCtx.discovery().localNode() == null || kCtx.discovery().localNode().dataCenterId() == null) + return; + + registerMdcReadyAffinityMetric(); - if (kCtx.clientNode()) - return; + registerPartitionDistributionSafeMetric(); + } - if (kCtx.discovery().localNode() == null || kCtx.discovery().localNode().dataCenterId() == null) - return; + /** */ + private void registerPartitionDistributionSafeMetric() { + mreg.register("IsCachePartitionDistributionSafe", this::isMdcSafePartitionDistribution, + "True if current cache partition distribution maintains guarantee 'one partition copy in each datacenter'."); + } - mdcReadyAff = Boolean.TRUE; + /** */ + private void registerMdcReadyAffinityMetric() { + mdcReadyAff = Boolean.TRUE; - mreg.register("IsCacheAffinityMdcReady", this::isMdcReadyAffinity, - "True if cache affinity guarantees having a copy of partition in each data center."); + mreg.register("IsCacheAffinityMdcReady", this::isMdcReadyAffinity, + "True if cache affinity guarantees having a copy of partition in each data center."); - AffinityFunction affFunc = cctx.config().getAffinity(); + AffinityFunction affFunc = cctx.config().getAffinity(); - if (affFunc instanceof RendezvousAffinityFunction) { - IgniteBiPredicate> filter = ((RendezvousAffinityFunction)affFunc).getAffinityBackupFilter(); + if (affFunc instanceof RendezvousAffinityFunction) { + IgniteBiPredicate> filter = ((RendezvousAffinityFunction)affFunc).getAffinityBackupFilter(); - if (!(filter instanceof MdcAffinityBackupFilter) && !(filter instanceof ClusterNodeAttributeColocatedBackupFilter)) - mdcReadyAff = Boolean.FALSE; - } + if (!(filter instanceof MdcAffinityBackupFilter) && !(filter instanceof ClusterNodeAttributeColocatedBackupFilter)) + mdcReadyAff = Boolean.FALSE; } } - + /** + * + */ + public void setMdcSafePartitionDistribution(boolean safeDistribution) { + mdcSafePartDistrib = safeDistribution; + } /** {@inheritDoc} */ @Override public String toString() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index ed6685efd9697..40d07600e6496 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1137,7 +1137,7 @@ private void onKernalStart(GridCacheAdapter cache) throws IgniteCheckedExc cache.onKernalStart(); - cache.metrics0().updateMdcMetrics(); + cache.metrics0().registerMdcMetrics(); if (ctx.events().isRecordable(EventType.EVT_CACHE_STARTED)) ctx.events().addEvent(EventType.EVT_CACHE_STARTED); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index d21fae2f3f2f3..4a07bb4e5588e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2475,9 +2475,26 @@ private String exchangeTimingsLogMessage(String header, List timings) { grp.topology().onExchangeDone(this, assignment, false); + int numberOfDataCenters = cctx.discovery().discoCache().state().baselineTopology().numberOfDatacenters(); + if (!grp.isReplicated()) { + boolean mdcSafeDistribution = true; + + for (List nodes : assignment.assignment()) { + int dcsCount = (int)nodes.stream().map(ClusterNode::dataCenterId).distinct().count(); + + if (dcsCount < numberOfDataCenters) { + mdcSafeDistribution = false; + + break; + } + } System.out.println("-->>-->> [" + System.currentTimeMillis() + "][" + Thread.currentThread().getName() + "] " + - "exchange done for grp " + grp.cacheOrGroupName()); + "mdcSafeDistribution=" + mdcSafeDistribution + " for cache " + grp.cacheOrGroupName()); + + boolean finalMdcSafeDistribution = mdcSafeDistribution; + + grp.caches().forEach(cache -> cache.cache().metrics0().setMdcSafePartitionDistribution(finalMdcSafeDistribution)); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java index 1f71cb192a2f5..ca7fe8e66fe16 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java @@ -39,10 +39,12 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID; + /** * BaselineTopology represents a set of "database nodes" - nodes responsible for holding persistent-enabled caches * and persisting their information to durable storage. - * + *

* Two major features BaselineTopology allows are: *

    *
  1. Protection from conflicting updates.
  2. @@ -61,10 +63,10 @@ * [A,B] [C] * | | * (2)updates to both parts - * + *

    * After independent updates applied to both parts of cluster at point(2) node C should not be allowed to join * [A,B] part. - * + *

    * The following algorithm makes sure node C will never join [A,B] part back: *

      *
    1. @@ -95,10 +97,10 @@ *
    2. * When BaselineTopology is set (e.g. on first activation) or recreated (e.g. with set-baseline command) * its ID on all nodes is incremented by one. - * + *

      * So when cluster receives a join request with BaselineTopology it firstly compares joining node BlT ID with * local BlT ID. - * + *

      * If joining node has a BaselineTopology with ID greater than one in cluster it means that BlT was changed * more times there; therefore new node is not allowed to join the cluster. *

    3. @@ -107,27 +109,27 @@ * Instead current set of online nodes from BaselineTopology is used to update {@link BaselineTopology#branchingPntHash} * property of current BaselineTopology. * Old value of the property is moved to {@link BaselineTopology#branchingHist} list. - * + *

      * If joining node and local BlT IDs are the same then cluster takes branchingPntHash of joining node * and verifies that its local branchingHist contains that hash. - * + *

      * If joining node hash is not presented in cluster branching history list * it means that joining node was activated independently of currently running cluster; * therefore new node is not allowed to join the cluster. - * + *

      * If joining node hash is presented in the history, that it is safe to let the node join the cluster. * *

    4. * When BaselineTopology is recreated (e.g. with set-baseline command) previous BaselineTopology is moved * to BaselineHistory (consult source code of {@link GridClusterStateProcessor} for more details). - * + *

      * If cluster sees that joining node BlT ID is less than cluster BlT ID it looks up for BaselineHistory item * for new node ID. * Having this BaselineHistory item cluster verifies that branching history of the item contains * branching point hash of joining node * (similar check as in the case above with only difference that joining node BlT is compared against * BaselineHistory item instead of BaselineTopology). - * + *

      * If new node branching point hash is found in the history than node is allowed to join; * otherwise it is rejected. *

    5. @@ -172,7 +174,7 @@ public class BaselineTopology implements Serializable { private final List branchingHist; /** - * @param nodeMap Map of node consistent ID to it's attributes. + * @param nodeMap Map of node consistent ID to its attributes. */ private BaselineTopology(Map> nodeMap, int id) { this.id = id; @@ -274,6 +276,16 @@ public Map attributes(Object consId) { return nodeMap.get(consId); } + /** + * @return Number of unique datacenters present in the baseline or {@code -1} if unknown. + */ + public int numberOfDatacenters() { + if (nodeMap.values().iterator().next().get(ATTR_DATA_CENTER_ID) != null) + return (int)nodeMap.values().stream().map(m -> m.get(ATTR_DATA_CENTER_ID)).distinct().count(); + + return -1; + } + /** * */ diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/MdcCacheMetricsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/MdcCacheMetricsTest.java index 2af15a423f151..8146c54f37ac8 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/MdcCacheMetricsTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/MdcCacheMetricsTest.java @@ -47,9 +47,9 @@ public class MdcCacheMetricsTest extends GridCommonAbstractTest { /** */ private static final int NODES_NUMBER = 5; /** */ - private static final String MDC_SAFE_CACHE_0 = "mdcSafeCache0"; + private static final String CACHE_WITH_MDC_FILTER = "mdcSafeCache0"; /** */ - private static final String MDC_SAFE_CACHE_1 = "mdcSafeCache1"; + private static final String CACHE_WITH_COLOCATED_FILTER = "mdcSafeCache1"; /** */ private static final String MDC_UNSAFE_CACHE = "mdcUnsafeCache"; /** */ @@ -59,10 +59,18 @@ public class MdcCacheMetricsTest extends GridCommonAbstractTest { /** */ private static final String DC_ID_1 = "DC_1"; /** */ - private String curNodeDcId; + private static final String[] STRETCHED_CELL_IDS = {"CELL_0", "CELL_1"}; + /** */ + private static final String MDC_SAFE_FILTER_METRIC_NAME = "IsCacheAffinityMdcReady"; + /** */ + private static final String PARTITION_DISTRIBUTION_SAFE_METRIC_NAME = "IsCachePartitionDistributionSafe"; + /** */ + private String cellId; /** */ private boolean useStaticCaches; /** */ + private boolean persistenceEnabled; + /** */ private Set allCaches = new HashSet<>(); /** */ private Set mdcSafeCaches = new HashSet<>(); @@ -99,15 +107,15 @@ public class MdcCacheMetricsTest extends GridCommonAbstractTest { cfg.setDataStorageConfiguration(new DataStorageConfiguration() .setDefaultDataRegionConfiguration(new DataRegionConfiguration() - .setPersistenceEnabled(true) + .setPersistenceEnabled(persistenceEnabled) .setMaxSize(32 * 1024 * 1024) )); if (useStaticCaches) { - CacheConfiguration mdcSafeCacheCfg0 = prepareCacheCfg(MDC_SAFE_CACHE_0, new MdcAffinityBackupFilter(2, 1)); + CacheConfiguration mdcSafeCacheCfg0 = prepareCacheCfg(CACHE_WITH_MDC_FILTER, new MdcAffinityBackupFilter(2, 1)); CacheConfiguration mdcSafeCacheCfg1 = prepareCacheCfg( - MDC_SAFE_CACHE_1, + CACHE_WITH_COLOCATED_FILTER, new ClusterNodeAttributeColocatedBackupFilter(STRETCHED_CELL_ATTR_NAME)); CacheConfiguration mdcUnsafeCacheCfg = prepareCacheCfg(MDC_UNSAFE_CACHE, null); @@ -116,7 +124,7 @@ public class MdcCacheMetricsTest extends GridCommonAbstractTest { } if (!cfg.isClientMode()) - cfg.setUserAttributes(F.asMap(STRETCHED_CELL_ATTR_NAME, curNodeDcId)); + cfg.setUserAttributes(F.asMap(STRETCHED_CELL_ATTR_NAME, cellId)); return cfg; } @@ -124,17 +132,17 @@ public class MdcCacheMetricsTest extends GridCommonAbstractTest { /** */ private CacheConfiguration prepareCacheCfg( String cacheName, - IgniteBiPredicate> affinityBackupFilter) + IgniteBiPredicate> affBackupFilter) { CacheConfiguration cacheCfg = new CacheConfiguration(cacheName) .setCacheMode(PARTITIONED) .setBackups(1); - if (affinityBackupFilter != null) { + if (affBackupFilter != null) { cacheCfg.setAffinity( new RendezvousAffinityFunction() .setPartitions(32) - .setAffinityBackupFilter(affinityBackupFilter)); + .setAffinityBackupFilter(affBackupFilter)); mdcSafeCaches.add(cacheName); } @@ -155,9 +163,9 @@ public void testMdcAffinityReadyMetricForDynamicCaches() throws Exception { client.cluster().state(ClusterState.ACTIVE); - client.getOrCreateCache(prepareCacheCfg(MDC_SAFE_CACHE_0, new MdcAffinityBackupFilter(2, 1))); + client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_MDC_FILTER, new MdcAffinityBackupFilter(2, 1))); - client.getOrCreateCache(prepareCacheCfg(MDC_SAFE_CACHE_1, new ClusterNodeAttributeColocatedBackupFilter(STRETCHED_CELL_ATTR_NAME))); + client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_COLOCATED_FILTER, new ClusterNodeAttributeColocatedBackupFilter(STRETCHED_CELL_ATTR_NAME))); client.getOrCreateCache(prepareCacheCfg(MDC_UNSAFE_CACHE, null)); @@ -178,21 +186,85 @@ public void testMdcAffinityReadyMetricForStaticCaches() throws Exception { checkMdcReadyMetric(); } + /** */ @Test - public void testExchangeResearch() throws Exception { - useStaticCaches = false; + public void testPartitionDistributionMetricInMemoryCaches() throws Exception { + persistenceEnabled = false; - startClusterAcrossDataCenters(new String[] {DC_ID_0, DC_ID_1}, 1); + startClusterAcrossDataCenters(new String[] {DC_ID_0, DC_ID_1}, 2); + + IgniteEx client = startClientGrid(NODES_NUMBER - 1); + + client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_MDC_FILTER, new MdcAffinityBackupFilter(2, 1))); + client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_COLOCATED_FILTER, + new ClusterNodeAttributeColocatedBackupFilter(STRETCHED_CELL_ATTR_NAME))); + + BooleanMetric cacheWithMdcFilterDistributionSafeMetric = findMetricForCache( + grid(1), + CACHE_WITH_MDC_FILTER, + PARTITION_DISTRIBUTION_SAFE_METRIC_NAME); + BooleanMetric cacheWithColocatedFilterDistributionSafeMetric = findMetricForCache( + grid(1), + CACHE_WITH_COLOCATED_FILTER, + PARTITION_DISTRIBUTION_SAFE_METRIC_NAME); + + assertNotNull(cacheWithMdcFilterDistributionSafeMetric); + assertNotNull(cacheWithColocatedFilterDistributionSafeMetric); + assertTrue(cacheWithMdcFilterDistributionSafeMetric.value()); + assertTrue(cacheWithColocatedFilterDistributionSafeMetric.value()); + + stopGrid(0); + + assertTrue(cacheWithMdcFilterDistributionSafeMetric.value()); + assertFalse(cacheWithColocatedFilterDistributionSafeMetric.value()); + } + + /** */ + @Test + public void testPartitionDistributionMetricPersistentCaches() throws Exception { + persistenceEnabled = true; + + startClusterAcrossDataCenters(new String[] {DC_ID_0, DC_ID_1}, 2); IgniteEx client = startClientGrid(NODES_NUMBER - 1); client.cluster().state(ClusterState.ACTIVE); - client.getOrCreateCache(prepareCacheCfg(MDC_SAFE_CACHE_0, new MdcAffinityBackupFilter(2, 1))); + client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_MDC_FILTER, new MdcAffinityBackupFilter(2, 1))); + client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_COLOCATED_FILTER, + new ClusterNodeAttributeColocatedBackupFilter(STRETCHED_CELL_ATTR_NAME))); + + BooleanMetric cacheWithMdcFilterDistributionSafeMetric = findMetricForCache( + grid(1), + CACHE_WITH_MDC_FILTER, + PARTITION_DISTRIBUTION_SAFE_METRIC_NAME); + BooleanMetric cacheWithColocatedFilterDistributionSafeMetric = findMetricForCache( + grid(1), + CACHE_WITH_COLOCATED_FILTER, + PARTITION_DISTRIBUTION_SAFE_METRIC_NAME); + + assertNotNull(cacheWithMdcFilterDistributionSafeMetric); + assertNotNull(cacheWithColocatedFilterDistributionSafeMetric); + assertTrue(cacheWithMdcFilterDistributionSafeMetric.value()); + assertTrue(cacheWithColocatedFilterDistributionSafeMetric.value()); stopGrid(0); - Thread.sleep(1000); + assertFalse(cacheWithMdcFilterDistributionSafeMetric.value()); + assertFalse(cacheWithColocatedFilterDistributionSafeMetric.value()); + + client.cluster().setBaselineTopology(client.cluster().topologyVersion()); + + assertTrue(cacheWithMdcFilterDistributionSafeMetric.value()); + assertFalse(cacheWithColocatedFilterDistributionSafeMetric.value()); + } + + /** */ + private BooleanMetric findMetricForCache(IgniteEx grid, String cacheName, String metricName) { + GridCacheContext cacheCtx = grid.cachex(cacheName).context(); + + return cacheCtx.kernalContext().metric().registry(cacheMetricsRegistryName( + cacheCtx.name(), cacheCtx.cache().isNear())).findMetric(metricName); } /** */ @@ -201,13 +273,7 @@ private void checkMdcReadyMetric() { IgniteEx ig = grid(i); for (String cacheName : allCaches) { - GridCacheContext cctx = ig.cachex(cacheName).context(); - - MetricRegistryImpl mReg = cctx.kernalContext().metric().registry(cacheMetricsRegistryName(cctx.name(), cctx.cache().isNear())); - - BooleanMetric cacheMdcSafeMetric = mReg.findMetric("IsCacheAffinityMdcReady"); - - System.out.println("-->>-->> [" + System.currentTimeMillis() + "][" + Thread.currentThread().getName() + "] checking cache: " + cacheName); + BooleanMetric cacheMdcSafeMetric = findMetricForCache(ig, cacheName, MDC_SAFE_FILTER_METRIC_NAME); if (ig.localNode().isClient()) { assertNull(cacheMdcSafeMetric); @@ -232,10 +298,11 @@ private IgniteEx startClusterAcrossDataCenters(String[] dcIds, int nodesPerDc) t for (String dcId : dcIds) { System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, dcId); - curNodeDcId = dcId; + for (int i = 0; i < nodesPerDc; i++) { + cellId = STRETCHED_CELL_IDS[i]; - for (int i = 0; i < nodesPerDc; i++) lastNode = startGrid(nodeIdx++); + } } System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID); From 2267ca68562ac410011c651518a1f684f54ec5f2 Mon Sep 17 00:00:00 2001 From: Sergey Chugunov Date: Mon, 15 Jun 2026 11:37:06 +0300 Subject: [PATCH 03/11] IGNITE-28692 Move new test to correct module --- .../internal/processors/cache}/MdcCacheMetricsTest.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) rename modules/{indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric => core/src/test/java/org/apache/ignite/internal/processors/cache}/MdcCacheMetricsTest.java (98%) diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/MdcCacheMetricsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java similarity index 98% rename from modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/MdcCacheMetricsTest.java rename to modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java index 8146c54f37ac8..39011966a23ff 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/MdcCacheMetricsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.cache.metric; +package org.apache.ignite.internal.processors.cache; import java.util.HashSet; import java.util.List; @@ -31,8 +31,6 @@ import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.spi.metric.BooleanMetric; From 35b9c4ceafa77699428215a83dd3389c3749fc13 Mon Sep 17 00:00:00 2001 From: Sergey Chugunov Date: Tue, 16 Jun 2026 10:24:19 +0300 Subject: [PATCH 04/11] IGNITE-28692 Code fixes and improvements after self review --- .../processors/cache/CacheMetricsImpl.java | 47 +----- .../processors/cache/GridCacheProcessor.java | 16 +- .../cache/ValidationOnNodeJoinUtils.java | 28 ++++ .../GridDhtPartitionsExchangeFuture.java | 6 +- .../processors/cache/MdcCacheMetricsTest.java | 142 +++++++++++++----- .../testsuites/IgniteCacheTestSuite2.java | 2 + 6 files changed, 161 insertions(+), 80 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index 3356760b250cc..e40ed8bb8805d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -22,13 +22,7 @@ import java.util.function.Supplier; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheMetrics; -import org.apache.ignite.cache.affinity.AffinityFunction; -import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeColocatedBackupFilter; -import org.apache.ignite.cache.affinity.rendezvous.MdcAffinityBackupFilter; -import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; @@ -49,7 +43,6 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiPredicate; import org.jetbrains.annotations.Nullable; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -254,7 +247,7 @@ public class CacheMetricsImpl implements CacheMetrics { private LongAdderMetric rslvrMergedCnt; /** */ - private Boolean mdcReadyAff; + private Boolean affCfgMdcSafe; /** */ private Boolean mdcSafePartDistrib; @@ -1693,8 +1686,8 @@ public void onOffHeapEvict() { } /** */ - private boolean isMdcReadyAffinity() { - return mdcReadyAff != null && mdcReadyAff; + private boolean isAffinityCfgMdcSafe() { + return affCfgMdcSafe != null && affCfgMdcSafe; } /** */ @@ -1769,41 +1762,17 @@ public void registerResolverMetrics() { } /** */ - public void registerMdcMetrics() { - GridKernalContext kCtx = cctx.kernalContext(); - - if (kCtx.clientNode()) - return; - - if (kCtx.discovery().localNode() == null || kCtx.discovery().localNode().dataCenterId() == null) - return; - - registerMdcReadyAffinityMetric(); - - registerPartitionDistributionSafeMetric(); - } - - /** */ - private void registerPartitionDistributionSafeMetric() { + public void registerPartitionDistributionSafeMetric() { mreg.register("IsCachePartitionDistributionSafe", this::isMdcSafePartitionDistribution, "True if current cache partition distribution maintains guarantee 'one partition copy in each datacenter'."); } /** */ - private void registerMdcReadyAffinityMetric() { - mdcReadyAff = Boolean.TRUE; - - mreg.register("IsCacheAffinityMdcReady", this::isMdcReadyAffinity, - "True if cache affinity guarantees having a copy of partition in each data center."); + public void registerAffinityConfigurationSafeMetric(boolean affCfgMdcSafe) { + mreg.register("IsCacheAffinityConfigurationMdcSafe", this::isAffinityCfgMdcSafe, + "True if cache affinity guarantees having a copy of each partition in each data center."); - AffinityFunction affFunc = cctx.config().getAffinity(); - - if (affFunc instanceof RendezvousAffinityFunction) { - IgniteBiPredicate> filter = ((RendezvousAffinityFunction)affFunc).getAffinityBackupFilter(); - - if (!(filter instanceof MdcAffinityBackupFilter) && !(filter instanceof ClusterNodeAttributeColocatedBackupFilter)) - mdcReadyAff = Boolean.FALSE; - } + this.affCfgMdcSafe = affCfgMdcSafe; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 49764040b1be8..8e28e4c93f774 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -217,6 +217,7 @@ import static org.apache.ignite.internal.IgniteComponentType.JTA; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistentCache; +import static org.apache.ignite.internal.processors.cache.ValidationOnNodeJoinUtils.isAffinityConfigurationMdcSafe; import static org.apache.ignite.internal.processors.cache.ValidationOnNodeJoinUtils.validateHashIdResolvers; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition.DFLT_CACHE_REMOVE_ENTRIES_TTL; import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName; @@ -1136,7 +1137,7 @@ private void onKernalStart(GridCacheAdapter cache) throws IgniteCheckedExc cache.onKernalStart(); - cache.metrics0().registerMdcMetrics(); + registerMdcMetrics(cache); if (ctx.events().isRecordable(EventType.EVT_CACHE_STARTED)) ctx.events().addEvent(EventType.EVT_CACHE_STARTED); @@ -1146,6 +1147,19 @@ private void onKernalStart(GridCacheAdapter cache) throws IgniteCheckedExc cache.configuration().getCacheMode() + ']'); } + /** */ + private void registerMdcMetrics(GridCacheAdapter cache) { + if (ctx.clientNode()) + return; + + if (ctx.discovery().localNode() == null || ctx.discovery().localNode().dataCenterId() == null) + return; + + cache.metrics0().registerAffinityConfigurationSafeMetric(isAffinityConfigurationMdcSafe(cache.configuration())); + + cache.metrics0().registerPartitionDistributionSafeMetric(); + } + /** * @param cache Cache to stop. * @param cancel Cancel flag. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java index ac8d97b2a127c..8872e8e337db2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java @@ -35,6 +35,10 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeAffinityBackupFilter; +import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeColocatedBackupFilter; +import org.apache.ignite.cache.affinity.rendezvous.MdcAffinityBackupFilter; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cluster.ClusterNode; @@ -56,6 +60,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.security.SecurityException; import org.apache.ignite.spi.IgniteNodeValidationResult; @@ -70,6 +75,7 @@ import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_AWARE_QUERIES_ENABLED; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_SERIALIZABLE_ENABLED; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isDefaultDataRegionPersistent; @@ -631,6 +637,28 @@ private static void checkMemoryConfiguration(ClusterNode rmt, GridKernalContext return null; } + /** */ + static boolean isAffinityConfigurationMdcSafe(CacheConfiguration cc) { + if (cc.getCacheMode() == REPLICATED) + return true; + + AffinityFunction affFunc = cc.getAffinity(); + + if (affFunc instanceof RendezvousAffinityFunction) { + IgniteBiPredicate> filter = ((RendezvousAffinityFunction)affFunc).getAffinityBackupFilter(); + + if (filter instanceof ClusterNodeAttributeAffinityBackupFilter attrFilter) { + if (!F.asList(attrFilter.getAttributeNames()).contains(ATTR_DATA_CENTER_ID)) + return false; + } + + if (!(filter instanceof MdcAffinityBackupFilter) && !(filter instanceof ClusterNodeAttributeColocatedBackupFilter)) + return false; + } + + return true; + } + /** * @param rmtNode Remote node to check. * @param ctx Context. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index e869c610eda63..19c286183145b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2469,16 +2469,14 @@ private String exchangeTimingsLogMessage(String header, List timings) { boolean mdcSafeDistribution = true; for (List nodes : assignment.assignment()) { - int dcsCount = (int)nodes.stream().map(ClusterNode::dataCenterId).distinct().count(); + int dcsCnt = (int)nodes.stream().map(ClusterNode::dataCenterId).distinct().count(); - if (dcsCount < numberOfDataCenters) { + if (dcsCnt < numberOfDataCenters) { mdcSafeDistribution = false; break; } } - System.out.println("-->>-->> [" + System.currentTimeMillis() + "][" + Thread.currentThread().getName() + "] " + - "mdcSafeDistribution=" + mdcSafeDistribution + " for cache " + grp.cacheOrGroupName()); boolean finalMdcSafeDistribution = mdcSafeDistribution; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java index 39011966a23ff..79fdcc96c6085 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Set; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeAffinityBackupFilter; import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeColocatedBackupFilter; import org.apache.ignite.cache.affinity.rendezvous.MdcAffinityBackupFilter; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; @@ -38,42 +39,71 @@ import org.junit.Test; import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID; import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.cacheMetricsRegistryName; -/** */ +/** + * Test for new cache metrics for highlighting two data safety issues in Multi DataCenter environments: + * 1. If cache configuration doesn't specify an affinity backup filter that could guarantee presence of data copy in each DC. + * 2. If cluster topology changed in such a way that partition copies are not spread across all available DCs. + */ public class MdcCacheMetricsTest extends GridCommonAbstractTest { /** */ private static final int NODES_NUMBER = 5; + /** */ private static final String CACHE_WITH_MDC_FILTER = "mdcSafeCache0"; + /** */ private static final String CACHE_WITH_COLOCATED_FILTER = "mdcSafeCache1"; + + /** */ + private static final String CACHE_WITH_MDC_SAFE_ATTRIBUTE_FILTER = "mdcSafeCache2"; + /** */ - private static final String MDC_UNSAFE_CACHE = "mdcUnsafeCache"; + private static final String MDC_UNSAFE_CACHE = "mdcUnsafeCache0"; + + /** */ + private static final String CACHE_WITH_MDC_UNSAFE_ATTRIBUTE_FILTER = "mdcUnsafeCache1"; + /** */ private static final String STRETCHED_CELL_ATTR_NAME = "DC_CELL_ATTR"; + + /** */ + private static final String ATTR_FOR_UNSAFE_ATTR_FILTER = "MDC_UNAWARE_ATTR" ; + + /** */ + private static final String[] STRETCHED_CELL_IDS = {"CELL_0", "CELL_1"}; + /** */ private static final String DC_ID_0 = "DC_0"; + /** */ private static final String DC_ID_1 = "DC_1"; + /** */ - private static final String[] STRETCHED_CELL_IDS = {"CELL_0", "CELL_1"}; - /** */ - private static final String MDC_SAFE_FILTER_METRIC_NAME = "IsCacheAffinityMdcReady"; + private static final String AFFINITY_CFG_MDC_SAFE_METRIC_NAME = "IsCacheAffinityConfigurationMdcSafe"; + /** */ private static final String PARTITION_DISTRIBUTION_SAFE_METRIC_NAME = "IsCachePartitionDistributionSafe"; + + /** */ + private String dcId; + /** */ private String cellId; + /** */ private boolean useStaticCaches; + /** */ private boolean persistenceEnabled; + /** */ private Set allCaches = new HashSet<>(); + /** */ private Set mdcSafeCaches = new HashSet<>(); - /** */ - private Set mdcUnsafeCaches = new HashSet<>(); /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { @@ -85,7 +115,6 @@ public class MdcCacheMetricsTest extends GridCommonAbstractTest { allCaches.clear(); mdcSafeCaches.clear(); - mdcUnsafeCaches.clear(); } /** {@inheritDoc} */ @@ -95,8 +124,6 @@ public class MdcCacheMetricsTest extends GridCommonAbstractTest { stopAllGrids(); cleanPersistenceDir(); - - System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID); } /** {@inheritDoc} */ @@ -110,19 +137,32 @@ public class MdcCacheMetricsTest extends GridCommonAbstractTest { )); if (useStaticCaches) { - CacheConfiguration mdcSafeCacheCfg0 = prepareCacheCfg(CACHE_WITH_MDC_FILTER, new MdcAffinityBackupFilter(2, 1)); + CacheConfiguration mdcSafeCacheCfg0 = prepareCacheCfg( + CACHE_WITH_MDC_FILTER, + new MdcAffinityBackupFilter(2, 1), + true); CacheConfiguration mdcSafeCacheCfg1 = prepareCacheCfg( CACHE_WITH_COLOCATED_FILTER, - new ClusterNodeAttributeColocatedBackupFilter(STRETCHED_CELL_ATTR_NAME)); + new ClusterNodeAttributeColocatedBackupFilter(STRETCHED_CELL_ATTR_NAME), + true); - CacheConfiguration mdcUnsafeCacheCfg = prepareCacheCfg(MDC_UNSAFE_CACHE, null); + CacheConfiguration mdcUnsafeCacheCfg0 = prepareCacheCfg(MDC_UNSAFE_CACHE, null, false); - cfg.setCacheConfiguration(mdcSafeCacheCfg0, mdcSafeCacheCfg1, mdcUnsafeCacheCfg); + CacheConfiguration mdcUnsafeCacheCfg1 = prepareCacheCfg( + CACHE_WITH_MDC_UNSAFE_ATTRIBUTE_FILTER, + new ClusterNodeAttributeAffinityBackupFilter(ATTR_FOR_UNSAFE_ATTR_FILTER), + false); + + cfg.setCacheConfiguration(mdcSafeCacheCfg0, mdcSafeCacheCfg1, mdcUnsafeCacheCfg0, mdcUnsafeCacheCfg1); } if (!cfg.isClientMode()) - cfg.setUserAttributes(F.asMap(STRETCHED_CELL_ATTR_NAME, cellId)); + cfg.setUserAttributes(F.asMap( + STRETCHED_CELL_ATTR_NAME, + cellId, + IgniteSystemProperties.IGNITE_DATA_CENTER_ID, + dcId)); return cfg; } @@ -130,20 +170,19 @@ public class MdcCacheMetricsTest extends GridCommonAbstractTest { /** */ private CacheConfiguration prepareCacheCfg( String cacheName, - IgniteBiPredicate> affBackupFilter) - { + IgniteBiPredicate> affBackupFilter, + boolean affCfgMdcSafe) { CacheConfiguration cacheCfg = new CacheConfiguration(cacheName) .setCacheMode(PARTITIONED) .setBackups(1); - if (affBackupFilter != null) { - cacheCfg.setAffinity( - new RendezvousAffinityFunction() - .setPartitions(32) - .setAffinityBackupFilter(affBackupFilter)); + cacheCfg.setAffinity( + new RendezvousAffinityFunction() + .setPartitions(32) + .setAffinityBackupFilter(affBackupFilter)); + if (affCfgMdcSafe) mdcSafeCaches.add(cacheName); - } allCaches.add(cacheName); @@ -152,7 +191,7 @@ private CacheConfiguration prepareCacheCfg( /** */ @Test - public void testMdcAffinityReadyMetricForDynamicCaches() throws Exception { + public void testAffinityCfgMdcSafeMetricForDynamicCaches() throws Exception { useStaticCaches = false; startClusterAcrossDataCenters(new String[] {DC_ID_0, DC_ID_1}, 2); @@ -161,18 +200,25 @@ public void testMdcAffinityReadyMetricForDynamicCaches() throws Exception { client.cluster().state(ClusterState.ACTIVE); - client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_MDC_FILTER, new MdcAffinityBackupFilter(2, 1))); + client.getOrCreateCache( + prepareCacheCfg(CACHE_WITH_MDC_FILTER, new MdcAffinityBackupFilter(2, 1), true)); + + client.getOrCreateCache( + prepareCacheCfg(CACHE_WITH_COLOCATED_FILTER, new ClusterNodeAttributeColocatedBackupFilter(STRETCHED_CELL_ATTR_NAME), true)); - client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_COLOCATED_FILTER, new ClusterNodeAttributeColocatedBackupFilter(STRETCHED_CELL_ATTR_NAME))); + client.getOrCreateCache( + prepareCacheCfg(MDC_UNSAFE_CACHE, null, false)); - client.getOrCreateCache(prepareCacheCfg(MDC_UNSAFE_CACHE, null)); + client.getOrCreateCache( + prepareCacheCfg(CACHE_WITH_MDC_UNSAFE_ATTRIBUTE_FILTER, + new ClusterNodeAttributeAffinityBackupFilter(ATTR_FOR_UNSAFE_ATTR_FILTER), false)); checkMdcReadyMetric(); } /** */ @Test - public void testMdcAffinityReadyMetricForStaticCaches() throws Exception { + public void testAffinityCfgMdcSafeMetricForStaticCaches() throws Exception { useStaticCaches = true; startClusterAcrossDataCenters(new String[] {DC_ID_0, DC_ID_1}, 2); @@ -193,9 +239,11 @@ public void testPartitionDistributionMetricInMemoryCaches() throws Exception { IgniteEx client = startClientGrid(NODES_NUMBER - 1); - client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_MDC_FILTER, new MdcAffinityBackupFilter(2, 1))); + client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_MDC_FILTER, new MdcAffinityBackupFilter(2, 1), true)); client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_COLOCATED_FILTER, - new ClusterNodeAttributeColocatedBackupFilter(STRETCHED_CELL_ATTR_NAME))); + new ClusterNodeAttributeColocatedBackupFilter(STRETCHED_CELL_ATTR_NAME), true)); + client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_MDC_SAFE_ATTRIBUTE_FILTER, + new ClusterNodeAttributeAffinityBackupFilter(ATTR_DATA_CENTER_ID), true)); BooleanMetric cacheWithMdcFilterDistributionSafeMetric = findMetricForCache( grid(1), @@ -205,15 +253,23 @@ public void testPartitionDistributionMetricInMemoryCaches() throws Exception { grid(1), CACHE_WITH_COLOCATED_FILTER, PARTITION_DISTRIBUTION_SAFE_METRIC_NAME); + BooleanMetric cacheWithMdcSafeAttrFilterDistributionSafeMetric = findMetricForCache( + grid(1), + CACHE_WITH_MDC_SAFE_ATTRIBUTE_FILTER, + PARTITION_DISTRIBUTION_SAFE_METRIC_NAME + ); assertNotNull(cacheWithMdcFilterDistributionSafeMetric); assertNotNull(cacheWithColocatedFilterDistributionSafeMetric); + assertNotNull(cacheWithColocatedFilterDistributionSafeMetric); assertTrue(cacheWithMdcFilterDistributionSafeMetric.value()); assertTrue(cacheWithColocatedFilterDistributionSafeMetric.value()); + assertTrue(cacheWithMdcSafeAttrFilterDistributionSafeMetric.value()); stopGrid(0); assertTrue(cacheWithMdcFilterDistributionSafeMetric.value()); + assertTrue(cacheWithMdcSafeAttrFilterDistributionSafeMetric.value()); assertFalse(cacheWithColocatedFilterDistributionSafeMetric.value()); } @@ -228,9 +284,11 @@ public void testPartitionDistributionMetricPersistentCaches() throws Exception { client.cluster().state(ClusterState.ACTIVE); - client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_MDC_FILTER, new MdcAffinityBackupFilter(2, 1))); + client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_MDC_FILTER, new MdcAffinityBackupFilter(2, 1), true)); client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_COLOCATED_FILTER, - new ClusterNodeAttributeColocatedBackupFilter(STRETCHED_CELL_ATTR_NAME))); + new ClusterNodeAttributeColocatedBackupFilter(STRETCHED_CELL_ATTR_NAME), true)); + client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_MDC_SAFE_ATTRIBUTE_FILTER, + new ClusterNodeAttributeAffinityBackupFilter(ATTR_DATA_CENTER_ID), true)); BooleanMetric cacheWithMdcFilterDistributionSafeMetric = findMetricForCache( grid(1), @@ -240,21 +298,35 @@ public void testPartitionDistributionMetricPersistentCaches() throws Exception { grid(1), CACHE_WITH_COLOCATED_FILTER, PARTITION_DISTRIBUTION_SAFE_METRIC_NAME); + BooleanMetric cacheWithMdcSafeAttrFilterDistributionSafeMetric = findMetricForCache( + grid(1), + CACHE_WITH_MDC_SAFE_ATTRIBUTE_FILTER, + PARTITION_DISTRIBUTION_SAFE_METRIC_NAME + ); assertNotNull(cacheWithMdcFilterDistributionSafeMetric); assertNotNull(cacheWithColocatedFilterDistributionSafeMetric); + assertNotNull(cacheWithMdcSafeAttrFilterDistributionSafeMetric); assertTrue(cacheWithMdcFilterDistributionSafeMetric.value()); assertTrue(cacheWithColocatedFilterDistributionSafeMetric.value()); + assertTrue(cacheWithMdcSafeAttrFilterDistributionSafeMetric.value()); stopGrid(0); assertFalse(cacheWithMdcFilterDistributionSafeMetric.value()); assertFalse(cacheWithColocatedFilterDistributionSafeMetric.value()); + assertFalse(cacheWithMdcSafeAttrFilterDistributionSafeMetric.value()); client.cluster().setBaselineTopology(client.cluster().topologyVersion()); + // MdcAffinityBackupFilter and ClusterNodeAttributeAffinityBackupFilter are able to reassing partitions + // after BaselineTopology change, thus distribution safe metric restores to true. assertTrue(cacheWithMdcFilterDistributionSafeMetric.value()); + assertTrue(cacheWithMdcSafeAttrFilterDistributionSafeMetric.value()); + // But ClusterNodeAttributeColocatedBackupFilter doesn't reassing partitions after BaselineTopology change + // so distribution safe metric for this cache remains false. assertFalse(cacheWithColocatedFilterDistributionSafeMetric.value()); + } /** */ @@ -271,7 +343,7 @@ private void checkMdcReadyMetric() { IgniteEx ig = grid(i); for (String cacheName : allCaches) { - BooleanMetric cacheMdcSafeMetric = findMetricForCache(ig, cacheName, MDC_SAFE_FILTER_METRIC_NAME); + BooleanMetric cacheMdcSafeMetric = findMetricForCache(ig, cacheName, AFFINITY_CFG_MDC_SAFE_METRIC_NAME); if (ig.localNode().isClient()) { assertNull(cacheMdcSafeMetric); @@ -294,7 +366,7 @@ private IgniteEx startClusterAcrossDataCenters(String[] dcIds, int nodesPerDc) t IgniteEx lastNode = null; for (String dcId : dcIds) { - System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, dcId); + this.dcId = dcId; for (int i = 0; i < nodesPerDc; i++) { cellId = STRETCHED_CELL_IDS[i]; @@ -303,8 +375,6 @@ private IgniteEx startClusterAcrossDataCenters(String[] dcIds, int nodesPerDc) t } } - System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID); - return lastNode; } } diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java index b89ea7a0a4df7..b15c2dcd903f4 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java @@ -58,6 +58,7 @@ import org.apache.ignite.internal.processors.cache.IgniteNearClientCacheCloseTest; import org.apache.ignite.internal.processors.cache.IgniteOnePhaseCommitInvokeTest; import org.apache.ignite.internal.processors.cache.IgniteOnePhaseCommitNearReadersTest; +import org.apache.ignite.internal.processors.cache.MdcCacheMetricsTest; import org.apache.ignite.internal.processors.cache.NoPresentCacheInterceptorOnClientTest; import org.apache.ignite.internal.processors.cache.NonAffinityCoordinatorDynamicStartStopTest; import org.apache.ignite.internal.processors.cache.RebalanceIteratorLargeEntriesOOMTest; @@ -346,6 +347,7 @@ public static List> suite(Collection ignoredTests) { GridTestUtils.addTestIfNeeded(suite, ClusterNodeAttributeAffinityBackupFilterSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, ClusterNodeAttributeColocatedBackupFilterSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, MdcAffinityBackupFilterSelfTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, MdcCacheMetricsTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CachePartitionStateTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CacheComparatorTest.class, ignoredTests); From 2678cca0530e22ddedc51f6100df47d5424a7501 Mon Sep 17 00:00:00 2001 From: Sergey Chugunov Date: Tue, 16 Jun 2026 14:34:04 +0300 Subject: [PATCH 05/11] IGNITE-28692 Checkstyle codechanges --- .../processors/cluster/BaselineTopology.java | 4 ++- .../processors/cache/MdcCacheMetricsTest.java | 30 +++++++++++++++---- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java index ca7fe8e66fe16..d7a26b72399c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java @@ -277,7 +277,9 @@ public Map attributes(Object consId) { } /** - * @return Number of unique datacenters present in the baseline or {@code -1} if unknown. + * Calculates number of datacenters presented in current baseline. + * + * @return Number of datacenters presented in the baseline or {@code -1} if unknown. */ public int numberOfDatacenters() { if (nodeMap.values().iterator().next().get(ATTR_DATA_CENTER_ID) != null) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java index 79fdcc96c6085..efbdfb76448b5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java @@ -70,7 +70,7 @@ public class MdcCacheMetricsTest extends GridCommonAbstractTest { private static final String STRETCHED_CELL_ATTR_NAME = "DC_CELL_ATTR"; /** */ - private static final String ATTR_FOR_UNSAFE_ATTR_FILTER = "MDC_UNAWARE_ATTR" ; + private static final String ATTR_FOR_UNSAFE_ATTR_FILTER = "MDC_UNAWARE_ATTR"; /** */ private static final String[] STRETCHED_CELL_IDS = {"CELL_0", "CELL_1"}; @@ -189,7 +189,11 @@ private CacheConfiguration prepareCacheCfg( return cacheCfg; } - /** */ + /** + * Test verifies correctness of metric for cache configuration related to data distribution across DCs for dynamically started caches. + * Metric should take a {@code false} value if cache configuration doesn't guarantee presence of data copy in each DC + * and {@code true} otherwise. + */ @Test public void testAffinityCfgMdcSafeMetricForDynamicCaches() throws Exception { useStaticCaches = false; @@ -216,7 +220,11 @@ public void testAffinityCfgMdcSafeMetricForDynamicCaches() throws Exception { checkMdcReadyMetric(); } - /** */ + /** + * Test verifies correctness of metric for cache configuration related to data distribution across DCs for statically configured caches. + * Metric should take a {@code false} value if cache configuration doesn't guarantee presence of data copy in each DC + * and {@code true} otherwise. + */ @Test public void testAffinityCfgMdcSafeMetricForStaticCaches() throws Exception { useStaticCaches = true; @@ -230,7 +238,13 @@ public void testAffinityCfgMdcSafeMetricForStaticCaches() throws Exception { checkMdcReadyMetric(); } - /** */ + /** + * Test verifies correctness of metric for partition copies distribution across DCs. + * Metric should take a {@code false} value if there is at least one partition which doesn't have copies in all DCs + * and {@code true} otherwise. + *

      + * This test considers in-memory caches only. + */ @Test public void testPartitionDistributionMetricInMemoryCaches() throws Exception { persistenceEnabled = false; @@ -273,7 +287,13 @@ public void testPartitionDistributionMetricInMemoryCaches() throws Exception { assertFalse(cacheWithColocatedFilterDistributionSafeMetric.value()); } - /** */ + /** + * Test verifies correctness of metric for partition copies distribution across DCs. + * Metric should take a {@code false} value if there is at least one partition which doesn't have copies in all DCs + * and {@code true} otherwise. + *

      + * This test considers persistent caches only and takes into account changes of BaselineTopology. + */ @Test public void testPartitionDistributionMetricPersistentCaches() throws Exception { persistenceEnabled = true; From 5af96bb5ac362303e135e05c690bb5dfa7a10b8e Mon Sep 17 00:00:00 2001 From: Sergey Chugunov Date: Tue, 16 Jun 2026 16:47:18 +0300 Subject: [PATCH 06/11] IGNITE-28692 Improve tests coverage --- .../processors/cache/MdcCacheMetricsTest.java | 123 ++++++++++++++++-- 1 file changed, 113 insertions(+), 10 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java index efbdfb76448b5..7d7f6d1fcdba8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java @@ -100,10 +100,10 @@ public class MdcCacheMetricsTest extends GridCommonAbstractTest { private boolean persistenceEnabled; /** */ - private Set allCaches = new HashSet<>(); + private final Set allCaches = new HashSet<>(); /** */ - private Set mdcSafeCaches = new HashSet<>(); + private final Set mdcSafeCaches = new HashSet<>(); /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { @@ -172,10 +172,22 @@ private CacheConfiguration prepareCacheCfg( String cacheName, IgniteBiPredicate> affBackupFilter, boolean affCfgMdcSafe) { + return prepareCacheCfg(cacheName, affBackupFilter, affCfgMdcSafe, null); + } + + /** */ + private CacheConfiguration prepareCacheCfg( + String cacheName, + IgniteBiPredicate> affBackupFilter, + boolean affCfgMdcSafe, + String cacheGroupName) { CacheConfiguration cacheCfg = new CacheConfiguration(cacheName) .setCacheMode(PARTITIONED) .setBackups(1); + if (cacheGroupName != null) + cacheCfg.setGroupName(cacheGroupName); + cacheCfg.setAffinity( new RendezvousAffinityFunction() .setPartitions(32) @@ -189,15 +201,102 @@ private CacheConfiguration prepareCacheCfg( return cacheCfg; } + /** + * Test verifies correctness of metric for cache configuration related to data distribution across DCs + * if caches are organized into groups. + * + * @throws Exception If failed. + */ + @Test + public void testAffinityCfgMdcSafeMetricForCacheGroup() throws Exception { + startClusterAcrossDataCenters(new String[] {DC_ID_0, DC_ID_1}, 2); + + IgniteEx client = startClientGrid(NODES_NUMBER - 1); + + client.getOrCreateCache( + prepareCacheCfg( + CACHE_WITH_MDC_FILTER + "_0", + new MdcAffinityBackupFilter(2, 1), + true, + "mdcSafeCachesGroup")); + + client.getOrCreateCache( + prepareCacheCfg( + CACHE_WITH_MDC_FILTER + "_1", + new MdcAffinityBackupFilter(2, 1), + true, + "mdcSafeCachesGroup")); + + client.getOrCreateCache( + prepareCacheCfg(MDC_UNSAFE_CACHE + "_0", null, false, "mdcUnsafeCachesGroup")); + + client.getOrCreateCache( + prepareCacheCfg(MDC_UNSAFE_CACHE + "_1", null, false, "mdcUnsafeCachesGroup")); + + checkMdcReadyMetric(); + } + + /** + * Test verifies correctness of metric for partition copies distribution across DCs + * if caches are organized into groups. + * + * @throws Exception If failed. + */ + @Test + public void testPartitionDistributionMetricForCacheGroups() throws Exception { + persistenceEnabled = true; + + startClusterAcrossDataCenters(new String[] {DC_ID_0, DC_ID_1}, 2); + + IgniteEx client = startClientGrid(NODES_NUMBER - 1); + + client.cluster().state(ClusterState.ACTIVE); + + client.getOrCreateCache(prepareCacheCfg( + CACHE_WITH_MDC_FILTER + "_0", + new MdcAffinityBackupFilter(2, 1), + true, + "mdcFilterCacheGroup")); + client.getOrCreateCache(prepareCacheCfg( + CACHE_WITH_MDC_FILTER + "_1", + new MdcAffinityBackupFilter(2, 1), + true, + "mdcFilterCacheGroup")); + + BooleanMetric cache0DistributionSafeMetric = findMetricForCache( + grid(1), + CACHE_WITH_MDC_FILTER + "_0", + PARTITION_DISTRIBUTION_SAFE_METRIC_NAME); + BooleanMetric cache1DistributionSafeMetric = findMetricForCache( + grid(1), + CACHE_WITH_MDC_FILTER + "_1", + PARTITION_DISTRIBUTION_SAFE_METRIC_NAME); + + assertNotNull(cache0DistributionSafeMetric); + assertNotNull(cache1DistributionSafeMetric); + assertTrue(cache0DistributionSafeMetric.value()); + assertTrue(cache1DistributionSafeMetric.value()); + + stopGrid(0); + + assertFalse(cache0DistributionSafeMetric.value()); + assertFalse(cache1DistributionSafeMetric.value()); + + client.cluster().setBaselineTopology(client.cluster().topologyVersion()); + + assertTrue(cache0DistributionSafeMetric.value()); + assertTrue(cache1DistributionSafeMetric.value()); + } + /** * Test verifies correctness of metric for cache configuration related to data distribution across DCs for dynamically started caches. * Metric should take a {@code false} value if cache configuration doesn't guarantee presence of data copy in each DC * and {@code true} otherwise. + * + * @throws Exception If failed. */ @Test public void testAffinityCfgMdcSafeMetricForDynamicCaches() throws Exception { - useStaticCaches = false; - startClusterAcrossDataCenters(new String[] {DC_ID_0, DC_ID_1}, 2); IgniteEx client = startClientGrid(NODES_NUMBER - 1); @@ -224,6 +323,8 @@ public void testAffinityCfgMdcSafeMetricForDynamicCaches() throws Exception { * Test verifies correctness of metric for cache configuration related to data distribution across DCs for statically configured caches. * Metric should take a {@code false} value if cache configuration doesn't guarantee presence of data copy in each DC * and {@code true} otherwise. + * + * @throws Exception If failed. */ @Test public void testAffinityCfgMdcSafeMetricForStaticCaches() throws Exception { @@ -244,11 +345,11 @@ public void testAffinityCfgMdcSafeMetricForStaticCaches() throws Exception { * and {@code true} otherwise. *

      * This test considers in-memory caches only. + * + * @throws Exception If failed. */ @Test public void testPartitionDistributionMetricInMemoryCaches() throws Exception { - persistenceEnabled = false; - startClusterAcrossDataCenters(new String[] {DC_ID_0, DC_ID_1}, 2); IgniteEx client = startClientGrid(NODES_NUMBER - 1); @@ -293,6 +394,8 @@ public void testPartitionDistributionMetricInMemoryCaches() throws Exception { * and {@code true} otherwise. *

      * This test considers persistent caches only and takes into account changes of BaselineTopology. + * + * @throws Exception If failed. */ @Test public void testPartitionDistributionMetricPersistentCaches() throws Exception { @@ -304,7 +407,8 @@ public void testPartitionDistributionMetricPersistentCaches() throws Exception { client.cluster().state(ClusterState.ACTIVE); - client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_MDC_FILTER, new MdcAffinityBackupFilter(2, 1), true)); + client.getOrCreateCache(prepareCacheCfg( + CACHE_WITH_MDC_FILTER, new MdcAffinityBackupFilter(2, 1), true)); client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_COLOCATED_FILTER, new ClusterNodeAttributeColocatedBackupFilter(STRETCHED_CELL_ATTR_NAME), true)); client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_MDC_SAFE_ATTRIBUTE_FILTER, @@ -339,14 +443,13 @@ public void testPartitionDistributionMetricPersistentCaches() throws Exception { client.cluster().setBaselineTopology(client.cluster().topologyVersion()); - // MdcAffinityBackupFilter and ClusterNodeAttributeAffinityBackupFilter are able to reassing partitions + // MdcAffinityBackupFilter and ClusterNodeAttributeAffinityBackupFilter are able to reassign partitions // after BaselineTopology change, thus distribution safe metric restores to true. assertTrue(cacheWithMdcFilterDistributionSafeMetric.value()); assertTrue(cacheWithMdcSafeAttrFilterDistributionSafeMetric.value()); - // But ClusterNodeAttributeColocatedBackupFilter doesn't reassing partitions after BaselineTopology change + // But ClusterNodeAttributeColocatedBackupFilter doesn't reassign partitions after BaselineTopology change // so distribution safe metric for this cache remains false. assertFalse(cacheWithColocatedFilterDistributionSafeMetric.value()); - } /** */ From b3bbe1ce8fee7b5f42b38d0eb210db413422716f Mon Sep 17 00:00:00 2001 From: Sergey Chugunov Date: Tue, 16 Jun 2026 17:35:10 +0300 Subject: [PATCH 07/11] IGNITE-28692 Fix test failures --- .../GridDhtPartitionsExchangeFuture.java | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index ffd19637c6615..4cce9b244a1fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2463,24 +2463,26 @@ private String exchangeTimingsLogMessage(String header, List timings) { grp.topology().onExchangeDone(this, assignment, false); - int numberOfDataCenters = cctx.discovery().discoCache().state().baselineTopology().numberOfDatacenters(); - if (!grp.isReplicated()) { - boolean mdcSafeDistribution = true; + BaselineTopology top = cctx.discovery().discoCache().state().baselineTopology(); + if (top != null) { + int numberOfDataCenters = top.numberOfDatacenters(); + boolean mdcSafeDistribution = true; - for (List nodes : assignment.assignment()) { - int dcsCnt = (int)nodes.stream().map(ClusterNode::dataCenterId).distinct().count(); + for (List nodes : assignment.assignment()) { + int dcsCnt = (int)nodes.stream().map(ClusterNode::dataCenterId).distinct().count(); - if (dcsCnt < numberOfDataCenters) { - mdcSafeDistribution = false; + if (dcsCnt < numberOfDataCenters) { + mdcSafeDistribution = false; - break; + break; + } } - } - boolean finalMdcSafeDistribution = mdcSafeDistribution; + boolean finalMdcSafeDistribution = mdcSafeDistribution; - grp.caches().forEach(cache -> cache.cache().metrics0().setMdcSafePartitionDistribution(finalMdcSafeDistribution)); + grp.caches().forEach(cache -> cache.cache().metrics0().setMdcSafePartitionDistribution(finalMdcSafeDistribution)); + } } } From d0a3f148e7bb9c4eb72cd85ec681af07d601b68f Mon Sep 17 00:00:00 2001 From: Sergey Chugunov Date: Tue, 16 Jun 2026 17:38:32 +0300 Subject: [PATCH 08/11] IGNITE-28692 Fix codestyle --- .../dht/preloader/GridDhtPartitionsExchangeFuture.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 4cce9b244a1fb..5e8dbd7602a29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2481,7 +2481,8 @@ private String exchangeTimingsLogMessage(String header, List timings) { boolean finalMdcSafeDistribution = mdcSafeDistribution; - grp.caches().forEach(cache -> cache.cache().metrics0().setMdcSafePartitionDistribution(finalMdcSafeDistribution)); + grp.caches().forEach( + cache -> cache.cache().metrics0().setMdcSafePartitionDistribution(finalMdcSafeDistribution)); } } } From a7f8a0b41b8550a98628dfe2ba16120ebb0b7b04 Mon Sep 17 00:00:00 2001 From: Sergey Chugunov Date: Wed, 17 Jun 2026 12:48:42 +0300 Subject: [PATCH 09/11] IGNITE-28692 Improve test stability --- .../internal/processors/cache/MdcCacheMetricsTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java index 7d7f6d1fcdba8..7c1870900887e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java @@ -111,8 +111,6 @@ public class MdcCacheMetricsTest extends GridCommonAbstractTest { stopAllGrids(); - System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID); - allCaches.clear(); mdcSafeCaches.clear(); } @@ -124,6 +122,9 @@ public class MdcCacheMetricsTest extends GridCommonAbstractTest { stopAllGrids(); cleanPersistenceDir(); + + allCaches.clear(); + mdcSafeCaches.clear(); } /** {@inheritDoc} */ From 0f8514480a37c4e6dae93e56ba928c38cb82c574 Mon Sep 17 00:00:00 2001 From: Sergey Chugunov Date: Wed, 17 Jun 2026 17:50:50 +0300 Subject: [PATCH 10/11] IGNITE-28692 Improve test stability --- .../ignite/internal/processors/cache/MdcCacheMetricsTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java index 7c1870900887e..b5c224712e5bb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java @@ -462,7 +462,9 @@ private BooleanMetric findMetricForCache(IgniteEx grid, String cacheName, String } /** */ - private void checkMdcReadyMetric() { + private void checkMdcReadyMetric() throws InterruptedException { + awaitPartitionMapExchange(); + for (int i = 0; i < NODES_NUMBER; i++) { IgniteEx ig = grid(i); From 853cecc8032bbbd37ec00c93f9ba5ab6e23fd0c2 Mon Sep 17 00:00:00 2001 From: Sergey Chugunov Date: Thu, 18 Jun 2026 12:36:06 +0300 Subject: [PATCH 11/11] IGNITE-28692 Improve comments and code quality --- .../processors/cache/CacheMetricsImpl.java | 17 +++--- .../cache/ValidationOnNodeJoinUtils.java | 7 ++- .../GridDhtPartitionsExchangeFuture.java | 53 +++++++++++-------- .../processors/cluster/BaselineTopology.java | 6 ++- 4 files changed, 51 insertions(+), 32 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index e40ed8bb8805d..527ddb0e1a4ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -1685,12 +1685,17 @@ public void onOffHeapEvict() { return fut != null && !fut.isDone(); } - /** */ + /** + * Returns {@code true} if affinity configuration is aware of multiple data centers and + * it able to spread partitions' copies across data centers. + */ private boolean isAffinityCfgMdcSafe() { return affCfgMdcSafe != null && affCfgMdcSafe; } - /** */ + /** + * Returns {@code true} if current cache partition distribution maintains guarantee 'at least one partition copy in each datacenter'. + */ private boolean isMdcSafePartitionDistribution() { return mdcSafePartDistrib == null || mdcSafePartDistrib; } @@ -1761,13 +1766,13 @@ public void registerResolverMetrics() { "Conflict resolver merged entries count"); } - /** */ + /** Registers metric for partition distribution. */ public void registerPartitionDistributionSafeMetric() { mreg.register("IsCachePartitionDistributionSafe", this::isMdcSafePartitionDistribution, "True if current cache partition distribution maintains guarantee 'one partition copy in each datacenter'."); } - /** */ + /** Registers metric for cache configuration related to distributing partitions across DCs. */ public void registerAffinityConfigurationSafeMetric(boolean affCfgMdcSafe) { mreg.register("IsCacheAffinityConfigurationMdcSafe", this::isAffinityCfgMdcSafe, "True if cache affinity guarantees having a copy of each partition in each data center."); @@ -1775,9 +1780,7 @@ public void registerAffinityConfigurationSafeMetric(boolean affCfgMdcSafe) { this.affCfgMdcSafe = affCfgMdcSafe; } - /** - * - */ + /** Updates current partition distribution safety metric. */ public void setMdcSafePartitionDistribution(boolean safeDistribution) { mdcSafePartDistrib = safeDistribution; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java index 8872e8e337db2..c194e63c52b5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java @@ -637,7 +637,12 @@ private static void checkMemoryConfiguration(ClusterNode rmt, GridKernalContext return null; } - /** */ + /** + * Analyzes affinity settings of a provided {@link CacheConfiguration} to inspect if it provides guarantees + * that partitions of the cache will be spread across all datacenters presented in cluster. + * + * @return {@code true} if affinity settings guarantee spreading partitions across all datacenters and {@code false} otherwise. + */ static boolean isAffinityConfigurationMdcSafe(CacheConfiguration cc) { if (cc.getCacheMode() == REPLICATED) return true; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 5e8dbd7602a29..445cf3275c289 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2463,28 +2463,8 @@ private String exchangeTimingsLogMessage(String header, List timings) { grp.topology().onExchangeDone(this, assignment, false); - if (!grp.isReplicated()) { - BaselineTopology top = cctx.discovery().discoCache().state().baselineTopology(); - if (top != null) { - int numberOfDataCenters = top.numberOfDatacenters(); - boolean mdcSafeDistribution = true; - - for (List nodes : assignment.assignment()) { - int dcsCnt = (int)nodes.stream().map(ClusterNode::dataCenterId).distinct().count(); - - if (dcsCnt < numberOfDataCenters) { - mdcSafeDistribution = false; - - break; - } - } - - boolean finalMdcSafeDistribution = mdcSafeDistribution; - - grp.caches().forEach( - cache -> cache.cache().metrics0().setMdcSafePartitionDistribution(finalMdcSafeDistribution)); - } - } + if (!grp.isReplicated()) + updateMdcMetrics(grp, assignment); } if (changedAffinity()) @@ -2621,6 +2601,35 @@ private void updateDurationHistogram(long duration) { cctx.exchange().blockingDurationHistogram().value(duration); } + /** + * Updates metric for a partition distribution across data centers for a given cache group. + * + * @param grp Cache group the metric should be recalculated for. + * @param assignment New assignment for the cache group. + */ + private void updateMdcMetrics(CacheGroupContext grp, AffinityAssignment assignment) { + BaselineTopology top = cctx.discovery().discoCache().state().baselineTopology(); + if (top != null) { + int numberOfDataCenters = top.numberOfDatacenters(); + boolean mdcSafeDistribution = true; + + for (List nodes : assignment.assignment()) { + int dcsCnt = (int)nodes.stream().map(ClusterNode::dataCenterId).distinct().count(); + + if (dcsCnt < numberOfDataCenters) { + mdcSafeDistribution = false; + + break; + } + } + + boolean finalMdcSafeDistribution = mdcSafeDistribution; + + grp.caches().forEach( + cache -> cache.cache().metrics0().setMdcSafePartitionDistribution(finalMdcSafeDistribution)); + } + } + /** * Calculates discovery lag (Maximal difference between exchange start times across all nodes). * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java index d7a26b72399c3..0a573ff3cc91a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java @@ -282,8 +282,10 @@ public Map attributes(Object consId) { * @return Number of datacenters presented in the baseline or {@code -1} if unknown. */ public int numberOfDatacenters() { - if (nodeMap.values().iterator().next().get(ATTR_DATA_CENTER_ID) != null) - return (int)nodeMap.values().stream().map(m -> m.get(ATTR_DATA_CENTER_ID)).distinct().count(); + Collection> allNodesAttrs = nodeMap.values(); + + if (!allNodesAttrs.isEmpty() && allNodesAttrs.iterator().next().get(ATTR_DATA_CENTER_ID) != null) + return (int)allNodesAttrs.stream().map(m -> m.get(ATTR_DATA_CENTER_ID)).distinct().count(); return -1; }