diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index a67d265dbf79c..527ddb0e1a4ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -246,6 +246,12 @@ public class CacheMetricsImpl implements CacheMetrics { /** Conflict resolver merged entries count. */ private LongAdderMetric rslvrMergedCnt; + /** */ + private Boolean affCfgMdcSafe; + + /** */ + private Boolean mdcSafePartDistrib; + /** * Creates cache metrics. * @@ -1679,6 +1685,21 @@ public void onOffHeapEvict() { return fut != null && !fut.isDone(); } + /** + * Returns {@code true} if affinity configuration is aware of multiple data centers and + * it able to spread partitions' copies across data centers. + */ + private boolean isAffinityCfgMdcSafe() { + return affCfgMdcSafe != null && affCfgMdcSafe; + } + + /** + * Returns {@code true} if current cache partition distribution maintains guarantee 'at least one partition copy in each datacenter'. + */ + private boolean isMdcSafePartitionDistribution() { + return mdcSafePartDistrib == null || mdcSafePartDistrib; + } + /** {@inheritDoc} */ @Override public long getIndexRebuildKeysProcessed() { return idxRebuildKeyProcessed.value(); @@ -1745,6 +1766,25 @@ public void registerResolverMetrics() { "Conflict resolver merged entries count"); } + /** Registers metric for partition distribution. */ + public void registerPartitionDistributionSafeMetric() { + mreg.register("IsCachePartitionDistributionSafe", this::isMdcSafePartitionDistribution, + "True if current cache partition distribution maintains guarantee 'one partition copy in each datacenter'."); + } + + /** Registers metric for cache configuration related to distributing partitions across DCs. */ + public void registerAffinityConfigurationSafeMetric(boolean affCfgMdcSafe) { + mreg.register("IsCacheAffinityConfigurationMdcSafe", this::isAffinityCfgMdcSafe, + "True if cache affinity guarantees having a copy of each partition in each data center."); + + this.affCfgMdcSafe = affCfgMdcSafe; + } + + /** Updates current partition distribution safety metric. */ + public void setMdcSafePartitionDistribution(boolean safeDistribution) { + mdcSafePartDistrib = safeDistribution; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheMetricsImpl.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index a82a8a9179c5a..8e28e4c93f774 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -217,6 +217,7 @@ import static org.apache.ignite.internal.IgniteComponentType.JTA; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistentCache; +import static org.apache.ignite.internal.processors.cache.ValidationOnNodeJoinUtils.isAffinityConfigurationMdcSafe; import static org.apache.ignite.internal.processors.cache.ValidationOnNodeJoinUtils.validateHashIdResolvers; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition.DFLT_CACHE_REMOVE_ENTRIES_TTL; import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName; @@ -1136,6 +1137,8 @@ private void onKernalStart(GridCacheAdapter cache) throws IgniteCheckedExc cache.onKernalStart(); + registerMdcMetrics(cache); + if (ctx.events().isRecordable(EventType.EVT_CACHE_STARTED)) ctx.events().addEvent(EventType.EVT_CACHE_STARTED); @@ -1144,6 +1147,19 @@ private void onKernalStart(GridCacheAdapter cache) throws IgniteCheckedExc cache.configuration().getCacheMode() + ']'); } + /** */ + private void registerMdcMetrics(GridCacheAdapter cache) { + if (ctx.clientNode()) + return; + + if (ctx.discovery().localNode() == null || ctx.discovery().localNode().dataCenterId() == null) + return; + + cache.metrics0().registerAffinityConfigurationSafeMetric(isAffinityConfigurationMdcSafe(cache.configuration())); + + cache.metrics0().registerPartitionDistributionSafeMetric(); + } + /** * @param cache Cache to stop. * @param cancel Cancel flag. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java index ac8d97b2a127c..c194e63c52b5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java @@ -35,6 +35,10 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeAffinityBackupFilter; +import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeColocatedBackupFilter; +import org.apache.ignite.cache.affinity.rendezvous.MdcAffinityBackupFilter; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cluster.ClusterNode; @@ -56,6 +60,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.security.SecurityException; import org.apache.ignite.spi.IgniteNodeValidationResult; @@ -70,6 +75,7 @@ import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_AWARE_QUERIES_ENABLED; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_SERIALIZABLE_ENABLED; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isDefaultDataRegionPersistent; @@ -631,6 +637,33 @@ private static void checkMemoryConfiguration(ClusterNode rmt, GridKernalContext return null; } + /** + * Analyzes affinity settings of a provided {@link CacheConfiguration} to inspect if it provides guarantees + * that partitions of the cache will be spread across all datacenters presented in cluster. + * + * @return {@code true} if affinity settings guarantee spreading partitions across all datacenters and {@code false} otherwise. + */ + static boolean isAffinityConfigurationMdcSafe(CacheConfiguration cc) { + if (cc.getCacheMode() == REPLICATED) + return true; + + AffinityFunction affFunc = cc.getAffinity(); + + if (affFunc instanceof RendezvousAffinityFunction) { + IgniteBiPredicate> filter = ((RendezvousAffinityFunction)affFunc).getAffinityBackupFilter(); + + if (filter instanceof ClusterNodeAttributeAffinityBackupFilter attrFilter) { + if (!F.asList(attrFilter.getAttributeNames()).contains(ATTR_DATA_CENTER_ID)) + return false; + } + + if (!(filter instanceof MdcAffinityBackupFilter) && !(filter instanceof ClusterNodeAttributeColocatedBackupFilter)) + return false; + } + + return true; + } + /** * @param rmtNode Remote node to check. * @param ctx Context. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 5c0dc84875a7f..445cf3275c289 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -69,6 +69,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.pagemem.wal.record.ExchangeRecord; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; @@ -2457,8 +2458,14 @@ private String exchangeTimingsLogMessage(String header, List timings) { cleanIdxRebuildFutures = false; - for (CacheGroupContext grp : cctx.cache().cacheGroups()) - grp.topology().onExchangeDone(this, grp.affinity().readyAffinity(res), false); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + AffinityAssignment assignment = grp.affinity().readyAffinity(res); + + grp.topology().onExchangeDone(this, assignment, false); + + if (!grp.isReplicated()) + updateMdcMetrics(grp, assignment); + } if (changedAffinity()) cctx.walState().disableGroupDurabilityForPreloading(this); @@ -2594,6 +2601,35 @@ private void updateDurationHistogram(long duration) { cctx.exchange().blockingDurationHistogram().value(duration); } + /** + * Updates metric for a partition distribution across data centers for a given cache group. + * + * @param grp Cache group the metric should be recalculated for. + * @param assignment New assignment for the cache group. + */ + private void updateMdcMetrics(CacheGroupContext grp, AffinityAssignment assignment) { + BaselineTopology top = cctx.discovery().discoCache().state().baselineTopology(); + if (top != null) { + int numberOfDataCenters = top.numberOfDatacenters(); + boolean mdcSafeDistribution = true; + + for (List nodes : assignment.assignment()) { + int dcsCnt = (int)nodes.stream().map(ClusterNode::dataCenterId).distinct().count(); + + if (dcsCnt < numberOfDataCenters) { + mdcSafeDistribution = false; + + break; + } + } + + boolean finalMdcSafeDistribution = mdcSafeDistribution; + + grp.caches().forEach( + cache -> cache.cache().metrics0().setMdcSafePartitionDistribution(finalMdcSafeDistribution)); + } + } + /** * Calculates discovery lag (Maximal difference between exchange start times across all nodes). * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java index 1f71cb192a2f5..0a573ff3cc91a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java @@ -39,10 +39,12 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID; + /** * BaselineTopology represents a set of "database nodes" - nodes responsible for holding persistent-enabled caches * and persisting their information to durable storage. - * + *

* Two major features BaselineTopology allows are: *

    *
  1. Protection from conflicting updates.
  2. @@ -61,10 +63,10 @@ * [A,B] [C] * | | * (2)updates to both parts - * + *

    * After independent updates applied to both parts of cluster at point(2) node C should not be allowed to join * [A,B] part. - * + *

    * The following algorithm makes sure node C will never join [A,B] part back: *

      *
    1. @@ -95,10 +97,10 @@ *
    2. * When BaselineTopology is set (e.g. on first activation) or recreated (e.g. with set-baseline command) * its ID on all nodes is incremented by one. - * + *

      * So when cluster receives a join request with BaselineTopology it firstly compares joining node BlT ID with * local BlT ID. - * + *

      * If joining node has a BaselineTopology with ID greater than one in cluster it means that BlT was changed * more times there; therefore new node is not allowed to join the cluster. *

    3. @@ -107,27 +109,27 @@ * Instead current set of online nodes from BaselineTopology is used to update {@link BaselineTopology#branchingPntHash} * property of current BaselineTopology. * Old value of the property is moved to {@link BaselineTopology#branchingHist} list. - * + *

      * If joining node and local BlT IDs are the same then cluster takes branchingPntHash of joining node * and verifies that its local branchingHist contains that hash. - * + *

      * If joining node hash is not presented in cluster branching history list * it means that joining node was activated independently of currently running cluster; * therefore new node is not allowed to join the cluster. - * + *

      * If joining node hash is presented in the history, that it is safe to let the node join the cluster. * *

    4. * When BaselineTopology is recreated (e.g. with set-baseline command) previous BaselineTopology is moved * to BaselineHistory (consult source code of {@link GridClusterStateProcessor} for more details). - * + *

      * If cluster sees that joining node BlT ID is less than cluster BlT ID it looks up for BaselineHistory item * for new node ID. * Having this BaselineHistory item cluster verifies that branching history of the item contains * branching point hash of joining node * (similar check as in the case above with only difference that joining node BlT is compared against * BaselineHistory item instead of BaselineTopology). - * + *

      * If new node branching point hash is found in the history than node is allowed to join; * otherwise it is rejected. *

    5. @@ -172,7 +174,7 @@ public class BaselineTopology implements Serializable { private final List branchingHist; /** - * @param nodeMap Map of node consistent ID to it's attributes. + * @param nodeMap Map of node consistent ID to its attributes. */ private BaselineTopology(Map> nodeMap, int id) { this.id = id; @@ -274,6 +276,20 @@ public Map attributes(Object consId) { return nodeMap.get(consId); } + /** + * Calculates number of datacenters presented in current baseline. + * + * @return Number of datacenters presented in the baseline or {@code -1} if unknown. + */ + public int numberOfDatacenters() { + Collection> allNodesAttrs = nodeMap.values(); + + if (!allNodesAttrs.isEmpty() && allNodesAttrs.iterator().next().get(ATTR_DATA_CENTER_ID) != null) + return (int)allNodesAttrs.stream().map(m -> m.get(ATTR_DATA_CENTER_ID)).distinct().count(); + + return -1; + } + /** * */ diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilterSelfTest.java index a45f3ccd90124..97c1949458400 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilterSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilterSelfTest.java @@ -314,6 +314,8 @@ private IgniteEx startClusterAcrossDataCenters(String[] dcIds, int nodesPerDc) t lastNode = startGrid(nodeIdx++); } + System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID); + return lastNode; } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java new file mode 100644 index 0000000000000..b5c224712e5bb --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MdcCacheMetricsTest.java @@ -0,0 +1,506 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeAffinityBackupFilter; +import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeColocatedBackupFilter; +import org.apache.ignite.cache.affinity.rendezvous.MdcAffinityBackupFilter; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.spi.metric.BooleanMetric; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID; +import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.cacheMetricsRegistryName; + +/** + * Test for new cache metrics for highlighting two data safety issues in Multi DataCenter environments: + * 1. If cache configuration doesn't specify an affinity backup filter that could guarantee presence of data copy in each DC. + * 2. If cluster topology changed in such a way that partition copies are not spread across all available DCs. + */ +public class MdcCacheMetricsTest extends GridCommonAbstractTest { + /** */ + private static final int NODES_NUMBER = 5; + + /** */ + private static final String CACHE_WITH_MDC_FILTER = "mdcSafeCache0"; + + /** */ + private static final String CACHE_WITH_COLOCATED_FILTER = "mdcSafeCache1"; + + /** */ + private static final String CACHE_WITH_MDC_SAFE_ATTRIBUTE_FILTER = "mdcSafeCache2"; + + /** */ + private static final String MDC_UNSAFE_CACHE = "mdcUnsafeCache0"; + + /** */ + private static final String CACHE_WITH_MDC_UNSAFE_ATTRIBUTE_FILTER = "mdcUnsafeCache1"; + + /** */ + private static final String STRETCHED_CELL_ATTR_NAME = "DC_CELL_ATTR"; + + /** */ + private static final String ATTR_FOR_UNSAFE_ATTR_FILTER = "MDC_UNAWARE_ATTR"; + + /** */ + private static final String[] STRETCHED_CELL_IDS = {"CELL_0", "CELL_1"}; + + /** */ + private static final String DC_ID_0 = "DC_0"; + + /** */ + private static final String DC_ID_1 = "DC_1"; + + /** */ + private static final String AFFINITY_CFG_MDC_SAFE_METRIC_NAME = "IsCacheAffinityConfigurationMdcSafe"; + + /** */ + private static final String PARTITION_DISTRIBUTION_SAFE_METRIC_NAME = "IsCachePartitionDistributionSafe"; + + /** */ + private String dcId; + + /** */ + private String cellId; + + /** */ + private boolean useStaticCaches; + + /** */ + private boolean persistenceEnabled; + + /** */ + private final Set allCaches = new HashSet<>(); + + /** */ + private final Set mdcSafeCaches = new HashSet<>(); + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + + allCaches.clear(); + mdcSafeCaches.clear(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + + allCaches.clear(); + mdcSafeCaches.clear(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(persistenceEnabled) + .setMaxSize(32 * 1024 * 1024) + )); + + if (useStaticCaches) { + CacheConfiguration mdcSafeCacheCfg0 = prepareCacheCfg( + CACHE_WITH_MDC_FILTER, + new MdcAffinityBackupFilter(2, 1), + true); + + CacheConfiguration mdcSafeCacheCfg1 = prepareCacheCfg( + CACHE_WITH_COLOCATED_FILTER, + new ClusterNodeAttributeColocatedBackupFilter(STRETCHED_CELL_ATTR_NAME), + true); + + CacheConfiguration mdcUnsafeCacheCfg0 = prepareCacheCfg(MDC_UNSAFE_CACHE, null, false); + + CacheConfiguration mdcUnsafeCacheCfg1 = prepareCacheCfg( + CACHE_WITH_MDC_UNSAFE_ATTRIBUTE_FILTER, + new ClusterNodeAttributeAffinityBackupFilter(ATTR_FOR_UNSAFE_ATTR_FILTER), + false); + + cfg.setCacheConfiguration(mdcSafeCacheCfg0, mdcSafeCacheCfg1, mdcUnsafeCacheCfg0, mdcUnsafeCacheCfg1); + } + + if (!cfg.isClientMode()) + cfg.setUserAttributes(F.asMap( + STRETCHED_CELL_ATTR_NAME, + cellId, + IgniteSystemProperties.IGNITE_DATA_CENTER_ID, + dcId)); + + return cfg; + } + + /** */ + private CacheConfiguration prepareCacheCfg( + String cacheName, + IgniteBiPredicate> affBackupFilter, + boolean affCfgMdcSafe) { + return prepareCacheCfg(cacheName, affBackupFilter, affCfgMdcSafe, null); + } + + /** */ + private CacheConfiguration prepareCacheCfg( + String cacheName, + IgniteBiPredicate> affBackupFilter, + boolean affCfgMdcSafe, + String cacheGroupName) { + CacheConfiguration cacheCfg = new CacheConfiguration(cacheName) + .setCacheMode(PARTITIONED) + .setBackups(1); + + if (cacheGroupName != null) + cacheCfg.setGroupName(cacheGroupName); + + cacheCfg.setAffinity( + new RendezvousAffinityFunction() + .setPartitions(32) + .setAffinityBackupFilter(affBackupFilter)); + + if (affCfgMdcSafe) + mdcSafeCaches.add(cacheName); + + allCaches.add(cacheName); + + return cacheCfg; + } + + /** + * Test verifies correctness of metric for cache configuration related to data distribution across DCs + * if caches are organized into groups. + * + * @throws Exception If failed. + */ + @Test + public void testAffinityCfgMdcSafeMetricForCacheGroup() throws Exception { + startClusterAcrossDataCenters(new String[] {DC_ID_0, DC_ID_1}, 2); + + IgniteEx client = startClientGrid(NODES_NUMBER - 1); + + client.getOrCreateCache( + prepareCacheCfg( + CACHE_WITH_MDC_FILTER + "_0", + new MdcAffinityBackupFilter(2, 1), + true, + "mdcSafeCachesGroup")); + + client.getOrCreateCache( + prepareCacheCfg( + CACHE_WITH_MDC_FILTER + "_1", + new MdcAffinityBackupFilter(2, 1), + true, + "mdcSafeCachesGroup")); + + client.getOrCreateCache( + prepareCacheCfg(MDC_UNSAFE_CACHE + "_0", null, false, "mdcUnsafeCachesGroup")); + + client.getOrCreateCache( + prepareCacheCfg(MDC_UNSAFE_CACHE + "_1", null, false, "mdcUnsafeCachesGroup")); + + checkMdcReadyMetric(); + } + + /** + * Test verifies correctness of metric for partition copies distribution across DCs + * if caches are organized into groups. + * + * @throws Exception If failed. + */ + @Test + public void testPartitionDistributionMetricForCacheGroups() throws Exception { + persistenceEnabled = true; + + startClusterAcrossDataCenters(new String[] {DC_ID_0, DC_ID_1}, 2); + + IgniteEx client = startClientGrid(NODES_NUMBER - 1); + + client.cluster().state(ClusterState.ACTIVE); + + client.getOrCreateCache(prepareCacheCfg( + CACHE_WITH_MDC_FILTER + "_0", + new MdcAffinityBackupFilter(2, 1), + true, + "mdcFilterCacheGroup")); + client.getOrCreateCache(prepareCacheCfg( + CACHE_WITH_MDC_FILTER + "_1", + new MdcAffinityBackupFilter(2, 1), + true, + "mdcFilterCacheGroup")); + + BooleanMetric cache0DistributionSafeMetric = findMetricForCache( + grid(1), + CACHE_WITH_MDC_FILTER + "_0", + PARTITION_DISTRIBUTION_SAFE_METRIC_NAME); + BooleanMetric cache1DistributionSafeMetric = findMetricForCache( + grid(1), + CACHE_WITH_MDC_FILTER + "_1", + PARTITION_DISTRIBUTION_SAFE_METRIC_NAME); + + assertNotNull(cache0DistributionSafeMetric); + assertNotNull(cache1DistributionSafeMetric); + assertTrue(cache0DistributionSafeMetric.value()); + assertTrue(cache1DistributionSafeMetric.value()); + + stopGrid(0); + + assertFalse(cache0DistributionSafeMetric.value()); + assertFalse(cache1DistributionSafeMetric.value()); + + client.cluster().setBaselineTopology(client.cluster().topologyVersion()); + + assertTrue(cache0DistributionSafeMetric.value()); + assertTrue(cache1DistributionSafeMetric.value()); + } + + /** + * Test verifies correctness of metric for cache configuration related to data distribution across DCs for dynamically started caches. + * Metric should take a {@code false} value if cache configuration doesn't guarantee presence of data copy in each DC + * and {@code true} otherwise. + * + * @throws Exception If failed. + */ + @Test + public void testAffinityCfgMdcSafeMetricForDynamicCaches() throws Exception { + startClusterAcrossDataCenters(new String[] {DC_ID_0, DC_ID_1}, 2); + + IgniteEx client = startClientGrid(NODES_NUMBER - 1); + + client.cluster().state(ClusterState.ACTIVE); + + client.getOrCreateCache( + prepareCacheCfg(CACHE_WITH_MDC_FILTER, new MdcAffinityBackupFilter(2, 1), true)); + + client.getOrCreateCache( + prepareCacheCfg(CACHE_WITH_COLOCATED_FILTER, new ClusterNodeAttributeColocatedBackupFilter(STRETCHED_CELL_ATTR_NAME), true)); + + client.getOrCreateCache( + prepareCacheCfg(MDC_UNSAFE_CACHE, null, false)); + + client.getOrCreateCache( + prepareCacheCfg(CACHE_WITH_MDC_UNSAFE_ATTRIBUTE_FILTER, + new ClusterNodeAttributeAffinityBackupFilter(ATTR_FOR_UNSAFE_ATTR_FILTER), false)); + + checkMdcReadyMetric(); + } + + /** + * Test verifies correctness of metric for cache configuration related to data distribution across DCs for statically configured caches. + * Metric should take a {@code false} value if cache configuration doesn't guarantee presence of data copy in each DC + * and {@code true} otherwise. + * + * @throws Exception If failed. + */ + @Test + public void testAffinityCfgMdcSafeMetricForStaticCaches() throws Exception { + useStaticCaches = true; + + startClusterAcrossDataCenters(new String[] {DC_ID_0, DC_ID_1}, 2); + + IgniteEx client = startClientGrid(NODES_NUMBER - 1); + + client.cluster().state(ClusterState.ACTIVE); + + checkMdcReadyMetric(); + } + + /** + * Test verifies correctness of metric for partition copies distribution across DCs. + * Metric should take a {@code false} value if there is at least one partition which doesn't have copies in all DCs + * and {@code true} otherwise. + *

      + * This test considers in-memory caches only. + * + * @throws Exception If failed. + */ + @Test + public void testPartitionDistributionMetricInMemoryCaches() throws Exception { + startClusterAcrossDataCenters(new String[] {DC_ID_0, DC_ID_1}, 2); + + IgniteEx client = startClientGrid(NODES_NUMBER - 1); + + client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_MDC_FILTER, new MdcAffinityBackupFilter(2, 1), true)); + client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_COLOCATED_FILTER, + new ClusterNodeAttributeColocatedBackupFilter(STRETCHED_CELL_ATTR_NAME), true)); + client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_MDC_SAFE_ATTRIBUTE_FILTER, + new ClusterNodeAttributeAffinityBackupFilter(ATTR_DATA_CENTER_ID), true)); + + BooleanMetric cacheWithMdcFilterDistributionSafeMetric = findMetricForCache( + grid(1), + CACHE_WITH_MDC_FILTER, + PARTITION_DISTRIBUTION_SAFE_METRIC_NAME); + BooleanMetric cacheWithColocatedFilterDistributionSafeMetric = findMetricForCache( + grid(1), + CACHE_WITH_COLOCATED_FILTER, + PARTITION_DISTRIBUTION_SAFE_METRIC_NAME); + BooleanMetric cacheWithMdcSafeAttrFilterDistributionSafeMetric = findMetricForCache( + grid(1), + CACHE_WITH_MDC_SAFE_ATTRIBUTE_FILTER, + PARTITION_DISTRIBUTION_SAFE_METRIC_NAME + ); + + assertNotNull(cacheWithMdcFilterDistributionSafeMetric); + assertNotNull(cacheWithColocatedFilterDistributionSafeMetric); + assertNotNull(cacheWithColocatedFilterDistributionSafeMetric); + assertTrue(cacheWithMdcFilterDistributionSafeMetric.value()); + assertTrue(cacheWithColocatedFilterDistributionSafeMetric.value()); + assertTrue(cacheWithMdcSafeAttrFilterDistributionSafeMetric.value()); + + stopGrid(0); + + assertTrue(cacheWithMdcFilterDistributionSafeMetric.value()); + assertTrue(cacheWithMdcSafeAttrFilterDistributionSafeMetric.value()); + assertFalse(cacheWithColocatedFilterDistributionSafeMetric.value()); + } + + /** + * Test verifies correctness of metric for partition copies distribution across DCs. + * Metric should take a {@code false} value if there is at least one partition which doesn't have copies in all DCs + * and {@code true} otherwise. + *

      + * This test considers persistent caches only and takes into account changes of BaselineTopology. + * + * @throws Exception If failed. + */ + @Test + public void testPartitionDistributionMetricPersistentCaches() throws Exception { + persistenceEnabled = true; + + startClusterAcrossDataCenters(new String[] {DC_ID_0, DC_ID_1}, 2); + + IgniteEx client = startClientGrid(NODES_NUMBER - 1); + + client.cluster().state(ClusterState.ACTIVE); + + client.getOrCreateCache(prepareCacheCfg( + CACHE_WITH_MDC_FILTER, new MdcAffinityBackupFilter(2, 1), true)); + client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_COLOCATED_FILTER, + new ClusterNodeAttributeColocatedBackupFilter(STRETCHED_CELL_ATTR_NAME), true)); + client.getOrCreateCache(prepareCacheCfg(CACHE_WITH_MDC_SAFE_ATTRIBUTE_FILTER, + new ClusterNodeAttributeAffinityBackupFilter(ATTR_DATA_CENTER_ID), true)); + + BooleanMetric cacheWithMdcFilterDistributionSafeMetric = findMetricForCache( + grid(1), + CACHE_WITH_MDC_FILTER, + PARTITION_DISTRIBUTION_SAFE_METRIC_NAME); + BooleanMetric cacheWithColocatedFilterDistributionSafeMetric = findMetricForCache( + grid(1), + CACHE_WITH_COLOCATED_FILTER, + PARTITION_DISTRIBUTION_SAFE_METRIC_NAME); + BooleanMetric cacheWithMdcSafeAttrFilterDistributionSafeMetric = findMetricForCache( + grid(1), + CACHE_WITH_MDC_SAFE_ATTRIBUTE_FILTER, + PARTITION_DISTRIBUTION_SAFE_METRIC_NAME + ); + + assertNotNull(cacheWithMdcFilterDistributionSafeMetric); + assertNotNull(cacheWithColocatedFilterDistributionSafeMetric); + assertNotNull(cacheWithMdcSafeAttrFilterDistributionSafeMetric); + assertTrue(cacheWithMdcFilterDistributionSafeMetric.value()); + assertTrue(cacheWithColocatedFilterDistributionSafeMetric.value()); + assertTrue(cacheWithMdcSafeAttrFilterDistributionSafeMetric.value()); + + stopGrid(0); + + assertFalse(cacheWithMdcFilterDistributionSafeMetric.value()); + assertFalse(cacheWithColocatedFilterDistributionSafeMetric.value()); + assertFalse(cacheWithMdcSafeAttrFilterDistributionSafeMetric.value()); + + client.cluster().setBaselineTopology(client.cluster().topologyVersion()); + + // MdcAffinityBackupFilter and ClusterNodeAttributeAffinityBackupFilter are able to reassign partitions + // after BaselineTopology change, thus distribution safe metric restores to true. + assertTrue(cacheWithMdcFilterDistributionSafeMetric.value()); + assertTrue(cacheWithMdcSafeAttrFilterDistributionSafeMetric.value()); + // But ClusterNodeAttributeColocatedBackupFilter doesn't reassign partitions after BaselineTopology change + // so distribution safe metric for this cache remains false. + assertFalse(cacheWithColocatedFilterDistributionSafeMetric.value()); + } + + /** */ + private BooleanMetric findMetricForCache(IgniteEx grid, String cacheName, String metricName) { + GridCacheContext cacheCtx = grid.cachex(cacheName).context(); + + return cacheCtx.kernalContext().metric().registry(cacheMetricsRegistryName( + cacheCtx.name(), cacheCtx.cache().isNear())).findMetric(metricName); + } + + /** */ + private void checkMdcReadyMetric() throws InterruptedException { + awaitPartitionMapExchange(); + + for (int i = 0; i < NODES_NUMBER; i++) { + IgniteEx ig = grid(i); + + for (String cacheName : allCaches) { + BooleanMetric cacheMdcSafeMetric = findMetricForCache(ig, cacheName, AFFINITY_CFG_MDC_SAFE_METRIC_NAME); + + if (ig.localNode().isClient()) { + assertNull(cacheMdcSafeMetric); + + continue; + } + else + assertNotNull("Grid: " + i + ", cache: " + cacheName, cacheMdcSafeMetric); + + boolean cacheMdcSafe = cacheMdcSafeMetric.value(); + + assertEquals(mdcSafeCaches.contains(cacheName), cacheMdcSafe); + } + } + } + + /** */ + private IgniteEx startClusterAcrossDataCenters(String[] dcIds, int nodesPerDc) throws Exception { + int nodeIdx = 0; + IgniteEx lastNode = null; + + for (String dcId : dcIds) { + this.dcId = dcId; + + for (int i = 0; i < nodesPerDc; i++) { + cellId = STRETCHED_CELL_IDS[i]; + + lastNode = startGrid(nodeIdx++); + } + } + + return lastNode; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java index b89ea7a0a4df7..b15c2dcd903f4 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java @@ -58,6 +58,7 @@ import org.apache.ignite.internal.processors.cache.IgniteNearClientCacheCloseTest; import org.apache.ignite.internal.processors.cache.IgniteOnePhaseCommitInvokeTest; import org.apache.ignite.internal.processors.cache.IgniteOnePhaseCommitNearReadersTest; +import org.apache.ignite.internal.processors.cache.MdcCacheMetricsTest; import org.apache.ignite.internal.processors.cache.NoPresentCacheInterceptorOnClientTest; import org.apache.ignite.internal.processors.cache.NonAffinityCoordinatorDynamicStartStopTest; import org.apache.ignite.internal.processors.cache.RebalanceIteratorLargeEntriesOOMTest; @@ -346,6 +347,7 @@ public static List> suite(Collection ignoredTests) { GridTestUtils.addTestIfNeeded(suite, ClusterNodeAttributeAffinityBackupFilterSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, ClusterNodeAttributeColocatedBackupFilterSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, MdcAffinityBackupFilterSelfTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, MdcCacheMetricsTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CachePartitionStateTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CacheComparatorTest.class, ignoredTests);