diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java index 6953d8b853891..4a8f556781cf7 100644 --- a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java @@ -322,7 +322,7 @@ private static class AttributeValueHolder { } /** Allows to change multiple attribute values in a single update operation and skip updates that changes nothing. */ - private static class ContextUpdater { + static class ContextUpdater { /** */ private static final int INIT_UPDATES_CAPACITY = 3; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index 9da592635d229..cc0ab3e112b7d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -666,6 +666,10 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(PartitionHashRecord.class); withNoSchema(TransactionsHashRecord.class); + // [13400 - 13600]: Operation context messages. + msgIdx = 13400; + withNoSchema(DistributedOperationContextMessage.class); + assert msgIdx <= MAX_MESSAGE_ID; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java new file mode 100644 index 0000000000000..42d5c7eda859a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.internal.thread.context.DistributedOperationContextManager; +import org.apache.ignite.internal.thread.context.OperationContext; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** + * Transport for {@link OperationContext} distributed attributes. + * + * @see DistributedOperationContextManager + */ +public class DistributedOperationContextMessage implements Message { + /** Values of operation context attributes. */ + @Order(0) + public Message[] vals; + + /** Bitmap of effective attributes ids. */ + @Order(1) + public byte idBitmap; + + /** Empty constructor for serialization purposes. */ + public DistributedOperationContextMessage() { + // No-op. + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java new file mode 100644 index 0000000000000..e87f8cd4d0159 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.thread.context; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListMap; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.DistributedOperationContextMessage; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.jetbrains.annotations.Nullable; + +/** + * Provides the ability to manage {@link OperationContext} attributes in a distributed manner. + * + *

<p>This mechanism is primarily used to propagate {@link OperationContext} state across the cluster by + * capturing it before a message is sent, transferring it together with the message, and restoring it on + * the receiving node before message processing begins.</p>

+ * + *

<p>The implementation relies on a mapping between a distributed identifier and an + * {@link OperationContextAttribute} instance that is consistent across all cluster nodes.</p>

+ * + *

<p>To enable propagation of an {@link OperationContextAttribute} value across cluster nodes, the + * attribute must be created using the {@link #createDistributedAttribute(byte, Message)} method.</p> + * + *

<p>Note that the maximum number of distributed attribute instances that can be created is currently limited to + * {@link #MAX_DISTRIBUTED_ATTR_CNT} for implementation reasons.</p>

+ *
+ * @see OperationContext
+ * @see DistributedOperationContextMessage
+ */
+public class DistributedOperationContextManager {
+    /** Process-wide singleton returned by {@link #instance()}. */
+    private static final DistributedOperationContextManager INSTANCE = new DistributedOperationContextManager();
+
+    /** Maximal number of supported distributed attributes (one bit of the {@code byte} id bitmap per attribute). */
+    static final byte MAX_DISTRIBUTED_ATTR_CNT = Byte.SIZE;
+
+    /**
+     * Registered distributed attributes by their cluster-wide id. The map is sorted by id, so iteration visits
+     * attributes in ascending id order.
+     */
+    private final Map<Byte, OperationContextAttribute<Message>> attrs = new ConcurrentSkipListMap<>();
+
+    /** @return Shared manager instance. */
+    public static DistributedOperationContextManager instance() {
+        return INSTANCE;
+    }
+
+    /**
+     * Creates a new {@link OperationContext} attribute with the specified distributed ID and initial value.
+     *
+     *

<p>The distributed ID is used to consistently identify the attribute across all nodes in the cluster. + * It must be unique, and its value must be in the range from {@code 0} (inclusive) to {@code Byte.SIZE} (exclusive).</p>

+ * + *

<p>The value of the created attribute is automatically captured and propagated between cluster nodes + * during message transmission.</p>

+     *
+     * @see OperationContextAttribute#newInstance(Object)
+     */
+    public <T extends Message> OperationContextAttribute<T> createDistributedAttribute(byte id, @Nullable T initVal) {
+        assert id >= 0 && id < MAX_DISTRIBUTED_ATTR_CNT : "Invalid distributed attribute id [id=" + id + ']';
+
+        // Throwing from the remapping function leaves an already registered attribute untouched.
+        return (OperationContextAttribute<T>)attrs.compute(id, (id0, attr0) -> {
+            if (attr0 != null)
+                throw new IgniteException("Duplicated distributed attribute id [id=" + id + ']');
+
+            return OperationContextAttribute.newInstance(initVal);
+        });
+    }
+
+    /**
+     * Collects the values of all distributed {@link OperationContextAttribute}s registered by this manager in a format
+     * suitable for transmission between cluster nodes.
+     *
+     * <p>Only attributes whose current value is not the same reference as their initial value are included. For each
+     * included attribute the bit {@code 1 << id} is set in the id bitmap and its value is appended to the values
+     * array in the iteration order of the attribute registry.</p>
+     *
+     * @return Message with the effective attribute values, or {@code null} if every registered attribute currently
+     *      holds its initial value.
+     * @see OperationContext#get(OperationContextAttribute)
+     */
+    public @Nullable DistributedOperationContextMessage collectDistributedAttributes() {
+        DistributedOperationContextMessage res = null;
+        List<Message> vals = null;
+
+        for (Map.Entry<Byte, OperationContextAttribute<Message>> e : attrs.entrySet()) {
+            OperationContextAttribute<Message> attr = e.getValue();
+
+            Message curVal = OperationContext.get(attr);
+
+            // Reference comparison on purpose: an attribute that was never overridden yields its initial instance.
+            if (curVal != attr.initialValue()) {
+                // The message and the value list are allocated lazily so that the common "nothing to send" case
+                // stays allocation-free.
+                if (res == null) {
+                    res = new DistributedOperationContextMessage();
+
+                    vals = new ArrayList<>(MAX_DISTRIBUTED_ATTR_CNT / 2);
+                }
+
+                byte mask = (byte)(1 << e.getKey());
+
+                assert (res.idBitmap & mask) == 0;
+
+                vals.add(curVal);
+                res.idBitmap |= mask;
+            }
+        }
+
+        // Single conversion: the list is copied into a correctly sized array exactly once.
+        if (res != null)
+            res.vals = vals.toArray(new Message[0]);
+
+        return res;
+    }
+
+    /** Restores distributed {@link OperationContextAttribute} values received from a remote node. 
*/
+    public Scope restoreDistributedAttributes(@Nullable DistributedOperationContextMessage msg) {
+        // No context was attached to the message: nothing to restore and nothing to undo on close.
+        if (msg == null)
+            return Scope.NOOP_SCOPE;
+
+        assert msg.idBitmap != 0;
+        assert !F.isEmpty(msg.vals);
+        assert msg.vals.length <= MAX_DISTRIBUTED_ATTR_CNT;
+
+        // Every set bit must have exactly one value; "& 0xFF" undoes sign extension of the byte bitmap.
+        assert Integer.bitCount(msg.idBitmap & 0xFF) == msg.vals.length :
+            "Bitmap/values mismatch [idBitmap=" + msg.idBitmap + ", valsCnt=" + msg.vals.length + ']';
+
+        OperationContext.ContextUpdater updater = OperationContext.ContextUpdater.create();
+
+        int valIdx = 0;
+
+        // Walk attribute ids in ascending order and bind the next value to each id whose bit is set.
+        // The walk is bounded by the bitmap width, so a malformed message cannot make it scan past the last bit.
+        for (byte id = 0; id < MAX_DISTRIBUTED_ATTR_CNT && valIdx < msg.vals.length; ++id) {
+            if ((msg.idBitmap & (1 << id)) == 0)
+                continue;
+
+            OperationContextAttribute<Message> attr = attrs.get(id);
+
+            // The sender and the receiver are expected to register the same set of distributed attributes.
+            assert attr != null : "Unknown distributed attribute id [id=" + id + ']';
+
+            updater.set(attr, msg.vals[valIdx++]);
+        }
+
+        return updater.apply();
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index a0e1a20048786..ccf622be6ea3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -70,6 +70,8 @@
 import org.apache.ignite.internal.processors.tracing.messages.SpanContainer;
 import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage;
 import org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable;
+import org.apache.ignite.internal.thread.context.DistributedOperationContextManager;
+import org.apache.ignite.internal.thread.context.Scope;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -1310,6 +1312,8 @@ private class SocketWriter extends IgniteSpiThread {
          * @param msg Message. 
*/ private void sendMessage(TcpDiscoveryAbstractMessage msg) { + msg.opCtxMsg = DistributedOperationContextManager.instance().collectDistributedAttributes(); + synchronized (mux) { queue.add(msg); @@ -1757,251 +1761,256 @@ private MessageWorker(IgniteLogger log) { blockingSectionEnd(); } - if (msg instanceof JoinTimeout) { - int joinCnt0 = ((JoinTimeout)msg).joinCnt; + TcpDiscoveryAbstractMessage dm = msg instanceof TcpDiscoveryAbstractMessage + ? (TcpDiscoveryAbstractMessage)msg + : null; - if (joinCnt == joinCnt0) { - if (state == STARTING) { - joinError(new IgniteSpiException("Join process timed out, did not receive response for " + - "join request (consider increasing 'joinTimeout' configuration property) " + - "[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']')); + try (Scope ignored = DistributedOperationContextManager.instance() + .restoreDistributedAttributes(dm == null ? null : dm.opCtxMsg)) { + if (msg instanceof JoinTimeout) { + int joinCnt0 = ((JoinTimeout)msg).joinCnt; - break; - } - else if (state == DISCONNECTED) { - if (log.isDebugEnabled()) - log.debug("Failed to reconnect, local node segmented " + - "[joinTimeout=" + spi.joinTimeout + ']'); + if (joinCnt == joinCnt0) { + if (state == STARTING) { + joinError(new IgniteSpiException("Join process timed out, did not receive response for " + + "join request (consider increasing 'joinTimeout' configuration property) " + + "[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']')); + + break; + } + else if (state == DISCONNECTED) { + if (log.isDebugEnabled()) + log.debug("Failed to reconnect, local node segmented " + + "[joinTimeout=" + spi.joinTimeout + ']'); - state = SEGMENTED; + state = SEGMENTED; - notifyDiscovery( - EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null); + notifyDiscovery( + EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null); + } } } - } - else if (msg == SPI_STOP) { - boolean connected = state == CONNECTED; + else if (msg == SPI_STOP) { + 
boolean connected = state == CONNECTED; - state = STOPPED; + state = STOPPED; - assert spi.getSpiContext().isStopping(); + assert spi.getSpiContext().isStopping(); - if (connected && currSock != null) { - TcpDiscoveryNodeLeftMessage leftMsg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId()); + if (connected && currSock != null) { + TcpDiscoveryNodeLeftMessage leftMsg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId()); - leftMsg.client(true); + leftMsg.client(true); - Span rootSpan = tracing.create(TraceableMessagesTable.traceName(leftMsg.getClass())) - .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> locNode.id().toString()) - .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID), - () -> locNode.consistentId().toString()) - .addLog(() -> "Created"); + Span rootSpan = tracing.create(TraceableMessagesTable.traceName(leftMsg.getClass())) + .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> locNode.id().toString()) + .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID), + () -> locNode.consistentId().toString()) + .addLog(() -> "Created"); - leftMsg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan)); + leftMsg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan)); - sockWriter.sendMessage(leftMsg); + sockWriter.sendMessage(leftMsg); - rootSpan.addLog(() -> "Sent").end(); + rootSpan.addLog(() -> "Sent").end(); + } + else + leaveLatch.countDown(); } - else - leaveLatch.countDown(); - } - else if (msg == SPI_RECONNECT) { - if (state == CONNECTED) { - if (reconnector != null) { - reconnector.cancel(); - reconnector.join(); + else if (msg == SPI_RECONNECT) { + if (state == CONNECTED) { + if (reconnector != null) { + reconnector.cancel(); + reconnector.join(); - reconnector = null; - } + reconnector = null; + } - sockWriter.forceLeave(); - sockReader.forceStopRead(); + sockWriter.forceLeave(); + sockReader.forceStopRead(); - currSock = null; + currSock = null; - queue.clear(); + queue.clear(); - 
onDisconnected(); + onDisconnected(); - UUID newId = UUID.randomUUID(); + UUID newId = UUID.randomUUID(); - U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due " + - "to network problems [newId=" + newId + - ", prevId=" + locNode.id() + - ", locNode=" + locNode + ']'); + U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due " + + "to network problems [newId=" + newId + + ", prevId=" + locNode.id() + + ", locNode=" + locNode + ']'); - locNode.onClientDisconnected(newId); + locNode.onClientDisconnected(newId); - throttleClientReconnect(); + throttleClientReconnect(); - tryJoin(); + tryJoin(); + } } - } - else if (msg instanceof TcpDiscoveryNodeFailedMessage && - ((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) { - TcpDiscoveryNodeFailedMessage msg0 = (TcpDiscoveryNodeFailedMessage)msg; + else if (msg instanceof TcpDiscoveryNodeFailedMessage && + ((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) { + TcpDiscoveryNodeFailedMessage msg0 = (TcpDiscoveryNodeFailedMessage)msg; - assert msg0.force() : msg0; + assert msg0.force() : msg0; - forceFailMsg = msg0; - } - else if (msg instanceof SocketClosedMessage) { - if (((SocketClosedMessage)msg).sock == currSock) { - Socket sock = currSock.sock; + forceFailMsg = msg0; + } + else if (msg instanceof SocketClosedMessage) { + if (((SocketClosedMessage)msg).sock == currSock) { + Socket sock = currSock.sock; - InetSocketAddress prevAddr = new InetSocketAddress(sock.getInetAddress(), sock.getPort()); + InetSocketAddress prevAddr = new InetSocketAddress(sock.getInetAddress(), sock.getPort()); - currSock = null; + currSock = null; - boolean join = joinLatch.getCount() > 0; + boolean join = joinLatch.getCount() > 0; - if (spi.getSpiContext().isStopping() || state == SEGMENTED) { - leaveLatch.countDown(); + if (spi.getSpiContext().isStopping() || state == SEGMENTED) { + leaveLatch.countDown(); - if (join) { - joinError(new 
IgniteSpiException("Failed to connect to cluster: socket closed.")); + if (join) { + joinError(new IgniteSpiException("Failed to connect to cluster: socket closed.")); - break; - } - } - else { - if (forceFailMsg != null) { - if (log.isDebugEnabled()) { - log.debug("Connection closed, local node received force fail message, " + - "will not try to restore connection"); + break; } - - queue.addFirst(SPI_RECONNECT_FAILED); } else { - if (log.isDebugEnabled()) - log.debug("Connection closed, will try to restore connection."); + if (forceFailMsg != null) { + if (log.isDebugEnabled()) { + log.debug("Connection closed, local node received force fail message, " + + "will not try to restore connection"); + } - assert reconnector == null; + queue.addFirst(SPI_RECONNECT_FAILED); + } + else { + if (log.isDebugEnabled()) + log.debug("Connection closed, will try to restore connection."); + + assert reconnector == null; - reconnector = new Reconnector(join, prevAddr); - reconnector.start(); + reconnector = new Reconnector(join, prevAddr); + reconnector.start(); + } } } } - } - else if (msg == SPI_RECONNECT_FAILED) { - if (reconnector != null) { - reconnector.cancel(); - reconnector.join(); - - reconnector = null; - } - else - assert forceFailMsg != null; + else if (msg == SPI_RECONNECT_FAILED) { + if (reconnector != null) { + reconnector.cancel(); + reconnector.join(); - if (spi.isClientReconnectDisabled()) { - if (state != SEGMENTED && state != STOPPED) { - if (forceFailMsg != null) { - U.quietAndWarn(log, "Local node was dropped from cluster due to network problems " + - "[nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + - ", msg=" + forceFailMsg.warning() + ']'); - } + reconnector = null; + } + else + assert forceFailMsg != null; + + if (spi.isClientReconnectDisabled()) { + if (state != SEGMENTED && state != STOPPED) { + if (forceFailMsg != null) { + U.quietAndWarn(log, "Local node was dropped from cluster due to network problems " + + "[nodeInitiatedFail=" + 
forceFailMsg.creatorNodeId() + + ", msg=" + forceFailMsg.warning() + ']'); + } - if (log.isDebugEnabled()) { - log.debug("Failed to restore closed connection, reconnect disabled, " + - "local node segmented [networkTimeout=" + spi.netTimeout + ']'); - } + if (log.isDebugEnabled()) { + log.debug("Failed to restore closed connection, reconnect disabled, " + + "local node segmented [networkTimeout=" + spi.netTimeout + ']'); + } - state = SEGMENTED; + state = SEGMENTED; - notifyDiscovery( - EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null); - } - } - else { - if (state == STARTING || state == CONNECTED) { - if (log.isDebugEnabled()) { - log.debug("Failed to restore closed connection, will try to reconnect " + - "[networkTimeout=" + spi.netTimeout + - ", joinTimeout=" + spi.joinTimeout + - ", failMsg=" + forceFailMsg + ']'); + notifyDiscovery( + EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null); } - - onDisconnected(); } + else { + if (state == STARTING || state == CONNECTED) { + if (log.isDebugEnabled()) { + log.debug("Failed to restore closed connection, will try to reconnect " + + "[networkTimeout=" + spi.netTimeout + + ", joinTimeout=" + spi.joinTimeout + + ", failMsg=" + forceFailMsg + ']'); + } - UUID newId = UUID.randomUUID(); + onDisconnected(); + } - if (forceFailMsg != null) { - long delay = IgniteSystemProperties.getLong(IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY, - DFLT_DISCO_FAILED_CLIENT_RECONNECT_DELAY); + UUID newId = UUID.randomUUID(); - if (delay > 0) { - U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " + - "will try to reconnect with new id after " + delay + "ms (reconnect delay " + - "can be changed using IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY system " + - "property) [" + - "newId=" + newId + - ", prevId=" + locNode.id() + - ", locNode=" + locNode + - ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + - ", msg=" + forceFailMsg.warning() + ']'); + if (forceFailMsg != null) { + long 
delay = IgniteSystemProperties.getLong(IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY, + DFLT_DISCO_FAILED_CLIENT_RECONNECT_DELAY); + + if (delay > 0) { + U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " + + "will try to reconnect with new id after " + delay + "ms (reconnect delay " + + "can be changed using IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY system " + + "property) [" + + "newId=" + newId + + ", prevId=" + locNode.id() + + ", locNode=" + locNode + + ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + + ", msg=" + forceFailMsg.warning() + ']'); + + Thread.sleep(delay); + } + else { + U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " + + "will try to reconnect with new id [" + + "newId=" + newId + + ", prevId=" + locNode.id() + + ", locNode=" + locNode + + ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + + ", msg=" + forceFailMsg.warning() + ']'); + } - Thread.sleep(delay); + forceFailMsg = null; } - else { - U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " + - "will try to reconnect with new id [" + - "newId=" + newId + - ", prevId=" + locNode.id() + - ", locNode=" + locNode + - ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + - ", msg=" + forceFailMsg.warning() + ']'); + else if (log.isInfoEnabled()) { + log.info("Client node disconnected from cluster, will try to reconnect with new id " + + "[newId=" + newId + ", prevId=" + locNode.id() + ", locNode=" + locNode + ']'); } - forceFailMsg = null; - } - else if (log.isInfoEnabled()) { - log.info("Client node disconnected from cluster, will try to reconnect with new id " + - "[newId=" + newId + ", prevId=" + locNode.id() + ", locNode=" + locNode + ']'); - } + locNode.onClientDisconnected(newId); - locNode.onClientDisconnected(newId); - - tryJoin(); + tryJoin(); + } } - } - else { - TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg; - - if (joining()) { - IgniteSpiException 
err = null; + else { + if (joining()) { + IgniteSpiException err = null; - if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage) - err = spi.duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg); - else if (discoMsg instanceof TcpDiscoveryAuthFailedMessage) - err = spi.authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg); - //TODO: https://issues.apache.org/jira/browse/IGNITE-9829 - else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage) - err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg); + if (dm instanceof TcpDiscoveryDuplicateIdMessage) + err = spi.duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg); + else if (dm instanceof TcpDiscoveryAuthFailedMessage) + err = spi.authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg); + //TODO: https://issues.apache.org/jira/browse/IGNITE-9829 + else if (dm instanceof TcpDiscoveryCheckFailedMessage) + err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg); - if (err != null) { - if (state == DISCONNECTED) { - U.error(log, "Failed to reconnect, segment local node.", err); + if (err != null) { + if (state == DISCONNECTED) { + U.error(log, "Failed to reconnect, segment local node.", err); - state = SEGMENTED; + state = SEGMENTED; - notifyDiscovery( - EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null); - } - else - joinError(err); + notifyDiscovery( + EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null); + } + else + joinError(err); - cancel(); + cancel(); - break; + break; + } } - } - processDiscoveryMessage((TcpDiscoveryAbstractMessage)msg); + processDiscoveryMessage(dm); + } } } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 82c012c2a1a94..a8a4f2f0a473e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java 
@@ -95,6 +95,8 @@ import org.apache.ignite.internal.processors.tracing.messages.SpanContainer; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable; +import org.apache.ignite.internal.thread.context.DistributedOperationContextManager; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor; import org.apache.ignite.internal.util.GridBoundedLinkedHashSet; import org.apache.ignite.internal.util.GridConcurrentHashSet; @@ -3046,8 +3048,10 @@ void addMessage(TcpDiscoveryAbstractMessage msg, boolean ignoreHighPriority, boo return; } - if (msg instanceof TraceableMessage) { - TraceableMessage tMsg = (TraceableMessage)msg; + if (!fromSocket) + msg.opCtxMsg = DistributedOperationContextManager.instance().collectDistributedAttributes(); + + if (msg instanceof TraceableMessage tMsg) { // If we read this message from socket. if (fromSocket) @@ -3173,11 +3177,8 @@ protected void runTasks() { task.run(); } - /** {@inheritDoc} */ - @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { - if (msg == WAKEUP) - return; - + /** */ + private void processMessage0(TcpDiscoveryAbstractMessage msg) { notifiedDiscovery.set(false); if (msg instanceof TraceableMessage) { @@ -3315,6 +3316,16 @@ else if (msg instanceof TcpDiscoveryAuthFailedMessage) } } + /** {@inheritDoc} */ + @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { + if (msg == WAKEUP) + return; + + try (Scope ignored = DistributedOperationContextManager.instance().restoreDistributedAttributes(msg.opCtxMsg)) { + processMessage0(msg); + } + } + /** * Processes authentication failed message. 
* diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java index f23e36f200d27..d76279fb28082 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java @@ -52,7 +52,6 @@ public int port() { return port; } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(InetSocketAddressMessage.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index 7a97763c36b25..60c38866034e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -21,6 +21,7 @@ import java.util.HashSet; import java.util.Set; import java.util.UUID; +import org.apache.ignite.internal.DistributedOperationContextMessage; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -76,6 +77,11 @@ public abstract class TcpDiscoveryAbstractMessage implements Message { @Order(4) Set failedNodes; + /** Operation context attributes message. */ + @GridToStringInclude + @Order(5) + public @Nullable DistributedOperationContextMessage opCtxMsg; + /** * Default no-arg constructor for {@link Externalizable} interface. 
*/ @@ -100,6 +106,7 @@ protected TcpDiscoveryAbstractMessage(TcpDiscoveryAbstractMessage msg) { verifierNodeId = msg.verifierNodeId; topVer = msg.topVer; flags = msg.flags; + opCtxMsg = msg.opCtxMsg; } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java index 9de906b27290c..bdf84b743ad2d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.thread.context; +import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; @@ -36,8 +37,13 @@ import java.util.function.Function; import java.util.function.Supplier; import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.managers.discovery.CustomEventListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture; @@ -48,6 +54,7 @@ import org.apache.ignite.internal.thread.pool.IgniteStripedThreadPoolExecutor; import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import 
org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.queue.IgniteAsyncObjectHandler; import org.apache.ignite.internal.util.worker.queue.IgniteDelayedObjectHandler; @@ -56,6 +63,7 @@ import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.thread.IgniteThread; import org.junit.Test; @@ -64,6 +72,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause; import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; /** */ public class OperationContextAttributesTest extends GridCommonAbstractTest { @@ -98,6 +107,8 @@ public class OperationContextAttributesTest extends GridCommonAbstractTest { @Override protected void afterTest() throws Exception { super.afterTest(); + stopAllGrids(); + if (poolToShutdownAfterTest != null) poolToShutdownAfterTest.shutdownNow(); @@ -808,6 +819,98 @@ public void testContextAwareDelayQueue() throws Exception { } } + /** */ + @Test + public void testSendAttributesByDiscovery() throws Exception { + byte attrId1 = 0; + byte attrId2 = DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT - 1; + + InetSocketAddressMessage dfltDistAttr1Val = new InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80); + GridCacheVersion dfltDistrAttr2Val = new GridCacheVersion(1, 1, 1); + + // Local attribute 1. + OperationContextAttribute.newInstance(1000); + + // Distributed attribute 1. 
+ OperationContextAttribute dAttr1 = DistributedOperationContextManager.instance() + .createDistributedAttribute(attrId1, dfltDistAttr1Val); + + // Local attribute 2. + OperationContextAttribute.newInstance("locaAttr2"); + + // Distributed attribute 2. + OperationContextAttribute dAttr2 = DistributedOperationContextManager.instance() + .createDistributedAttribute(attrId2, dfltDistrAttr2Val); + + startGrids(2); + startClientGrid(2); + + CountDownLatch coordLatch = new CountDownLatch(3); + CountDownLatch srvrLatch = new CountDownLatch(3); + CountDownLatch clientLatch = new CountDownLatch(3); + + InetSocketAddressMessage valToSend1 = new InetSocketAddressMessage(dfltDistAttr1Val.address(), 443); + GridCacheVersion valToSend2 = new GridCacheVersion(2, 2, 2); + + for (int i = 0; i < G.allGrids().size(); ++i) { + int i0 = i; + + grid(i).context().discovery().setCustomEventListener( + DynamicCacheChangeBatch.class, new CustomEventListener<>() { + @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, + DynamicCacheChangeBatch msg) { + + InetSocketAddressMessage receivedVal1 = OperationContext.get(dAttr1); + GridCacheVersion receivedVal2 = OperationContext.get(dAttr2); + + assertNotNull(receivedVal1); + assertNotNull(receivedVal2); + + assertFalse(dfltDistAttr1Val.port() == receivedVal1.port()); + assertEquals(receivedVal1.port(), valToSend1.port()); + assertEquals(receivedVal1.address(), valToSend1.address()); + + assertFalse(dfltDistrAttr2Val.equals(receivedVal2)); + assertTrue(valToSend2.equals(receivedVal2)); + + if (grid(i0).localNode().isClient()) + clientLatch.countDown(); + else if (grid(i0).localNode().order() == 1) + coordLatch.countDown(); + else + srvrLatch.countDown(); + } + }); + } + + // Send from the coordinator. 
+ try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) { + grid(0).createCache(defaultCacheConfiguration()); + } + + assertTrue(waitForCondition(() -> coordLatch.getCount() == 2, getTestTimeout())); + assertTrue(waitForCondition(() -> srvrLatch.getCount() == 2, getTestTimeout())); + assertTrue(waitForCondition(() -> clientLatch.getCount() == 2, getTestTimeout())); + + // Send from a server. + try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) { + grid(1).destroyCache(DEFAULT_CACHE_NAME); + } + + assertTrue(waitForCondition(() -> coordLatch.getCount() == 1, getTestTimeout())); + assertTrue(waitForCondition(() -> srvrLatch.getCount() == 1, getTestTimeout())); + assertTrue(waitForCondition(() -> clientLatch.getCount() == 1, getTestTimeout())); + + // Send from a client. + try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) { + grid(2).createCache(defaultCacheConfiguration()); + } + + assertTrue(coordLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); + assertTrue(srvrLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); + assertTrue(clientLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); + } + /** */ private void doContextAwareExecutorServiceTest(ExecutorService pool) throws Exception { CountDownLatch poolUnblockedLatch = blockPool(pool); @@ -923,9 +1026,8 @@ public AttributeValueChecker(String expStrAttrVal, Integer expIntAttrVal) { /** */ static void assertAllCreatedChecksPassed() throws Exception { - for (AttributeValueChecker check : CHECKS) { + for (AttributeValueChecker check : CHECKS) check.get(5_000, MILLISECONDS); - } } /** */