Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,12 @@ public class CacheMetricsImpl implements CacheMetrics {
/** Conflict resolver merged entries count. */
private LongAdderMetric rslvrMergedCnt;

/**
 * Flag telling whether the cache affinity configuration is considered safe for a multi data center (MDC)
 * deployment. Assigned in {@link #registerAffinityConfigurationSafeMetric(boolean)}; while it is {@code null}
 * the corresponding metric getter reports {@code false}.
 */
private Boolean affCfgMdcSafe;

/**
 * Last reported state of the partition distribution across data centers. Assigned in
 * {@link #setMdcSafePartitionDistribution(boolean)}; while it is {@code null} the corresponding metric getter
 * reports {@code true}.
 */
private Boolean mdcSafePartDistrib;

/**
* Creates cache metrics.
*
Expand Down Expand Up @@ -1679,6 +1685,21 @@ public void onOffHeapEvict() {
return fut != null && !fut.isDone();
}

/**
 * Tells whether the affinity configuration is aware of multiple data centers and
 * is able to spread partitions' copies across data centers.
 *
 * @return {@code true} only when the flag has been assigned and equals {@code true};
 *      an unassigned ({@code null}) flag is reported as {@code false}.
 */
private boolean isAffinityCfgMdcSafe() {
    return Boolean.TRUE.equals(affCfgMdcSafe);
}

/**
 * Tells whether the current cache partition distribution maintains the guarantee
 * 'at least one partition copy in each datacenter'.
 *
 * @return {@code false} only when the flag has been explicitly set to {@code false};
 *      an unassigned ({@code null}) flag is reported as {@code true}.
 */
private boolean isMdcSafePartitionDistribution() {
    return !Boolean.FALSE.equals(mdcSafePartDistrib);
}

/** {@inheritDoc} */
@Override public long getIndexRebuildKeysProcessed() {
return idxRebuildKeyProcessed.value();
Expand Down Expand Up @@ -1745,6 +1766,25 @@ public void registerResolverMetrics() {
"Conflict resolver merged entries count");
}

/**
 * Registers the boolean metric that exposes {@link #isMdcSafePartitionDistribution()}
 * in the metric registry of this cache.
 */
public void registerPartitionDistributionSafeMetric() {
    String name = "IsCachePartitionDistributionSafe";
    String desc = "True if current cache partition distribution maintains guarantee 'one partition copy in each datacenter'.";

    mreg.register(name, this::isMdcSafePartitionDistribution, desc);
}

/**
 * Registers metric for cache configuration related to distributing partitions across DCs.
 *
 * @param affCfgMdcSafe Result of the affinity configuration analysis: {@code true} if the configuration
 *      is considered safe for a multi data center deployment.
 */
public void registerAffinityConfigurationSafeMetric(boolean affCfgMdcSafe) {
    // Store the value before publishing the metric: once registered, the supplier may be invoked
    // by a metric reader, and an unassigned flag would be reported as 'false'.
    this.affCfgMdcSafe = affCfgMdcSafe;

    mreg.register("IsCacheAffinityConfigurationMdcSafe", this::isAffinityCfgMdcSafe,
        "True if cache affinity guarantees having a copy of each partition in each data center.");
}

/**
 * Stores the latest state of the partition distribution safety flag read by the metric getter.
 *
 * @param safeDistribution New value of the flag.
 */
public void setMdcSafePartitionDistribution(boolean safeDistribution) {
    this.mdcSafePartDistrib = safeDistribution ? Boolean.TRUE : Boolean.FALSE;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheMetricsImpl.class, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@
import static org.apache.ignite.internal.IgniteComponentType.JTA;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistentCache;
import static org.apache.ignite.internal.processors.cache.ValidationOnNodeJoinUtils.isAffinityConfigurationMdcSafe;
import static org.apache.ignite.internal.processors.cache.ValidationOnNodeJoinUtils.validateHashIdResolvers;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition.DFLT_CACHE_REMOVE_ENTRIES_TTL;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
Expand Down Expand Up @@ -1136,6 +1137,8 @@ private void onKernalStart(GridCacheAdapter<?, ?> cache) throws IgniteCheckedExc

cache.onKernalStart();

registerMdcMetrics(cache);

if (ctx.events().isRecordable(EventType.EVT_CACHE_STARTED))
ctx.events().addEvent(EventType.EVT_CACHE_STARTED);

Expand All @@ -1144,6 +1147,19 @@ private void onKernalStart(GridCacheAdapter<?, ?> cache) throws IgniteCheckedExc
cache.configuration().getCacheMode() + ']');
}

/**
 * Registers multi data center related metrics for the given cache. Nothing is registered on a client node
 * or when the local node is unavailable or has no data center ID.
 *
 * @param cache Cache to register metrics for.
 */
