diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 3969c9dd31776..e43ef8de558ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -93,7 +93,9 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponseSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessageMarshallableSerializer; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessageSerializer; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCollectionMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCollectionMessageMarshallableSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; @@ -114,6 +116,8 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessageMarshallableSerializer; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessageMarshallableSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage; @@ -152,6 +156,8 @@ public DiscoveryMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMa /** {@inheritDoc} */ @Override public void registerAll(MessageFactory factory) { + factory.register((short)-110, TcpDiscoveryCollectionMessage::new, + new TcpDiscoveryCollectionMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr)); factory.register((short)-109, User::new, new UserSerializer()); factory.register((short)-108, UserManagementOperation::new, new UserManagementOperationSerializer()); factory.register((short)-107, NodeSpecificData::new, new NodeSpecificDataSerializer()); @@ -197,8 +203,9 @@ public DiscoveryMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMa factory.register((short)25, DistributedMetaStorageUpdateAckMessage::new, new DistributedMetaStorageUpdateAckMessageSerializer()); factory.register((short)26, DistributedMetaStorageCasMessage::new, new DistributedMetaStorageCasMessageSerializer()); factory.register((short)27, DistributedMetaStorageCasAckMessage::new, new DistributedMetaStorageCasAckMessageSerializer()); - factory.register((short)28, TcpDiscoveryClientReconnectMessage::new, - new TcpDiscoveryClientReconnectMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr)); + factory.register((short)28, TcpDiscoveryClientReconnectMessage::new, new TcpDiscoveryClientReconnectMessageSerializer()); + factory.register((short)29, TcpDiscoveryNodeAddedMessage::new, + new TcpDiscoveryNodeAddedMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr)); // DiscoveryCustomMessage factory.register((short)500, CacheStatisticsModeChangeMessage::new, new CacheStatisticsModeChangeMessageSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java index 3eaf562e5a7fb..81aed7391fa03 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java @@ -18,23 +18,21 @@ package org.apache.ignite.spi.discovery.tcp.messages; import java.util.Collection; +import java.util.Collections; import java.util.Objects; import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.jetbrains.annotations.Nullable; /** * Message telling that client node is reconnecting to topology. */ @TcpDiscoveryEnsureDelivery -public class TcpDiscoveryClientReconnectMessage extends TcpDiscoveryAbstractMessage implements MarshallableMessage { +public class TcpDiscoveryClientReconnectMessage extends TcpDiscoveryAbstractMessage implements Message { /** */ private static final long serialVersionUID = 0L; @@ -46,13 +44,9 @@ public class TcpDiscoveryClientReconnectMessage extends TcpDiscoveryAbstractMess @Order(1) IgniteUuid lastMsgId; - /** Pending messages. */ - @GridToStringExclude - private Collection msgs; - - /** Srialized bytes of {@link #msgs}. */ + /** Pending messages holder. */ @Order(2) - byte[] msgsBytes; + @Nullable TcpDiscoveryCollectionMessage pendingMsgsMsg; /** Constructor for {@link DiscoveryMessageFactory}. */ public TcpDiscoveryClientReconnectMessage() { @@ -88,15 +82,15 @@ public IgniteUuid lastMessageId() { /** * @param msgs Pending messages. */ - public void pendingMessages(Collection msgs) { - this.msgs = msgs; + public void pendingMessages(@Nullable Collection msgs) { + pendingMsgsMsg = msgs == null ? null : new TcpDiscoveryCollectionMessage(msgs); } /** * @return Pending messages. */ public Collection pendingMessages() { - return msgs; + return pendingMsgsMsg == null ? Collections.emptyList() : pendingMsgsMsg.messages(); } /** @@ -128,18 +122,6 @@ public boolean success() { Objects.equals(lastMsgId, other.lastMsgId); } - /** {@inheritDoc} */ - @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { - if (msgs != null) - msgsBytes = U.marshal(marsh, msgs); - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { - if (msgsBytes != null) - msgs = U.unmarshal(marsh, msgsBytes, clsLdr); - } - /** {@inheritDoc} */ @Override public short directType() { return 28; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCollectionMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCollectionMessage.java new file mode 100644 index 0000000000000..025df9d37220d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCollectionMessage.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.jetbrains.annotations.Nullable; + +/** + * TODO: Remove/revise after https://issues.apache.org/jira/browse/IGNITE-25883 + * Message to transfer a collection of {@link TcpDiscoveryAbstractMessage} with the original order. + * Several of them might be a {@link Message}, several may not and require the original marshalling. + */ +public class TcpDiscoveryCollectionMessage implements MarshallableMessage { + /** {@link TcpDiscoveryAbstractMessage} pending messages which are a {@link Message}. */ + @Order(0) + @Nullable Map writableMsgs; + + /** Marshallable or Java-serializable pending messages which are not a {@link Message}. */ + @Nullable private Map marshallableMsgs; + + /** Marshalled {@link #marshallableMsgs}. */ + @Order(1) + @GridToStringExclude + @Nullable byte[] marshallableMsgsBytes; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryCollectionMessage() { + // No-op. + } + + /** @param msgs Discovery messages to hold. */ + public TcpDiscoveryCollectionMessage(Collection msgs) { + if (F.isEmpty(msgs)) + return; + + // Keeps the original message order. + int idx = 0; + + for (TcpDiscoveryAbstractMessage m : msgs) { + if (m instanceof Message) { + if (writableMsgs == null) + writableMsgs = U.newHashMap(msgs.size()); + + writableMsgs.put(idx++, (Message)m); + + continue; + } + + if (marshallableMsgs == null) + marshallableMsgs = U.newHashMap(msgs.size()); + + marshallableMsgs.put(idx++, m); + } + } + + /** @param marsh marshaller. */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + if (marshallableMsgs != null) + marshallableMsgsBytes = U.marshal(marsh, marshallableMsgs); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + if (marshallableMsgsBytes != null) + marshallableMsgs = U.unmarshal(marsh, marshallableMsgsBytes, clsLdr); + + marshallableMsgsBytes = null; + } + + /** + * Gets pending messages sent to new node by its previous. + * + * @return Pending messages from previous node. + */ + public Collection messages() { + if (F.isEmpty(writableMsgs) && F.isEmpty(marshallableMsgs)) + return Collections.emptyList(); + + int totalSz = (F.isEmpty(writableMsgs) ? 0 : writableMsgs.size()) + + (F.isEmpty(marshallableMsgs) ? 0 : marshallableMsgs.size()); + + List res = new ArrayList<>(totalSz); + + for (int i = 0; i < totalSz; ++i) { + Message m = F.isEmpty(writableMsgs) ? null : writableMsgs.get(i); + + if (m == null) { + TcpDiscoveryAbstractMessage adm = marshallableMsgs.get(i); + + assert adm != null; + + res.add(adm); + } + else { + assert marshallableMsgs == null || marshallableMsgs.get(i) == null; + assert m instanceof TcpDiscoveryAbstractMessage; + + res.add((TcpDiscoveryAbstractMessage)m); + } + } + + return res; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -110; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryCollectionMessage.class, this, "super", super.toString()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java index 451689eabe008..c11bee62e8513 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java @@ -123,6 +123,7 @@ public Map clientNodeAttributes() { */ public void clientNodeAttributes(Map clientNodeAttrs) { this.clientNodeAttrs = clientNodeAttrs; + clientNodeAttrsBytes = null; } /** {@inheritDoc} */ @@ -135,6 +136,8 @@ public void clientNodeAttributes(Map clientNodeAttrs) { @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { if (clientNodeAttrsBytes != null) clientNodeAttrs = U.unmarshal(marsh, clientNodeAttrsBytes, clsLdr); + + clientNodeAttrsBytes = null; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index 36540d8b7dfc1..71b86b4b2a2f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -20,37 +20,58 @@ import java.util.Collection; import java.util.Map; import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.jetbrains.annotations.Nullable; /** + * TODO: Use NodeMessage for {@link TcpDiscoveryNode} and {@link ClusterNode} after https://issues.apache.org/jira/browse/IGNITE-27899 * Message telling nodes that new node should be added to topology. * When newly added node receives the message it connects to its next and finishes * join process. */ @TcpDiscoveryEnsureDelivery @TcpDiscoveryRedirectToClient -public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableMessage { +public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableMessage implements MarshallableMessage { /** */ private static final long serialVersionUID = 0L; /** Added node. */ - private final TcpDiscoveryNode node; + private TcpDiscoveryNode node; + + /** Marshalled {@link #node}. */ + @Order(0) + @GridToStringExclude + byte[] nodeBytes; /** */ - private DiscoveryDataPacket dataPacket; + @Order(1) + DiscoveryDataPacket dataPacket; - /** Pending messages from previous node. */ - private Collection msgs; + /** Pending messages containner. */ + @Order(2) + @Nullable TcpDiscoveryCollectionMessage pendingMsgsMsg; /** Current topology. Initialized by coordinator. */ @GridToStringInclude private Collection top; + /** Marshalled {@link #top}. */ + @Order(3) + @GridToStringExclude + @Nullable byte[] topBytes; + /** */ @GridToStringInclude private transient Collection clientTop; @@ -58,8 +79,19 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableM /** Topology snapshots history. */ private Map> topHist; + /** Marshalled {@link #topHist}. */ + @Order(4) + @GridToStringExclude + @Nullable byte[] topHistBytes; + /** Start time of the first grid node. */ - private final long gridStartTime; + @Order(5) + long gridStartTime; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryNodeAddedMessage() { + // No-op. + } /** * Constructor. @@ -69,7 +101,8 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableM * @param dataPacket container for collecting discovery data across the cluster. * @param gridStartTime Start time of the first grid node. */ - public TcpDiscoveryNodeAddedMessage(UUID creatorNodeId, + public TcpDiscoveryNodeAddedMessage( + UUID creatorNodeId, TcpDiscoveryNode node, DiscoveryDataPacket dataPacket, long gridStartTime @@ -90,13 +123,16 @@ public TcpDiscoveryNodeAddedMessage(UUID creatorNodeId, public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { super(msg); - this.node = msg.node; - this.msgs = msg.msgs; - this.top = msg.top; - this.clientTop = msg.clientTop; - this.topHist = msg.topHist; - this.dataPacket = msg.dataPacket; - this.gridStartTime = msg.gridStartTime; + node = msg.node; + nodeBytes = msg.nodeBytes; + pendingMsgsMsg = msg.pendingMsgsMsg; + top = msg.top; + topBytes = msg.topBytes; + clientTop = msg.clientTop; + topHist = msg.topHist; + topHistBytes = msg.topHistBytes; + dataPacket = msg.dataPacket; + gridStartTime = msg.gridStartTime; } /** @@ -113,8 +149,8 @@ public TcpDiscoveryNode node() { * * @return Pending messages from previous node. */ - @Nullable public Collection messages() { - return msgs; + public @Nullable Collection messages() { + return pendingMsgsMsg == null ? null : pendingMsgsMsg.messages(); } /** @@ -122,10 +158,8 @@ public TcpDiscoveryNode node() { * * @param msgs Pending messages to send to new node. */ - public void messages( - @Nullable Collection msgs - ) { - this.msgs = msgs; + public void messages(@Nullable Collection msgs) { + pendingMsgsMsg = F.isEmpty(msgs) ? null : new TcpDiscoveryCollectionMessage(msgs); } /** @@ -144,6 +178,7 @@ public void messages( */ public void topology(@Nullable Collection top) { this.top = top; + topBytes = null; } /** @@ -152,7 +187,7 @@ public void topology(@Nullable Collection top) { public void clientTopology(Collection top) { assert top != null && !top.isEmpty() : top; - this.clientTop = top; + clientTop = top; } /** @@ -178,6 +213,7 @@ public Map> topologyHistory() { */ public void topologyHistory(@Nullable Map> topHist) { this.topHist = topHist; + topHistBytes = null; } /** @@ -210,6 +246,37 @@ public long gridStartTime() { return gridStartTime; } + /** @param marsh marshaller. */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + nodeBytes = U.marshal(marsh, node); + + if (top != null) + topBytes = U.marshal(marsh, top); + + if (topHist != null) + topHistBytes = U.marshal(marsh, topHist); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + node = U.unmarshal(marsh, nodeBytes, clsLdr); + + if (topBytes != null) + top = U.unmarshal(marsh, topBytes, clsLdr); + + if (topHistBytes != null) + topHist = U.unmarshal(marsh, topHistBytes, clsLdr); + + nodeBytes = null; + topBytes = null; + topHistBytes = null; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 29; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryNodeAddedMessage.class, this, "super", super.toString()); diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 501daa170d1fd..93d482ac8fde8 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -2269,7 +2269,6 @@ org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientAckResponse org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientMetricsUpdateMessage org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse -org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDiscardMessage @@ -2283,7 +2282,6 @@ org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage$MetricsSet org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage$MetricsSet$1 org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage -org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest