Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
227c3a1
raw
Vladsz83 Feb 18, 2026
1e1dd00
fix
Vladsz83 Feb 18, 2026
43f4267
fix
Vladsz83 Feb 18, 2026
d68b352
fix
Vladsz83 Feb 19, 2026
5ac43ab
Merge branch 'master' into Message-serializer-for-TcpDiscoveryNodeAdd…
Vladsz83 Feb 19, 2026
1064805
refactoring. + dedicated if
Vladsz83 Feb 19, 2026
4adf01a
impl
Vladsz83 Feb 19, 2026
330d359
Revert "impl"
Vladsz83 Feb 19, 2026
1baa2f2
Revert "refactoring. + dedicated if"
Vladsz83 Feb 19, 2026
9067b9b
fix the serialization
Vladsz83 Feb 19, 2026
d4e6b2b
fixes
Vladsz83 Feb 20, 2026
fb52e0e
fixes
Vladsz83 Feb 20, 2026
333185b
fixes
Vladsz83 Feb 21, 2026
31a3d29
lost serialization fix
Vladsz83 Feb 21, 2026
0a2b97e
Merge branch 'master' into TcpDiscoveryNodeAddedMessage
Vladsz83 Feb 21, 2026
7272166
+ master
Vladsz83 Feb 21, 2026
31c46d9
minor
Vladsz83 Feb 21, 2026
e3e7ae2
impl
Vladsz83 Feb 21, 2026
629d051
impl
Vladsz83 Feb 22, 2026
c8a1ac8
fix
Vladsz83 Feb 22, 2026
ff8a9ee
fix
Vladsz83 Feb 22, 2026
15000d1
+ datapacket serr
Vladsz83 Feb 22, 2026
7b1351d
fix
Vladsz83 Feb 23, 2026
c1d3f81
minority
Vladsz83 Feb 23, 2026
0e72fe3
Merge branch 'master' into IGNITE-27556-TcpDiscoveryNodeAddedMessage_…
Vladsz83 Feb 24, 2026
ccc0c85
merged master
Vladsz83 Feb 24, 2026
e75a62c
reserach
Vladsz83 Feb 24, 2026
a9b9ed4
Revert "reserach"
Vladsz83 Feb 24, 2026
f47b463
Merge branch 'master' into IGNITE-27556-TcpDiscoveryNodeAddedMessage_…
Vladsz83 Feb 24, 2026
d3aa143
impl
Vladsz83 Feb 24, 2026
a1f9eac
+ TcpDiscoveryJoinRequestMessage
Vladsz83 Feb 24, 2026
554d32b
cleanup
Vladsz83 Feb 25, 2026
7ad43b8
Merge branch 'master' into IGNITE-27556-TcpDiscoveryNodeAddedMessage_…
Vladsz83 Feb 25, 2026
2015fb6
trivial
Vladsz83 Feb 26, 2026
9e84d84
Merge branch 'master' into IGNITE-27556-TcpDiscoveryNodeAddedMessage_…
Vladsz83 Feb 26, 2026
8233e7e
+ master
Vladsz83 Feb 26, 2026
a45a00b
+ master
Vladsz83 Feb 26, 2026
c401565
Merge branch 'master' into IGNITE-27556-TcpDiscoveryNodeAddedMessage_…
Vladsz83 Feb 28, 2026
e040436
Merge branch 'master' into IGNITE-27556-TcpDiscoveryNodeAddedMessage_…
Vladsz83 Mar 10, 2026
0ad2962
+ master
Vladsz83 Mar 10, 2026
27280a1
revert NodeAdded mg collection inclusion fix
Vladsz83 Mar 10, 2026
11ad3d2
minority
Vladsz83 Mar 11, 2026
39980f1
Merge branch 'master' into IGNITE-27556-TcpDiscoveryNodeAddedMessage_…
Vladsz83 Mar 13, 2026
2663f06
+ master
Vladsz83 Mar 13, 2026
122bc9d
impl
Vladsz83 Mar 13, 2026
7e15f9d
impl
Vladsz83 Mar 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponseSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessageMarshallableSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCollectionMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCollectionMessageMarshallableSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
Expand All @@ -114,6 +116,8 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessageMarshallableSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessageMarshallableSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage;
Expand Down Expand Up @@ -152,6 +156,8 @@ public DiscoveryMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMa

/** {@inheritDoc} */
@Override public void registerAll(MessageFactory factory) {
factory.register((short)-110, TcpDiscoveryCollectionMessage::new,
new TcpDiscoveryCollectionMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));
factory.register((short)-109, User::new, new UserSerializer());
factory.register((short)-108, UserManagementOperation::new, new UserManagementOperationSerializer());
factory.register((short)-107, NodeSpecificData::new, new NodeSpecificDataSerializer());
Expand Down Expand Up @@ -197,8 +203,9 @@ public DiscoveryMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMa
factory.register((short)25, DistributedMetaStorageUpdateAckMessage::new, new DistributedMetaStorageUpdateAckMessageSerializer());
factory.register((short)26, DistributedMetaStorageCasMessage::new, new DistributedMetaStorageCasMessageSerializer());
factory.register((short)27, DistributedMetaStorageCasAckMessage::new, new DistributedMetaStorageCasAckMessageSerializer());
factory.register((short)28, TcpDiscoveryClientReconnectMessage::new,
new TcpDiscoveryClientReconnectMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));
factory.register((short)28, TcpDiscoveryClientReconnectMessage::new, new TcpDiscoveryClientReconnectMessageSerializer());
factory.register((short)29, TcpDiscoveryNodeAddedMessage::new,
new TcpDiscoveryNodeAddedMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));

// DiscoveryCustomMessage
factory.register((short)500, CacheStatisticsModeChangeMessage::new, new CacheStatisticsModeChangeMessageSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,21 @@
package org.apache.ignite.spi.discovery.tcp.messages;

import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;

/**
* Message telling that client node is reconnecting to topology.
*/
@TcpDiscoveryEnsureDelivery
public class TcpDiscoveryClientReconnectMessage extends TcpDiscoveryAbstractMessage implements MarshallableMessage {
public class TcpDiscoveryClientReconnectMessage extends TcpDiscoveryAbstractMessage implements Message {
/** */
private static final long serialVersionUID = 0L;

Expand All @@ -46,13 +44,9 @@ public class TcpDiscoveryClientReconnectMessage extends TcpDiscoveryAbstractMess
@Order(1)
IgniteUuid lastMsgId;

/** Pending messages. */
@GridToStringExclude
private Collection<TcpDiscoveryAbstractMessage> msgs;

/** Srialized bytes of {@link #msgs}. */
/** Pending messages holder. */
@Order(2)
byte[] msgsBytes;
@Nullable TcpDiscoveryCollectionMessage pendingMsgsMsg;

/** Constructor for {@link DiscoveryMessageFactory}. */
public TcpDiscoveryClientReconnectMessage() {
Expand Down Expand Up @@ -88,15 +82,15 @@ public IgniteUuid lastMessageId() {
/**
* @param msgs Pending messages.
*/
public void pendingMessages(Collection<TcpDiscoveryAbstractMessage> msgs) {
this.msgs = msgs;
public void pendingMessages(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs) {
pendingMsgsMsg = msgs == null ? null : new TcpDiscoveryCollectionMessage(msgs);
}

/**
* @return Pending messages.
*/
public Collection<TcpDiscoveryAbstractMessage> pendingMessages() {
return msgs;
return pendingMsgsMsg == null ? Collections.emptyList() : pendingMsgsMsg.messages();
}

/**
Expand Down Expand Up @@ -128,18 +122,6 @@ public boolean success() {
Objects.equals(lastMsgId, other.lastMsgId);
}

/** {@inheritDoc} */
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
if (msgs != null)
msgsBytes = U.marshal(marsh, msgs);
}

/** {@inheritDoc} */
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
if (msgsBytes != null)
msgs = U.unmarshal(marsh, msgsBytes, clsLdr);
}

/** {@inheritDoc} */
@Override public short directType() {
return 28;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.spi.discovery.tcp.messages;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;

/**
* TODO: Remove/revise after https://issues.apache.org/jira/browse/IGNITE-25883
* Message to transfer a collection of {@link TcpDiscoveryAbstractMessage} with the original order.
* Several of them might be a {@link Message}, several may not and require the original marshalling.
*/
public class TcpDiscoveryCollectionMessage implements MarshallableMessage {
/** {@link TcpDiscoveryAbstractMessage} pending messages which are a {@link Message}. */
@Order(0)
@Nullable Map<Integer, Message> writableMsgs;

/** Marshallable or Java-serializable pending messages which are not a {@link Message}. */
@Nullable private Map<Integer, TcpDiscoveryAbstractMessage> marshallableMsgs;

/** Marshalled {@link #marshallableMsgs}. */
@Order(1)
@GridToStringExclude
@Nullable byte[] marshallableMsgsBytes;

/** Constructor for {@link DiscoveryMessageFactory}. */
public TcpDiscoveryCollectionMessage() {
// No-op.
}

/** @param msgs Discovery messages to hold. */
public TcpDiscoveryCollectionMessage(Collection<TcpDiscoveryAbstractMessage> msgs) {
if (F.isEmpty(msgs))
return;

// Keeps the original message order.
int idx = 0;

for (TcpDiscoveryAbstractMessage m : msgs) {
if (m instanceof Message) {
if (writableMsgs == null)
writableMsgs = U.newHashMap(msgs.size());

writableMsgs.put(idx++, (Message)m);

continue;
}

if (marshallableMsgs == null)
marshallableMsgs = U.newHashMap(msgs.size());

marshallableMsgs.put(idx++, m);
}
}

/** @param marsh marshaller. */
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
if (marshallableMsgs != null)
marshallableMsgsBytes = U.marshal(marsh, marshallableMsgs);
}

/** {@inheritDoc} */
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
if (marshallableMsgsBytes != null)
marshallableMsgs = U.unmarshal(marsh, marshallableMsgsBytes, clsLdr);

marshallableMsgsBytes = null;
}

/**
* Gets pending messages sent to new node by its previous.
*
* @return Pending messages from previous node.
*/
public Collection<TcpDiscoveryAbstractMessage> messages() {
if (F.isEmpty(writableMsgs) && F.isEmpty(marshallableMsgs))
return Collections.emptyList();

int totalSz = (F.isEmpty(writableMsgs) ? 0 : writableMsgs.size())
+ (F.isEmpty(marshallableMsgs) ? 0 : marshallableMsgs.size());

List<TcpDiscoveryAbstractMessage> res = new ArrayList<>(totalSz);

for (int i = 0; i < totalSz; ++i) {
Message m = F.isEmpty(writableMsgs) ? null : writableMsgs.get(i);

if (m == null) {
TcpDiscoveryAbstractMessage adm = marshallableMsgs.get(i);

assert adm != null;

res.add(adm);
}
else {
assert marshallableMsgs == null || marshallableMsgs.get(i) == null;
assert m instanceof TcpDiscoveryAbstractMessage;

res.add((TcpDiscoveryAbstractMessage)m);
}
}

return res;
}

/** {@inheritDoc} */
@Override public short directType() {
return -110;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryCollectionMessage.class, this, "super", super.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public Map<String, Object> clientNodeAttributes() {
*/
public void clientNodeAttributes(Map<String, Object> clientNodeAttrs) {
this.clientNodeAttrs = clientNodeAttrs;
clientNodeAttrsBytes = null;
}

/** {@inheritDoc} */
Expand All @@ -135,6 +136,8 @@ public void clientNodeAttributes(Map<String, Object> clientNodeAttrs) {
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
if (clientNodeAttrsBytes != null)
clientNodeAttrs = U.unmarshal(marsh, clientNodeAttrsBytes, clsLdr);

clientNodeAttrsBytes = null;
}

/** {@inheritDoc} */
Expand Down
Loading