private void registerMdcMetrics(GridCacheAdapter<?, ?> cache) {
    boolean srvNodeWithDc = !ctx.clientNode()
        && ctx.discovery().localNode() != null
        && ctx.discovery().localNode().dataCenterId() != null;

    if (!srvNodeWithDc)
        return;

    boolean affCfgSafe = isAffinityConfigurationMdcSafe(cache.configuration());

    cache.metrics0().registerAffinityConfigurationSafeMetric(affCfgSafe);
    cache.metrics0().registerPartitionDistributionSafeMetric();
}

/**
* @param cache Cache to stop.
* @param cancel Cancel flag.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeAffinityBackupFilter;
import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeColocatedBackupFilter;
import org.apache.ignite.cache.affinity.rendezvous.MdcAffinityBackupFilter;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cluster.ClusterNode;
Expand All @@ -56,6 +60,7 @@
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.security.SecurityException;
import org.apache.ignite.spi.IgniteNodeValidationResult;
Expand All @@ -70,6 +75,7 @@
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_AWARE_QUERIES_ENABLED;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_SERIALIZABLE_ENABLED;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isDefaultDataRegionPersistent;
Expand Down Expand Up @@ -631,6 +637,33 @@ private static void checkMemoryConfiguration(ClusterNode rmt, GridKernalContext
return null;
}

/**
 * Analyzes affinity settings of a provided {@link CacheConfiguration} to inspect if it provides guarantees
 * that partitions of the cache will be spread across all datacenters presented in cluster.
 * <p>
 * The decision is made as follows:
 * <ul>
 *     <li>a {@code REPLICATED} cache is always reported as safe;</li>
 *     <li>an affinity function other than {@link RendezvousAffinityFunction} is reported as safe;</li>
 *     <li>a {@link ClusterNodeAttributeAffinityBackupFilter} is safe only if its attribute names
 *     contain the data center ID attribute;</li>
 *     <li>otherwise the backup filter has to be a {@link MdcAffinityBackupFilter} or
 *     a {@link ClusterNodeAttributeColocatedBackupFilter}; any other filter, including a missing one,
 *     is reported as unsafe.</li>
 * </ul>
 *
 * @param cc Cache configuration to analyze.
 * @return {@code true} if affinity settings guarantee spreading partitions across all datacenters and {@code false} otherwise.
 */
static boolean isAffinityConfigurationMdcSafe(CacheConfiguration cc) {
    if (cc.getCacheMode() == REPLICATED)
        return true;

    AffinityFunction affFunc = cc.getAffinity();

    // NOTE(review): affinity functions of an unknown type are reported as safe (unchanged behavior) - confirm.
    if (!(affFunc instanceof RendezvousAffinityFunction))
        return true;

    IgniteBiPredicate<ClusterNode, List<ClusterNode>> filter =
        ((RendezvousAffinityFunction)affFunc).getAffinityBackupFilter();

    // The attribute filter must be decided here: previously it fell through to the type check below
    // and was reported as unsafe even when it was configured with the data center attribute.
    if (filter instanceof ClusterNodeAttributeAffinityBackupFilter attrFilter)
        return F.asList(attrFilter.getAttributeNames()).contains(ATTR_DATA_CENTER_ID);

    // NOTE(review): the colocated filter is accepted without inspecting its attribute (unchanged behavior) -
    // confirm this matches the intended MDC guarantee.
    return filter instanceof MdcAffinityBackupFilter || filter instanceof ClusterNodeAttributeColocatedBackupFilter;
}

/**
* @param rmtNode Remote node to check.
* @param ctx Context.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.pagemem.wal.record.ExchangeRecord;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
Expand Down Expand Up @@ -2457,8 +2458,14 @@ private String exchangeTimingsLogMessage(String header, List<String> timings) {

cleanIdxRebuildFutures = false;

for (CacheGroupContext grp : cctx.cache().cacheGroups())
grp.topology().onExchangeDone(this, grp.affinity().readyAffinity(res), false);
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
AffinityAssignment assignment = grp.affinity().readyAffinity(res);

grp.topology().onExchangeDone(this, assignment, false);

if (!grp.isReplicated())
updateMdcMetrics(grp, assignment);
}

if (changedAffinity())
cctx.walState().disableGroupDurabilityForPreloading(this);
Expand Down Expand Up @@ -2594,6 +2601,35 @@ private void updateDurationHistogram(long duration) {
cctx.exchange().blockingDurationHistogram().value(duration);
}

/**
 * Recalculates the partition distribution safety flag for every cache of the given cache group.
 * The distribution is treated as safe when each partition of the assignment is owned by nodes from at least
 * as many distinct data centers as the baseline topology reports. Nothing is updated when there is
 * no baseline topology.
 *
 * @param grp Cache group the metric should be recalculated for.
 * @param assignment New assignment for the cache group.
 */
private void updateMdcMetrics(CacheGroupContext grp, AffinityAssignment assignment) {
    BaselineTopology blt = cctx.discovery().discoCache().state().baselineTopology();

    if (blt == null)
        return;

    int dcTotal = blt.numberOfDatacenters();

    // Stops at the first partition whose owners cover fewer data centers than the baseline has.
    boolean safe = assignment.assignment().stream().allMatch(owners ->
        (int)owners.stream().map(ClusterNode::dataCenterId).distinct().count() >= dcTotal);

    grp.caches().forEach(cacheCtx -> cacheCtx.cache().metrics0().setMdcSafePartitionDistribution(safe));
}

/**
* Calculates discovery lag (Maximal difference between exchange start times across all nodes).
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID;

/**
* BaselineTopology represents a set of "database nodes" - nodes responsible for holding persistent-enabled caches
* and persisting their information to durable storage.
*
* <p/>
* Two major features BaselineTopology allows are:
* <ol>
* <li>Protection from conflicting updates.</li>
Expand All @@ -61,10 +63,10 @@
* [A,B] [C]
* | |
* (2)updates to both parts
*
* <p/>
* After independent updates applied to both parts of cluster at point(2) node C should not be allowed to join
* [A,B] part.
*
* <p/>
* The following algorithm makes sure node C will never join [A,B] part back:
* <ol>
* <li>
Expand Down Expand Up @@ -95,10 +97,10 @@
* <li>
* When BaselineTopology is set (e.g. on first activation) or recreated (e.g. with set-baseline command)
* its ID on all nodes is incremented by one.
*
* <p/>
* So when cluster receives a join request with BaselineTopology it firstly compares joining node BlT ID with
* local BlT ID.
*
* <p/>
* If joining node has a BaselineTopology with ID greater than one in cluster it means that BlT was changed
* more times there; therefore new node is not allowed to join the cluster.
* </li>
Expand All @@ -107,27 +109,27 @@
* Instead current set of online nodes from BaselineTopology is used to update {@link BaselineTopology#branchingPntHash}
* property of current BaselineTopology.
* Old value of the property is moved to {@link BaselineTopology#branchingHist} list.
*
* <p/>
* If joining node and local BlT IDs are the same then cluster takes <b>branchingPntHash</b> of joining node
* and verifies that its local <b>branchingHist</b> contains that hash.
*
* <p/>
* If joining node hash is not presented in cluster branching history list
* it means that joining node was activated independently of currently running cluster;
* therefore new node is not allowed to join the cluster.
*
* <p/>
* If joining node hash is presented in the history, then it is safe to let the node join the cluster.
* </li>
* <li>
* When BaselineTopology is recreated (e.g. with set-baseline command) previous BaselineTopology is moved
* to BaselineHistory (consult source code of {@link GridClusterStateProcessor} for more details).
*
* <p/>
* If cluster sees that joining node BlT ID is less than cluster BlT ID it looks up for BaselineHistory item
* for new node ID.
* Having this BaselineHistory item cluster verifies that branching history of the item contains
* branching point hash of joining node
* (similar check as in the case above with only difference that joining node BlT is compared against
* BaselineHistory item instead of BaselineTopology).
*
* <p/>
* If new node branching point hash is found in the history then the node is allowed to join;
* otherwise it is rejected.
* </li>
Expand Down Expand Up @@ -172,7 +174,7 @@ public class BaselineTopology implements Serializable {
private final List<Long> branchingHist;

/**
* @param nodeMap Map of node consistent ID to it's attributes.
* @param nodeMap Map of node consistent ID to its attributes.
*/
private BaselineTopology(Map<Object, Map<String, Object>> nodeMap, int id) {
this.id = id;
Expand Down Expand Up @@ -274,6 +276,20 @@ public Map<String, Object> attributes(Object consId) {
return nodeMap.get(consId);
}

/**
 * Counts distinct data center IDs found in the attributes of baseline nodes.
 * <p>
 * Only the first node returned by the node map is probed to decide whether data center IDs are known at all.
 *
 * @return Number of datacenters presented in the baseline or {@code -1} if unknown
 *      (the baseline is empty or the probed node has no data center ID attribute).
 */
public int numberOfDatacenters() {
    Collection<Map<String, Object>> attrs = nodeMap.values();

    boolean dcKnown = !attrs.isEmpty() && attrs.iterator().next().get(ATTR_DATA_CENTER_ID) != null;

    if (!dcKnown)
        return -1;

    long dcCnt = attrs.stream()
        .map(nodeAttrs -> nodeAttrs.get(ATTR_DATA_CENTER_ID))
        .distinct()
        .count();

    return (int)dcCnt;
}

/**
*
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ private IgniteEx startClusterAcrossDataCenters(String[] dcIds, int nodesPerDc) t
lastNode = startGrid(nodeIdx++);
}

System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID);

return lastNode;
}

Expand Down
Loading
Loading