From 7ca597b96998c7dd4363e9e811b9bff23d273e51 Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Wed, 11 Mar 2026 17:43:32 +0300 Subject: [PATCH 1/7] WIP --- .../query/calcite/message/MessageType.java | 5 +- .../internal/MessageSerializerGenerator.java | 40 +++++++++ .../managers/communication/ErrorMessage.java | 84 ++++--------------- .../managers/communication/GridIoManager.java | 2 +- .../communication/GridIoMessageFactory.java | 21 ++++- .../discovery/DiscoveryMessageFactory.java | 4 +- .../communication/MarshallableMessage.java | 5 +- .../TcpDiscoveryClientReconnectMessage.java | 25 +----- .../TcpDiscoveryJoinRequestMessage.java | 25 +----- .../TcpDiscoveryNodeAddFinishedMessage.java | 25 +----- .../direct/DirectMarshallingMessagesTest.java | 4 +- .../communication/CompressedMessageTest.java | 3 +- .../communication/ErrorMessageSelfTest.java | 22 +++-- ...CommunicationMessageSerializationTest.java | 4 +- ...acheContinuousQueryImmutableEntryTest.java | 4 +- .../GridAbstractCommunicationSelfTest.java | 3 +- ...unicationSpiConcurrentConnectSelfTest.java | 4 +- ...GridTcpCommunicationSpiConfigSelfTest.java | 3 +- ...CommunicationSpiMultithreadedSelfTest.java | 3 +- ...cpCommunicationSpiRecoveryAckSelfTest.java | 4 +- ...idTcpCommunicationSpiRecoverySelfTest.java | 4 +- ...mmunicationRecoveryAckClosureSelfTest.java | 4 +- .../testframework/GridSpiTestContext.java | 4 +- ...hallableMessageMarshallableSerializer.java | 18 +++- 24 files changed, 157 insertions(+), 163 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java index 44438a8d4892b..75aeb4bb5297c 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java @@ -24,8 +24,11 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescriptionSerializer; import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping; import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMappingSerializer; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.MessageSerializer; +import static org.apache.ignite.marshaller.Marshallers.jdk; + /** * */ @@ -37,7 +40,7 @@ public enum MessageType { QUERY_START_RESPONSE(301, QueryStartResponse::new, new QueryStartResponseSerializer()), /** */ - QUERY_ERROR_MESSAGE(302, CalciteErrorMessage::new, new CalciteErrorMessageSerializer()), + QUERY_ERROR_MESSAGE(302, CalciteErrorMessage::new, new CalciteErrorMessageMarshallableSerializer(jdk(), U.gridClassLoader())), /** */ QUERY_BATCH_MESSAGE(303, QueryBatchMessage::new, new QueryBatchMessageSerializer()), diff --git a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java index 8acc83f01c099..8cb762250d4e7 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java @@ -280,9 +280,29 @@ private void start(Collection code, boolean write) { returnFalseIfWriteFailed(code, "writer.writeHeader", "directType()"); if (write && marshallableMessage()) { + imports.add("org.apache.ignite.IgniteCheckedException"); + imports.add("org.apache.ignite.IgniteException"); + code.add(EMPTY); + code.add(identedLine("try {")); + + indent++; + code.add(identedLine("msg.prepareMarshal(marshaller);")); + + indent--; + + code.add(identedLine("}")); + code.add(identedLine("catch (IgniteCheckedException e) {")); + + indent++; + + code.add(identedLine("throw new IgniteException(\"Failed to marshal object\", e);")); + + indent--; + + code.add(identedLine("}")); } code.add(EMPTY); @@ -949,8 +969,28 @@ private void finish(List code, boolean read, boolean marshallable) { code.add(EMPTY); if (read && marshallable) { + imports.add("org.apache.ignite.IgniteCheckedException"); + imports.add("org.apache.ignite.IgniteException"); + + code.add(identedLine("try {")); + + indent++; + code.add(identedLine("msg.finishUnmarshal(marshaller, clsLdr);")); + indent--; + + code.add(identedLine("}")); + code.add(identedLine("catch (IgniteCheckedException e) {")); + + indent++; + + code.add(identedLine("throw new IgniteException(\"Failed to unmarshal object\", e);")); + + indent--; + + code.add(identedLine("}")); + code.add(EMPTY); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ErrorMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ErrorMessage.java index 34146c63d11d4..917136ba2df5b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ErrorMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ErrorMessage.java @@ -17,41 +17,26 @@ package org.apache.ignite.internal.managers.communication; -import java.io.Serializable; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.MessageProcessor; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.jdk.JdkMarshaller; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.marshaller.Marshallers.jdk; - /** * Message used to transfer {@link Throwable} objects. - *

Because raw serialization of throwables is prohibited, you should use this message when it is necessary - * to transfer some error as part of some message. See {@link MessageProcessor} for details. - *

Currently, under the hood marshalling and unmarshalling is performed by {@link JdkMarshaller}. - *

If the message serialization fails, wraps this error with own one. */ @SuppressWarnings({"NullableProblems", "unused"}) -// TODO Remove Serializable once https://issues.apache.org/jira/browse/IGNITE-27627 is completed. -public class ErrorMessage implements Message, Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** Serialization and deserealization call holder. */ - @Order(value = 0, method = "errorBytes") +public class ErrorMessage implements MarshallableMessage { + /** Error bytes. */ + @Order(0) @GridToStringExclude @Nullable public byte[] errBytes; - /** Original error. It is transient and necessary only to avoid duplicated serialization and deserializtion. */ + /** Error. */ private @Nullable Throwable err; /** @@ -62,61 +47,20 @@ public ErrorMessage() { } /** - * @param err Original error. Will be lazily serialized. + * @param err Original error. */ public ErrorMessage(@Nullable Throwable err) { this.err = err; } - /** - * Provides serialized bytes of the error. Should be called only once. - * - * @return Serialized error. - * @see MessageWriter - */ - public @Nullable byte[] errorBytes() { - if (err == null) - return null; - - try { - return U.marshal(jdk(), err); - } - catch (IgniteCheckedException e0) { - IgniteCheckedException wrappedErr = new IgniteCheckedException(err.getMessage()); - - wrappedErr.setStackTrace(err.getStackTrace()); - wrappedErr.addSuppressed(e0); - - try { - return U.marshal(jdk(), wrappedErr); - } - catch (IgniteCheckedException e1) { - IgniteException marshErr = new IgniteException("Unable to marshal the wrapping error.", e1); - - marshErr.addSuppressed(wrappedErr); - - throw marshErr; - } - } + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + errBytes = U.marshal(marsh, err); } - /** - * Deserializes the error from {@code errBytes}. Should be called only once. - * - * @param errBytes Serialized error. - * @see MessageWriter - */ - public void errorBytes(@Nullable byte[] errBytes) { - if (F.isEmpty(errBytes)) - err = null; - else { - try { - err = U.unmarshal(jdk(), errBytes, U.gridClassLoader()); - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to unmarshal error data bytes.", e); - } - } + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + err = U.unmarshal(marsh, errBytes, clsLdr); } /** */ @@ -125,7 +69,7 @@ public void errorBytes(@Nullable byte[] errBytes) { } /** - * Safely gets original error from an error message. + * Error. * * @param errorMsg Error message. * @return Error containing in the message. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 07785b6913feb..6c715d7a24436 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -449,7 +449,7 @@ public void resetMetrics() { List compMsgs = new ArrayList<>(); - compMsgs.add(new GridIoMessageFactory()); + compMsgs.add(new GridIoMessageFactory(marsh, U.resolveClassLoader(ctx.config()))); for (IgniteComponentType compType : IgniteComponentType.values()) { MessageFactoryProvider f = compType.messageFactory(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 6baea4e7684bb..f70ec43d87e62 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -330,6 +330,7 @@ import org.apache.ignite.internal.util.UUIDCollectionMessage; import org.apache.ignite.internal.util.UUIDCollectionMessageSerializer; import org.apache.ignite.internal.util.distributed.SingleNodeMessage; +import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest; @@ -345,17 +346,35 @@ import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessageSerializer; import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage; import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessageSerializer; +import org.jetbrains.annotations.Nullable; /** * Message factory implementation. */ public class GridIoMessageFactory implements MessageFactoryProvider { + /** Custom data marshaller. */ + private final @Nullable Marshaller cstDataMarshall; + + /** Class loader for the custom data marshalling. */ + private final @Nullable ClassLoader cstDataMarshallClsLdr; + + /** + * @param cstDataMarshall Custom data marshaller. + * @param cstDataMarshallClsLdr Class loader for the custom data marshalling. + */ + public GridIoMessageFactory(@Nullable Marshaller cstDataMarshall, @Nullable ClassLoader cstDataMarshallClsLdr) { + assert cstDataMarshall == null && cstDataMarshallClsLdr == null || cstDataMarshall != null && cstDataMarshallClsLdr != null; + + this.cstDataMarshall = cstDataMarshall; + this.cstDataMarshallClsLdr = cstDataMarshallClsLdr; + } + /** {@inheritDoc} */ @Override public void registerAll(MessageFactory factory) { // -54 is reserved for SQL. // We don't use the code‑generated serializer for CompressedMessage - serialization is highly customized. factory.register(CompressedMessage.TYPE_CODE, CompressedMessage::new); - factory.register((short)-66, ErrorMessage::new, new ErrorMessageSerializer()); + factory.register((short)-66, ErrorMessage::new, new ErrorMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr)); factory.register((short)-65, TxInfo::new, new TxInfoSerializer()); factory.register((short)-64, TxEntriesInfo::new, new TxEntriesInfoSerializer()); factory.register((short)-63, ExchangeInfo::new, new ExchangeInfoSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 1e31e1bf0790d..1412837916fa6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.managers.discovery; import org.apache.ignite.internal.managers.communication.ErrorMessage; -import org.apache.ignite.internal.managers.communication.ErrorMessageSerializer; +import org.apache.ignite.internal.managers.communication.ErrorMessageMarshallableSerializer; import org.apache.ignite.internal.processors.authentication.User; import org.apache.ignite.internal.processors.authentication.UserAcceptedMessage; import org.apache.ignite.internal.processors.authentication.UserAcceptedMessageSerializer; @@ -166,7 +166,7 @@ public DiscoveryMessageFactory(@Nullable Marshaller cstDataMarshall, @Nullable C factory.register((short)-102, TcpDiscoveryNodeMetricsMessage::new, new TcpDiscoveryNodeMetricsMessageSerializer()); factory.register((short)-101, InetSocketAddressMessage::new, new InetSocketAddressMessageSerializer()); factory.register((short)-100, InetAddressMessage::new, new InetAddressMessageSerializer()); - factory.register((short)-66, ErrorMessage::new, new ErrorMessageSerializer()); + factory.register((short)-66, ErrorMessage::new, new ErrorMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr)); // TcpDiscoveryAbstractMessage factory.register((short)0, TcpDiscoveryCheckFailedMessage::new, new TcpDiscoveryCheckFailedMessageSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MarshallableMessage.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MarshallableMessage.java index f58e6cb324110..f3cc81dd68e26 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MarshallableMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MarshallableMessage.java @@ -17,12 +17,13 @@ package org.apache.ignite.plugin.extensions.communication; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.marshaller.Marshaller; /** A {@link Message} which still requires external custom pre-marshalling and post-unmarshalling. */ public interface MarshallableMessage extends Message { /** @param marsh External custom marshaller. */ - public default void prepareMarshal(Marshaller marsh) { + public default void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { throw new UnsupportedOperationException(); } @@ -30,7 +31,7 @@ public default void prepareMarshal(Marshaller marsh) { * @param marsh External custom marshaller. * @param clsLdr External class loader to post-unmarshall. */ - public default void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { + public default void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { throw new UnsupportedOperationException(); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java index 9d18e1d5b3c37..280b384e6ff3b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java @@ -21,7 +21,6 @@ import java.util.Objects; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -130,29 +129,13 @@ public boolean success() { } /** {@inheritDoc} */ - @Override public void prepareMarshal(Marshaller marsh) { - if (msgs != null && msgsBytes == null) { - try { - msgsBytes = U.marshal(marsh, msgs); - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to marshal the pending messages.", e); - } - } + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + msgsBytes = U.marshal(marsh, msgs); } /** {@inheritDoc} */ - @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { - if (msgsBytes != null && msgs == null) { - try { - msgs = U.unmarshal(marsh, msgsBytes, clsLdr); - - msgsBytes = null; - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to unmarshal the pending messages.", e); - } - } + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + msgs = U.unmarshal(marsh, msgsBytes, clsLdr); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java index 8932c3f7af901..1c26b0ae58830 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java @@ -18,7 +18,6 @@ package org.apache.ignite.spi.discovery.tcp.messages; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -96,29 +95,13 @@ public void responded(boolean responded) { } /** {@inheritDoc} */ - @Override public void prepareMarshal(Marshaller marsh) { - if (node != null && nodeBytes == null) { - try { - nodeBytes = U.marshal(marsh, node); - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to marshal TcpDiscoveryNode object", e); - } - } + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + nodeBytes = U.marshal(marsh, node); } /** {@inheritDoc} */ - @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { - if (nodeBytes != null && node == null) { - try { - node = U.unmarshal(marsh, nodeBytes, clsLdr); - - nodeBytes = null; - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to unmarshal TcpDiscoveryNode object", e); - } - } + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + node = U.unmarshal(marsh, nodeBytes, clsLdr); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java index 820c42156b53e..00b7eaaa0f42d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java @@ -20,7 +20,6 @@ import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -127,29 +126,13 @@ public void clientNodeAttributes(Map clientNodeAttrs) { } /** {@inheritDoc} */ - @Override public void prepareMarshal(Marshaller marsh) { - if (clientNodeAttrs != null && clientNodeAttrsBytes == null) { - try { - clientNodeAttrsBytes = U.marshal(marsh, clientNodeAttrs); - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to marshal client node attributes.", e); - } - } + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + clientNodeAttrsBytes = U.marshal(marsh, clientNodeAttrs); } /** {@inheritDoc} */ - @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { - if (clientNodeAttrsBytes != null && clientNodeAttrs == null) { - try { - clientNodeAttrs = U.unmarshal(marsh, clientNodeAttrsBytes, clsLdr); - - clientNodeAttrsBytes = null; - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to unmarshal client node attributes.", e); - } - } + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + clientNodeAttrs = U.unmarshal(marsh, clientNodeAttrsBytes, clsLdr); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java index 3c07575f8b44a..d3fb2b64dbd0d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; import org.apache.ignite.internal.util.distributed.SingleNodeMessage; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; @@ -30,6 +31,7 @@ import org.junit.Test; import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.TEST_PROCESS; +import static org.apache.ignite.marshaller.Marshallers.jdk; /** * Messages marshalling test. @@ -37,7 +39,7 @@ public class DirectMarshallingMessagesTest extends GridCommonAbstractTest { /** Message factory. */ private final MessageFactory msgFactory = - new IgniteMessageFactoryImpl(new MessageFactoryProvider[] {new GridIoMessageFactory()}); + new IgniteMessageFactoryImpl(new MessageFactoryProvider[] {new GridIoMessageFactory(jdk(), U.gridClassLoader())}); /** */ @Test diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java index 270fe7e47b37d..cadfc81088402 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java @@ -37,6 +37,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; import org.junit.Test; +import static org.apache.ignite.marshaller.Marshallers.jdk; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -45,7 +46,7 @@ public class CompressedMessageTest { /** */ @Test public void testWriteReadHugeMessage() { - MessageFactory msgFactory = new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new GridIoMessageFactory()}); + MessageFactory msgFactory = new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new GridIoMessageFactory(jdk(), U.gridClassLoader())}); DirectMessageWriter writer = new DirectMessageWriter(msgFactory); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/ErrorMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/ErrorMessageSelfTest.java index 8463e67df6313..b3ff36de94b8a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/ErrorMessageSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/ErrorMessageSelfTest.java @@ -20,8 +20,10 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.U; import org.junit.Test; +import static org.apache.ignite.marshaller.Marshallers.jdk; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; @@ -31,20 +33,24 @@ public class ErrorMessageSelfTest { /** */ @Test - public void testDirectAndInsverseConversion() { + public void testDirectAndInsverseConversion() throws IgniteCheckedException { IgniteException e = new IgniteException("Test exception", new IgniteCheckedException("Test cause")); ErrorMessage msg0 = new ErrorMessage(e); - + assertSame(e, msg0.error()); - byte[] errBytes = msg0.errorBytes(); + msg0.prepareMarshal(jdk()); + + byte[] errBytes = msg0.errBytes; assertNotNull(errBytes); ErrorMessage msg1 = new ErrorMessage(); - msg1.errorBytes(errBytes); - + msg1.errBytes = errBytes; + + msg1.finishUnmarshal(jdk(), U.gridClassLoader()); + Throwable t = msg1.error(); assertNotNull(t); @@ -56,13 +62,13 @@ public void testDirectAndInsverseConversion() { @Test public void testNull() { assertNull(new ErrorMessage(null).error()); - assertNull(new ErrorMessage(null).errorBytes()); + assertNull(new ErrorMessage(null).errBytes); ErrorMessage msg = new ErrorMessage(); - msg.errorBytes(null); + msg.errBytes = null; assertNull(msg.error()); - assertNull(msg.errorBytes()); + assertNull(msg.errBytes); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java index a05bb4e9ac693..3f2a942ddb1ee 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java @@ -20,18 +20,20 @@ import java.util.UUID; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage; import static org.apache.ignite.internal.util.IgniteUtils.toBytes; +import static org.apache.ignite.marshaller.Marshallers.jdk; /** */ public class IgniteIoCommunicationMessageSerializationTest extends AbstractMessageSerializationTest { /** {@inheritDoc} */ @Override protected MessageFactoryProvider messageFactory() { - return new GridIoMessageFactory(); + return new GridIoMessageFactory(jdk(), U.gridClassLoader()); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java index 7dc74b73de6ee..e75ed12f40019 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObjectImpl; import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -44,6 +45,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.marshaller.Marshallers.jdk; /** * @@ -148,7 +150,7 @@ public void testCacheContinuousQueryEntrySerialization() { e0.markFiltered(); IgniteMessageFactoryImpl msgFactory = - new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new GridIoMessageFactory()}); + new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new GridIoMessageFactory(jdk(), U.gridClassLoader())}); ByteBuffer buf = ByteBuffer.allocate(4096); DirectMessageWriter writer = new DirectMessageWriter(msgFactory); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java index 3427867c84faf..9ccc5fd9ceeeb 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java @@ -47,6 +47,7 @@ import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS; +import static org.apache.ignite.marshaller.Marshallers.jdk; /** * Super class for all communication self tests. @@ -160,7 +161,7 @@ private void startSpis() throws Exception { } }; - ctx.messageFactory(new IgniteMessageFactoryImpl(new MessageFactoryProvider[] {new GridIoMessageFactory(), testMsgFactory})); + ctx.messageFactory(new IgniteMessageFactoryImpl(new MessageFactoryProvider[] {new GridIoMessageFactory(jdk(), U.gridClassLoader()), testMsgFactory})); ctx.setLocalNode(node); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java index b2ee543a8cb23..54c6a40c1fa96 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java @@ -64,6 +64,8 @@ import org.apache.ignite.testframework.junits.spi.GridSpiTest; import org.junit.Test; +import static org.apache.ignite.marshaller.Marshallers.jdk; + /** * */ @@ -438,7 +440,7 @@ private void startSpis(MessageListener lsnr) throws Exception { }; ctx.messageFactory(new IgniteMessageFactoryImpl( - new MessageFactoryProvider[] {new GridIoMessageFactory(), testMsgFactory}) + new MessageFactoryProvider[] {new GridIoMessageFactory(jdk(), U.gridClassLoader()), testMsgFactory}) ); ctx.setLocalNode(node); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java index d4d63ac7e6676..45a9020c84688 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java @@ -58,6 +58,7 @@ import static org.apache.ignite.IgniteSystemProperties.getBoolean; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS; import static org.apache.ignite.internal.util.IgniteUtils.spiAttribute; +import static org.apache.ignite.marshaller.Marshallers.jdk; import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.ATTR_HOST_NAMES; import static org.apache.ignite.testframework.GridTestUtils.getFreeCommPort; @@ -251,7 +252,7 @@ private TcpCommunicationSpi initializeSpi(GridSpiTestContext ctx, MessageFactoryProvider testMsgFactory = factory -> factory.register(GridTestMessage.DIRECT_TYPE, GridTestMessage::new); - ctx.messageFactory(new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new GridIoMessageFactory(), testMsgFactory})); + ctx.messageFactory(new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new GridIoMessageFactory(jdk(), U.gridClassLoader()), testMsgFactory})); ctx.setLocalNode(node); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java index 976108cef7ad9..2d08829de4352 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java @@ -64,6 +64,7 @@ import org.junit.Test; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS; +import static org.apache.ignite.marshaller.Marshallers.jdk; /** * Class for multithreaded {@link TcpCommunicationSpi} test. @@ -468,7 +469,7 @@ private int getSpiCount() { MessageFactoryProvider testMsgFactory = factory -> factory.register(GridTestMessage.DIRECT_TYPE, GridTestMessage::new); ctx.messageFactory(new IgniteMessageFactoryImpl( - new MessageFactoryProvider[] {new GridIoMessageFactory(), testMsgFactory}) + new MessageFactoryProvider[] {new GridIoMessageFactory(jdk(), U.gridClassLoader()), testMsgFactory}) ); ctx.timeoutProcessor(timeoutProcessor); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java index 479c300a2632f..d9673085e92fa 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java @@ -55,6 +55,8 @@ import org.apache.ignite.testframework.junits.spi.GridSpiTest; import org.junit.Test; +import static org.apache.ignite.marshaller.Marshallers.jdk; + /** * */ @@ -404,7 +406,7 @@ private void startSpis(int ackCnt, int idleTimeout, int queueLimit) throws Excep }; ctx.messageFactory(new IgniteMessageFactoryImpl( - new MessageFactoryProvider[] {new GridIoMessageFactory(), testMsgFactory}) + new MessageFactoryProvider[] {new GridIoMessageFactory(jdk(), U.gridClassLoader()), testMsgFactory}) ); ctx.setLocalNode(node); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java index c1cf05bea13dc..2169a66915fac 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java @@ -59,6 +59,8 @@ import org.apache.ignite.testframework.junits.spi.GridSpiTest; import org.junit.Test; +import static org.apache.ignite.marshaller.Marshallers.jdk; + /** * */ @@ -728,7 +730,7 @@ private void startSpis() throws Exception { MessageFactoryProvider testMsgFactory = factory -> factory.register(GridTestMessage.DIRECT_TYPE, GridTestMessage::new); ctx.messageFactory(new IgniteMessageFactoryImpl( - new MessageFactoryProvider[] {new GridIoMessageFactory(), testMsgFactory}) + new MessageFactoryProvider[] {new GridIoMessageFactory(jdk(), U.gridClassLoader()), testMsgFactory}) ); ctx.setLocalNode(node); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java index d6e406bf91ad5..de813ed505d3f 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java @@ -58,6 +58,8 @@ import org.apache.ignite.testframework.junits.spi.GridSpiTest; import org.junit.Test; +import static org.apache.ignite.marshaller.Marshallers.jdk; + /** * */ @@ -457,7 +459,7 @@ private void startSpis(int ackCnt, int idleTimeout, int queueLimit) throws Excep }; ctx.messageFactory(new IgniteMessageFactoryImpl( - new MessageFactoryProvider[] {new GridIoMessageFactory(), testMsgFactory}) + new MessageFactoryProvider[] {new GridIoMessageFactory(jdk(), U.gridClassLoader()), testMsgFactory}) ); ctx.setLocalNode(node); diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index 651c8e099d50e..8809b1c204b41 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.logger.NullLogger; @@ -75,6 +76,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED; import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER; +import static org.apache.ignite.marshaller.Marshallers.jdk; /** * Test SPI context. @@ -552,7 +554,7 @@ public void triggerEvent(Event evt) { /** {@inheritDoc} */ @Override public MessageFactory messageFactory() { if (factory == null) - factory = new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new GridIoMessageFactory()}); + factory = new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new GridIoMessageFactory(jdk(), U.gridClassLoader())}); return factory; } diff --git a/modules/core/src/test/resources/codegen/TestMarshallableMessageMarshallableSerializer.java b/modules/core/src/test/resources/codegen/TestMarshallableMessageMarshallableSerializer.java index 365ec8b80bf7e..80602f050ee7d 100644 --- a/modules/core/src/test/resources/codegen/TestMarshallableMessageMarshallableSerializer.java +++ b/modules/core/src/test/resources/codegen/TestMarshallableMessageMarshallableSerializer.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.TestMarshallableMessage; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.MessageReader; @@ -45,7 +47,12 @@ public TestMarshallableMessageMarshallableSerializer(Marshaller marshaller, Clas if (!writer.writeHeader(msg.directType())) return false; - msg.prepareMarshal(marshaller); + try { + msg.prepareMarshal(marshaller); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal object", e); + } writer.onHeaderWritten(); } @@ -101,8 +108,13 @@ public TestMarshallableMessageMarshallableSerializer(Marshaller marshaller, Clas reader.incrementState(); } - msg.finishUnmarshal(marshaller, clsLdr); + try { + msg.finishUnmarshal(marshaller, clsLdr); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to unmarshal object", e); + } return true; } -} \ No newline at end of file +} From e0996f4ad790c27719028e0791a7f1e6c4ad8844 Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Wed, 11 Mar 2026 18:00:35 +0300 Subject: [PATCH 2/7] WIP --- .../communication/CompressedMessageTest.java | 3 ++- .../GridAbstractCommunicationSelfTest.java | 3 ++- ...GridTcpCommunicationSpiConfigSelfTest.java | 3 ++- .../codegen/TestMarshallableMessage.java | 24 ++++--------------- 4 files changed, 10 insertions(+), 23 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java index cadfc81088402..e5d8ca1930798 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java @@ -46,7 +46,8 @@ public class CompressedMessageTest { /** */ @Test public void testWriteReadHugeMessage() { - MessageFactory msgFactory = new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new GridIoMessageFactory(jdk(), U.gridClassLoader())}); + MessageFactory msgFactory = new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{ + new GridIoMessageFactory(jdk(), U.gridClassLoader())}); DirectMessageWriter writer = new DirectMessageWriter(msgFactory); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java index 9ccc5fd9ceeeb..b2b94378e982e 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java @@ -161,7 +161,8 @@ private void startSpis() throws Exception { } }; - ctx.messageFactory(new IgniteMessageFactoryImpl(new MessageFactoryProvider[] {new GridIoMessageFactory(jdk(), U.gridClassLoader()), testMsgFactory})); + ctx.messageFactory(new IgniteMessageFactoryImpl(new MessageFactoryProvider[] { + new GridIoMessageFactory(jdk(), U.gridClassLoader()), testMsgFactory})); ctx.setLocalNode(node); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java index 45a9020c84688..11a00c77265e8 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java @@ -252,7 +252,8 @@ private TcpCommunicationSpi initializeSpi(GridSpiTestContext ctx, MessageFactoryProvider testMsgFactory = factory -> factory.register(GridTestMessage.DIRECT_TYPE, GridTestMessage::new); - ctx.messageFactory(new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new GridIoMessageFactory(jdk(), U.gridClassLoader()), testMsgFactory})); + ctx.messageFactory(new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{ + new GridIoMessageFactory(jdk(), U.gridClassLoader()), testMsgFactory})); ctx.setLocalNode(node); diff --git a/modules/core/src/test/resources/codegen/TestMarshallableMessage.java b/modules/core/src/test/resources/codegen/TestMarshallableMessage.java index 58a871692d3ac..7d33f076d889a 100644 --- a/modules/core/src/test/resources/codegen/TestMarshallableMessage.java +++ b/modules/core/src/test/resources/codegen/TestMarshallableMessage.java @@ -43,29 +43,13 @@ public class TestMarshallableMessage implements MarshallableMessage { byte[] cstDataBytes; /** {@inheritDoc} */ - @Override public void prepareMarshal(Marshaller marsh) { - if (cstData != null && cstDataBytes == null) { - try { - cstDataBytes = U.marshal(marsh, cstData); - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to marshal custom data.", e); - } - } + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + cstDataBytes = U.marshal(marsh, cstData); } /** {@inheritDoc} */ - @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { - if (cstDataBytes != null && cstData == null) { - try { - cstData = U.unmarshal(marsh, cstDataBytes, clsLdr); - - cstDataBytes = null; - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to unmarshal custom data.", e); - } - } + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + cstData = U.unmarshal(marsh, cstDataBytes, clsLdr); } public short directType() { From 3e1b38377fb5349c8566d1870033507f305f3f3f Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Wed, 11 Mar 2026 18:35:53 +0300 Subject: [PATCH 3/7] WIP --- .../calcite/message/CalciteErrorMessage.java | 34 ++++++++++++++++--- .../query/calcite/message/MessageType.java | 5 +-- 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java index 99695f9cf1ac8..456f3487bdfda 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java @@ -18,13 +18,17 @@ package org.apache.ignite.internal.processors.query.calcite.message; import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.managers.communication.ErrorMessage; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; /** * */ -public class CalciteErrorMessage extends ErrorMessage implements CalciteMessage { +public class CalciteErrorMessage implements CalciteMarshalableMessage { /** */ @Order(0) UUID qryId; @@ -33,6 +37,14 @@ public class CalciteErrorMessage extends ErrorMessage implements CalciteMessage @Order(1) long fragmentId; + /** Error bytes. */ + @Order(2) + @GridToStringExclude + @Nullable public byte[] errBytes; + + /** Error. */ + private @Nullable Throwable err; + /** */ public CalciteErrorMessage() { // No-op. @@ -40,12 +52,11 @@ public CalciteErrorMessage() { /** */ public CalciteErrorMessage(UUID qryId, long fragmentId, Throwable err) { - super(err); - assert err != null; this.qryId = qryId; this.fragmentId = fragmentId; + this.err = err; } /** @@ -62,6 +73,11 @@ public long fragmentId() { return fragmentId; } + /** */ + public @Nullable Throwable error() { + return err; + } + /** {@inheritDoc} */ @Override public MessageType type() { return MessageType.QUERY_ERROR_MESSAGE; @@ -71,4 +87,14 @@ public long fragmentId() { @Override public short directType() { return MessageType.QUERY_ERROR_MESSAGE.directType(); } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + errBytes = U.marshal(ctx.marshaller(), err); + } + + /** {@inheritDoc} */ + @Override public void prepareUnmarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + err = U.unmarshal(ctx.marshaller(), errBytes, U.resolveClassLoader(ctx.cache().context().gridConfig())); + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java index 75aeb4bb5297c..44438a8d4892b 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java @@ -24,11 +24,8 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescriptionSerializer; import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping; import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMappingSerializer; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.MessageSerializer; -import static org.apache.ignite.marshaller.Marshallers.jdk; - /** * */ @@ -40,7 +37,7 @@ public enum MessageType { QUERY_START_RESPONSE(301, QueryStartResponse::new, new QueryStartResponseSerializer()), /** */ - QUERY_ERROR_MESSAGE(302, CalciteErrorMessage::new, new CalciteErrorMessageMarshallableSerializer(jdk(), U.gridClassLoader())), + QUERY_ERROR_MESSAGE(302, CalciteErrorMessage::new, new CalciteErrorMessageSerializer()), /** */ QUERY_BATCH_MESSAGE(303, QueryBatchMessage::new, new QueryBatchMessageSerializer()), From a89e023e051b39f98601b001f5aab04ddee3b63b Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 12 Mar 2026 12:19:18 +0300 Subject: [PATCH 4/7] raw --- .../internal/GridJobExecuteRequest.java | 16 ++--- .../managers/communication/GridIoManager.java | 10 --- .../managers/communication/GridIoMessage.java | 7 ++- .../communication/GridIoMessageFactory.java | 15 +++-- .../eventstorage/GridEventStorageManager.java | 11 +--- .../eventstorage/GridEventStorageMessage.java | 36 +++++------ .../processors/job/GridJobProcessor.java | 62 +++++++------------ .../processors/task/GridTaskWorker.java | 2 - 8 files changed, 60 insertions(+), 99 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java index 089b1ad209432..9ecaf64976565 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java @@ -34,13 +34,14 @@ import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; import org.jetbrains.annotations.Nullable; /** * Job execution request. */ @SuppressWarnings({"AssignmentOrReturnOfFieldWithMutableType", "NullableProblems"}) -public class GridJobExecuteRequest implements ExecutorAwareMessage { +public class GridJobExecuteRequest implements ExecutorAwareMessage, MarshallableMessage { /** */ @Order(0) IgniteUuid sesId; @@ -452,10 +453,8 @@ public AffinityTopologyVersion topologyVersion() { return S.toString(GridJobExecuteRequest.class, this); } - /** - * @param marsh Marshaller. - */ - public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { jobBytes = U.marshal(marsh, job); topPredBytes = U.marshal(marsh, topPred); siblingsBytes = U.marshal(marsh, siblings); @@ -463,11 +462,8 @@ public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { jobAttrsBytes = U.marshal(marsh, jobAttrs); } - /** - * @param marsh Marshaller. - * @param ldr Class loader. - */ - public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { assert top != null || topPredBytes != null; assert sesAttrsBytes != null || !sesFullSup; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 6c715d7a24436..58e7079dbaa4e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -1204,8 +1204,6 @@ private void onChannelOpened0(UUID rmtNodeId, GridIoMessage initMsg, Channel cha if (topicOrd >= 0) initMsg.topic(GridTopic.fromOrdinal(topicOrd)); - else - initMsg.finishUnmarshal(marsh, U.resolveClassLoader(ctx.config())); } byte plc = initMsg.policy(); @@ -1253,8 +1251,6 @@ private void onMessage0(UUID nodeId, GridIoMessage msg, IgniteRunnable msgC) { if (topicOrd >= 0) msg.topic(GridTopic.fromOrdinal(topicOrd)); - else - msg.finishUnmarshal(marsh, U.resolveClassLoader(ctx.config())); } if (!started) { @@ -1980,9 +1976,6 @@ private IgniteInternalFuture openChannel( ); try { - if (topicOrd < 0) - ioMsg.prepareMarshal(marsh); - return ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).openChannel(node, ioMsg); } catch (IgniteSpiException e) { @@ -2054,9 +2047,6 @@ else if (async) ackC.apply(null); } else { - if (topicOrd < 0) - ioMsg.prepareMarshal(marsh); - try { if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi) getTcpCommunicationSpi().sendMessage(node, ioMsg, ackC); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java index b42b8df962a7e..bef3b678b6b85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java @@ -27,13 +27,14 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** * Wrapper for all grid messages. */ -public class GridIoMessage implements Message, SpanTransport { +public class GridIoMessage implements MarshallableMessage, SpanTransport { /** */ public static final Integer STRIPE_DISABLED_PART = Integer.MIN_VALUE; @@ -219,7 +220,7 @@ public int partition() { /** * @param marsh Marshaller. */ - public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { if (topic != null && topicBytes == null) topicBytes = U.marshal(marsh, topic); } @@ -228,7 +229,7 @@ public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { * @param marsh Marshaller. * @param ldr Class loader. */ - public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { if (topicBytes != null && topic == null) { topic = U.unmarshal(marsh, topicBytes, ldr); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index f70ec43d87e62..2b0e64ee5b845 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -22,7 +22,7 @@ import org.apache.ignite.internal.GridJobCancelRequest; import org.apache.ignite.internal.GridJobCancelRequestSerializer; import org.apache.ignite.internal.GridJobExecuteRequest; -import org.apache.ignite.internal.GridJobExecuteRequestSerializer; +import org.apache.ignite.internal.GridJobExecuteRequestMarshallableSerializer; import org.apache.ignite.internal.GridJobExecuteResponse; import org.apache.ignite.internal.GridJobExecuteResponseSerializer; import org.apache.ignite.internal.GridJobSiblingsRequest; @@ -62,7 +62,7 @@ import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse; import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponseSerializer; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage; -import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessageSerializer; +import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessageMarshallableSerializer; import org.apache.ignite.internal.processors.authentication.UserAuthenticateRequestMessage; import org.apache.ignite.internal.processors.authentication.UserAuthenticateRequestMessageSerializer; import org.apache.ignite.internal.processors.authentication.UserAuthenticateResponseMessage; @@ -399,19 +399,21 @@ public GridIoMessageFactory(@Nullable Marshaller cstDataMarshall, @Nullable Clas factory.register(TcpCommunicationSpi.HANDSHAKE_MSG_TYPE, HandshakeMessage::new, new HandshakeMessageSerializer()); factory.register(TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE, HandshakeWaitMessage::new, new HandshakeWaitMessageSerializer()); factory.register((short)0, GridJobCancelRequest::new, new GridJobCancelRequestSerializer()); - factory.register((short)1, GridJobExecuteRequest::new, new GridJobExecuteRequestSerializer()); + factory.register((short)1, GridJobExecuteRequest::new, + new GridJobExecuteRequestMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr)); factory.register((short)2, GridJobExecuteResponse::new, new GridJobExecuteResponseSerializer()); factory.register((short)3, GridJobSiblingsRequest::new, new GridJobSiblingsRequestSerializer()); factory.register((short)4, GridJobSiblingsResponse::new, new GridJobSiblingsResponseSerializer()); factory.register((short)5, GridTaskCancelRequest::new, new GridTaskCancelRequestSerializer()); factory.register((short)6, GridTaskSessionRequest::new, new GridTaskSessionRequestSerializer()); factory.register((short)7, GridCheckpointRequest::new, new GridCheckpointRequestSerializer()); - factory.register((short)8, GridIoMessage::new, new GridIoMessageSerializer()); + factory.register((short)8, GridIoMessage::new, new GridIoMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr)); factory.register((short)9, GridIoUserMessage::new, new GridIoUserMessageSerializer()); factory.register((short)10, GridDeploymentInfoBean::new, new GridDeploymentInfoBeanSerializer()); factory.register((short)11, GridDeploymentRequest::new, new GridDeploymentRequestSerializer()); factory.register((short)12, GridDeploymentResponse::new, new GridDeploymentResponseSerializer()); - factory.register((short)13, GridEventStorageMessage::new, new GridEventStorageMessageSerializer()); + factory.register((short)13, GridEventStorageMessage::new, + new GridEventStorageMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr)); factory.register((short)16, GridCacheTxRecoveryRequest::new, new GridCacheTxRecoveryRequestSerializer()); factory.register((short)17, GridCacheTxRecoveryResponse::new, new GridCacheTxRecoveryResponseSerializer()); factory.register((short)18, IndexQueryResultMeta::new, new IndexQueryResultMetaSerializer()); @@ -518,7 +520,8 @@ public GridIoMessageFactory(@Nullable Marshaller cstDataMarshall, @Nullable Clas factory.register((short)169, ServiceSingleNodeDeploymentResult::new, new ServiceSingleNodeDeploymentResultSerializer()); factory.register(GridQueryKillRequest.TYPE_CODE, GridQueryKillRequest::new, new GridQueryKillRequestSerializer()); factory.register(GridQueryKillResponse.TYPE_CODE, GridQueryKillResponse::new, new GridQueryKillResponseSerializer()); - factory.register(GridIoSecurityAwareMessage.TYPE_CODE, GridIoSecurityAwareMessage::new, new GridIoSecurityAwareMessageSerializer()); + factory.register(GridIoSecurityAwareMessage.TYPE_CODE, GridIoSecurityAwareMessage::new, + new GridIoSecurityAwareMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr)); factory.register(SessionChannelMessage.TYPE_CODE, SessionChannelMessage::new, new SessionChannelMessageSerializer()); factory.register(SingleNodeMessage.TYPE_CODE, SingleNodeMessage::new); factory.register((short)177, TcpInverseConnectionResponseMessage::new, new TcpInverseConnectionResponseMessageSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java index fcfa3e5b37d36..e9e34a183dca3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java @@ -1025,15 +1025,6 @@ private List query(IgnitePredicate p, Collection)req.filter(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java index dade283bc0d1f..0f38f3fc18eba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java @@ -32,13 +32,13 @@ import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; import org.jetbrains.annotations.Nullable; /** * Event storage message. */ -public class GridEventStorageMessage implements Message { +public class GridEventStorageMessage implements MarshallableMessage { /** */ private Object resTopic; @@ -205,10 +205,8 @@ public void loaderParticipants(@Nullable Map ldrParties) { return ErrorMessage.error(errMsg); } - /** - * @param marsh Marshaller. - */ - public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { if (resTopic != null && resTopicBytes == null) resTopicBytes = U.marshal(marsh, resTopic); @@ -219,24 +217,14 @@ public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { evtsBytes = U.marshal(marsh, evts); } - /** - * @param marsh Marshaller. - * @param ldr Class loader. - * @param filterClsLdr Class loader for filter. - */ - public void finishUnmarshal(Marshaller marsh, ClassLoader ldr, ClassLoader filterClsLdr) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { if (resTopicBytes != null && resTopic == null) { resTopic = U.unmarshal(marsh, resTopicBytes, ldr); resTopicBytes = null; } - if (filterBytes != null && filter == null && filterClsLdr != null) { - filter = U.unmarshal(marsh, filterBytes, filterClsLdr); - - filterBytes = null; - } - if (evtsBytes != null && evts == null) { evts = U.unmarshal(marsh, evtsBytes, ldr); @@ -244,6 +232,18 @@ public void finishUnmarshal(Marshaller marsh, ClassLoader ldr, ClassLoader filte } } + /** + * @param marsh Marshaller. + * @param filterClsLdr Class loader for filter. + */ + public void finishUnmarshalFilters(Marshaller marsh, ClassLoader filterClsLdr) throws IgniteCheckedException { + if (filterBytes != null && filter == null) { + filter = U.unmarshal(marsh, filterBytes, filterClsLdr); + + filterBytes = null; + } + } + /** {@inheritDoc} */ @Override public short directType() { return 13; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java index fd1b0122d6dae..98c43ca473a47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java @@ -1257,49 +1257,31 @@ public void processJobExecuteRequest(ClusterNode node, final GridJobExecuteReque GridJobSessionImpl jobSes; GridJobContextImpl jobCtx; - boolean loc = ctx.localNodeId().equals(node.id()) && !ctx.config().isMarshalLocalJobs(); - - try { - if (!loc) - req.finishUnmarshal(marsh, U.resolveClassLoader(dep.classLoader(), ctx.config())); - - // Note that we unmarshal session/job attributes here with proper class loader. - GridTaskSessionImpl taskSes = ctx.session().createTaskSession( - req.sessionId(), - node.id(), - req.taskName(), - dep, - req.taskClassName(), - req.topology(), - req.getTopologyPredicate(), - req.startTaskTime(), - endTime, - req.getSiblings(), - req.getSessionAttributes(), - req.sessionFullSupport(), - req.internal(), - req.executorName(), - ctx.security().securityContext() - ); - - taskSes.setCheckpointSpi(req.checkpointSpi()); - taskSes.setClassLoader(dep.classLoader()); - - jobSes = new GridJobSessionImpl(ctx, taskSes, req.jobId()); - - jobCtx = new GridJobContextImpl(ctx, req.jobId(), req.getJobAttributes()); - } - catch (IgniteCheckedException e) { - IgniteException ex = new IgniteException("Failed to deserialize task attributes " + - "[taskName=" + req.taskName() + ", taskClsName=" + req.taskClassName() + - ", codeVer=" + req.userVersion() + ", taskClsLdr=" + dep.classLoader() + ']', e); + // Note that we unmarshal session/job attributes here with proper class loader. + GridTaskSessionImpl taskSes = ctx.session().createTaskSession( + req.sessionId(), + node.id(), + req.taskName(), + dep, + req.taskClassName(), + req.topology(), + req.getTopologyPredicate(), + req.startTaskTime(), + endTime, + req.getSiblings(), + req.getSessionAttributes(), + req.sessionFullSupport(), + req.internal(), + req.executorName(), + ctx.security().securityContext() + ); - U.error(log, ex.getMessage(), e); + taskSes.setCheckpointSpi(req.checkpointSpi()); + taskSes.setClassLoader(dep.classLoader()); - handleException(node, req, ex, endTime); + jobSes = new GridJobSessionImpl(ctx, taskSes, req.jobId()); - return; - } + jobCtx = new GridJobContextImpl(ctx, req.jobId(), req.getJobAttributes()); job = new GridJobWorker( ctx, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index 16c91b86edf53..b62c7e1672e83 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -1416,8 +1416,6 @@ private void sendRequest(ComputeJobResult res) { if (loc) ctx.job().processJobExecuteRequest(ctx.discovery().localNode(), req); else { - req.prepareMarshal(marsh); - byte plc; if (internal) From 987d1f8435d138c8bf75b06e422585e5c8a3e277 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Sat, 14 Mar 2026 01:09:41 +0300 Subject: [PATCH 5/7] fix --- .../internal/managers/communication/GridIoMessageFactory.java | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 122b39ec5763e..152b3acab92f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -346,7 +346,6 @@ import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessageSerializer; import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage; import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessageSerializer; -import org.jetbrains.annotations.Nullable; /** * Message factory implementation. From f440d5ee8d0e9c29a9b7746b2a31ced635e58b85 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Sat, 14 Mar 2026 02:17:38 +0300 Subject: [PATCH 6/7] impl --- .../managers/communication/GridIoMessage.java | 9 ++------ .../communication/GridIoMessageFactory.java | 8 ++++--- .../GridDeploymentCommunication.java | 21 ------------------- .../deployment/GridDeploymentRequest.java | 17 ++++++--------- .../eventstorage/GridEventStorageManager.java | 8 +------ ...ntRequestOfUnknownClassProcessingTest.java | 2 -- .../p2p/GridP2PHotRedeploymentSelfTest.java | 16 -------------- 7 files changed, 14 insertions(+), 67 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java index bef3b678b6b85..31e5649bddce4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java @@ -217,18 +217,13 @@ public int partition() { return null; } - /** - * @param marsh Marshaller. - */ + /** {@inheritDoc} */ @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { if (topic != null && topicBytes == null) topicBytes = U.marshal(marsh, topic); } - /** - * @param marsh Marshaller. - * @param ldr Class loader. - */ + /** {@inheritDoc} */ @Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { if (topicBytes != null && topic == null) { topic = U.unmarshal(marsh, topicBytes, ldr); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 152b3acab92f7..81c593429c3a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -54,7 +54,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBeanSerializer; import org.apache.ignite.internal.managers.deployment.GridDeploymentRequest; -import org.apache.ignite.internal.managers.deployment.GridDeploymentRequestSerializer; +import org.apache.ignite.internal.managers.deployment.GridDeploymentRequestMarshallableSerializer; import org.apache.ignite.internal.managers.deployment.GridDeploymentResponse; import org.apache.ignite.internal.managers.deployment.GridDeploymentResponseSerializer; import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyRequest; @@ -404,10 +404,12 @@ public GridIoMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMarsh factory.register((short)5, GridTaskCancelRequest::new, new GridTaskCancelRequestSerializer()); factory.register((short)6, GridTaskSessionRequest::new, new GridTaskSessionRequestSerializer()); factory.register((short)7, GridCheckpointRequest::new, new GridCheckpointRequestSerializer()); - factory.register((short)8, GridIoMessage::new, new GridIoMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr)); + factory.register((short)8, GridIoMessage::new, + new GridIoMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr)); factory.register((short)9, GridIoUserMessage::new, new GridIoUserMessageSerializer()); factory.register((short)10, GridDeploymentInfoBean::new, new GridDeploymentInfoBeanSerializer()); - factory.register((short)11, GridDeploymentRequest::new, new GridDeploymentRequestSerializer()); + factory.register((short)11, GridDeploymentRequest::new, + new GridDeploymentRequestMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr)); factory.register((short)12, GridDeploymentResponse::new, new GridDeploymentResponseSerializer()); factory.register((short)13, GridEventStorageMessage::new, new GridEventStorageMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr)); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java index 6de75ab20899c..899075f44487a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java @@ -41,7 +41,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteNotPeerDeployable; import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.Message; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; @@ -72,9 +71,6 @@ class GridDeploymentCommunication { /** */ private final GridBusyLock busyLock = new GridBusyLock(); - /** */ - private final Marshaller marsh; - /** * Creates new instance of deployment communication. * @@ -92,8 +88,6 @@ class GridDeploymentCommunication { processDeploymentRequest(nodeId, msg); } }; - - marsh = ctx.marshaller(); } /** @@ -186,18 +180,6 @@ private void processResourceRequest(UUID nodeId, GridDeploymentRequest req) { if (log.isDebugEnabled()) log.debug("Received peer class/resource loading request [originatingNodeId=" + nodeId + ", req=" + req + ']'); - if (req.responseTopic() == null) { - try { - req.finishUnmarshal(marsh, U.resolveClassLoader(ctx.config())); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to process deployment request (will ignore) [" + - "originatingNodeId=" + nodeId + ", req=" + req + ']', e); - - return; - } - } - GridDeploymentResponse res = new GridDeploymentResponse(); GridDeployment dep = ctx.deploy().getDeployment(req.classLoaderId()); @@ -417,9 +399,6 @@ GridDeploymentResponse sendResourceRequest(final String rsrcName, IgniteUuid cls long start = U.currentTimeMillis(); - if (req.responseTopic() != null && !ctx.localNodeId().equals(dstNode.id())) - req.prepareMarshal(marsh); - ctx.io().sendToGridTopic(dstNode, TOPIC_CLASSLOAD, req, GridIoPolicy.P2P_POOL); if (log.isDebugEnabled()) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentRequest.java index 00890882e0590..ac9e8fe499196 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentRequest.java @@ -26,12 +26,12 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; /** * Deployment request. */ -public class GridDeploymentRequest implements Message { +public class GridDeploymentRequest implements MarshallableMessage { /** Response topic. Response should be sent back to this topic. */ private Object resTopic; @@ -134,19 +134,14 @@ public void nodeIds(Collection nodeIds) { this.nodeIds = nodeIds; } - /** - * @param marsh Marshaller. - */ - public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { if (resTopic != null && resTopicBytes == null) resTopicBytes = U.marshal(marsh, resTopic); } - /** - * @param marsh Marshaller. - * @param ldr Class loader. - */ - public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { if (resTopicBytes != null && resTopic == null) { resTopic = U.unmarshal(marsh, resTopicBytes, ldr); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java index e9e34a183dca3..38a773de2bb4b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java @@ -1137,11 +1137,8 @@ private void sendMessage(Collection nodes, GridTopic topi if (locNode != null) ctx.io().sendToGridTopic(locNode, topic, msg, plc); - if (!rmtNodes.isEmpty()) { - msg.prepareMarshal(marsh); - + if (!rmtNodes.isEmpty()) ctx.io().sendToGridTopic(rmtNodes, topic, msg, plc); - } } /** @@ -1243,9 +1240,6 @@ private class RequestListener implements GridMessageListener { if (log.isDebugEnabled()) log.debug("Sending event query response to node [nodeId=" + nodeId + "res=" + res + ']'); - if (!ctx.localNodeId().equals(nodeId)) - res.prepareMarshal(marsh); - ctx.io().sendToCustomTopic(node, req.responseTopic(), res, PUBLIC_POOL); } catch (ClusterTopologyCheckedException e) { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/DeploymentRequestOfUnknownClassProcessingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/DeploymentRequestOfUnknownClassProcessingTest.java index 356e81477b3de..c338150a95acb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/DeploymentRequestOfUnknownClassProcessingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/DeploymentRequestOfUnknownClassProcessingTest.java @@ -127,8 +127,6 @@ public void testResponseReceivingOnDeploymentRequestOfUnknownClass() throws Exce GridDeploymentRequest req = new GridDeploymentRequest(TEST_TOPIC_NAME, locDep.classLoaderId(), UNKNOWN_CLASS_NAME, false); - req.prepareMarshal(locNode.context().marshaller()); - locNode.context().io().sendToGridTopic(remNode.localNode(), TOPIC_CLASSLOAD, req, GridIoPolicy.P2P_POOL); // Сhecks that the expected response has been received. diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PHotRedeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PHotRedeploymentSelfTest.java index 70db1390a1698..2cc6e5b9dee9e 100644 --- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PHotRedeploymentSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PHotRedeploymentSelfTest.java @@ -122,18 +122,6 @@ private void processTestClassLoaderHotRedeployment(DeploymentMode depMode) throw assert taskCls1.getClassLoader() != taskCls2.getClassLoader(); assert taskCls1 != taskCls2; -// final AtomicBoolean undeployed = new AtomicBoolean(false); -// -// grid2.events().localListen(new GridLocalEventListener() { -// @Override public void onEvent(GridEvent evt) { -// if (evt.type() == EVT_TASK_UNDEPLOYED) { -// assert ((GridDeploymentEvent)evt).alias().equals(TASK_NAME); -// -// undeployed.set(true); -// } -// } -// }, EVT_TASK_UNDEPLOYED); - ignite2.compute().localDeployTask(taskCls1, taskCls1.getClassLoader()); Integer res1 = ignite1.compute().execute(taskCls1, Collections.singletonList(ignite2.cluster().localNode().id())); @@ -149,10 +137,6 @@ private void processTestClassLoaderHotRedeployment(DeploymentMode depMode) throw info("Result2: " + res2); assert !res1.equals(res2); - -// Thread.sleep(P2P_TIMEOUT * 2); -// -// assert undeployed.get(); } finally { stopGrid(2); From 4de828f6584b12418cf60a79277f8aed2189a9d3 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Sat, 14 Mar 2026 11:07:25 +0300 Subject: [PATCH 7/7] impl --- .../internal/GridJobExecuteRequest.java | 16 +++-- .../communication/GridIoMessageFactory.java | 5 +- .../processors/job/GridJobProcessor.java | 62 ++++++++++++------- .../processors/task/GridTaskWorker.java | 2 + 4 files changed, 54 insertions(+), 31 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java index 9ecaf64976565..089b1ad209432 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java @@ -34,14 +34,13 @@ import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; import org.jetbrains.annotations.Nullable; /** * Job execution request. */ @SuppressWarnings({"AssignmentOrReturnOfFieldWithMutableType", "NullableProblems"}) -public class GridJobExecuteRequest implements ExecutorAwareMessage, MarshallableMessage { +public class GridJobExecuteRequest implements ExecutorAwareMessage { /** */ @Order(0) IgniteUuid sesId; @@ -453,8 +452,10 @@ public AffinityTopologyVersion topologyVersion() { return S.toString(GridJobExecuteRequest.class, this); } - /** {@inheritDoc} */ - @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + /** + * @param marsh Marshaller. + */ + public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { jobBytes = U.marshal(marsh, job); topPredBytes = U.marshal(marsh, topPred); siblingsBytes = U.marshal(marsh, siblings); @@ -462,8 +463,11 @@ public AffinityTopologyVersion topologyVersion() { jobAttrsBytes = U.marshal(marsh, jobAttrs); } - /** {@inheritDoc} */ - @Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { + /** + * @param marsh Marshaller. + * @param ldr Class loader. + */ + public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { assert top != null || topPredBytes != null; assert sesAttrsBytes != null || !sesFullSup; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 81c593429c3a2..3922c3b861a6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -22,7 +22,7 @@ import org.apache.ignite.internal.GridJobCancelRequest; import org.apache.ignite.internal.GridJobCancelRequestSerializer; import org.apache.ignite.internal.GridJobExecuteRequest; -import org.apache.ignite.internal.GridJobExecuteRequestMarshallableSerializer; +import org.apache.ignite.internal.GridJobExecuteRequestSerializer; import org.apache.ignite.internal.GridJobExecuteResponse; import org.apache.ignite.internal.GridJobExecuteResponseSerializer; import org.apache.ignite.internal.GridJobSiblingsRequest; @@ -396,8 +396,7 @@ public GridIoMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMarsh factory.register(TcpCommunicationSpi.HANDSHAKE_MSG_TYPE, HandshakeMessage::new, new HandshakeMessageSerializer()); factory.register(TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE, HandshakeWaitMessage::new, new HandshakeWaitMessageSerializer()); factory.register((short)0, GridJobCancelRequest::new, new GridJobCancelRequestSerializer()); - factory.register((short)1, GridJobExecuteRequest::new, - new GridJobExecuteRequestMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr)); + factory.register((short)1, GridJobExecuteRequest::new, new GridJobExecuteRequestSerializer()); factory.register((short)2, GridJobExecuteResponse::new, new GridJobExecuteResponseSerializer()); factory.register((short)3, GridJobSiblingsRequest::new, new GridJobSiblingsRequestSerializer()); factory.register((short)4, GridJobSiblingsResponse::new, new GridJobSiblingsResponseSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java index 98c43ca473a47..fd1b0122d6dae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java @@ -1257,31 +1257,49 @@ public void processJobExecuteRequest(ClusterNode node, final GridJobExecuteReque GridJobSessionImpl jobSes; GridJobContextImpl jobCtx; - // Note that we unmarshal session/job attributes here with proper class loader. - GridTaskSessionImpl taskSes = ctx.session().createTaskSession( - req.sessionId(), - node.id(), - req.taskName(), - dep, - req.taskClassName(), - req.topology(), - req.getTopologyPredicate(), - req.startTaskTime(), - endTime, - req.getSiblings(), - req.getSessionAttributes(), - req.sessionFullSupport(), - req.internal(), - req.executorName(), - ctx.security().securityContext() - ); + boolean loc = ctx.localNodeId().equals(node.id()) && !ctx.config().isMarshalLocalJobs(); - taskSes.setCheckpointSpi(req.checkpointSpi()); - taskSes.setClassLoader(dep.classLoader()); + try { + if (!loc) + req.finishUnmarshal(marsh, U.resolveClassLoader(dep.classLoader(), ctx.config())); + + // Note that we unmarshal session/job attributes here with proper class loader. + GridTaskSessionImpl taskSes = ctx.session().createTaskSession( + req.sessionId(), + node.id(), + req.taskName(), + dep, + req.taskClassName(), + req.topology(), + req.getTopologyPredicate(), + req.startTaskTime(), + endTime, + req.getSiblings(), + req.getSessionAttributes(), + req.sessionFullSupport(), + req.internal(), + req.executorName(), + ctx.security().securityContext() + ); + + taskSes.setCheckpointSpi(req.checkpointSpi()); + taskSes.setClassLoader(dep.classLoader()); + + jobSes = new GridJobSessionImpl(ctx, taskSes, req.jobId()); + + jobCtx = new GridJobContextImpl(ctx, req.jobId(), req.getJobAttributes()); + } + catch (IgniteCheckedException e) { + IgniteException ex = new IgniteException("Failed to deserialize task attributes " + + "[taskName=" + req.taskName() + ", taskClsName=" + req.taskClassName() + + ", codeVer=" + req.userVersion() + ", taskClsLdr=" + dep.classLoader() + ']', e); + + U.error(log, ex.getMessage(), e); - jobSes = new GridJobSessionImpl(ctx, taskSes, req.jobId()); + handleException(node, req, ex, endTime); - jobCtx = new GridJobContextImpl(ctx, req.jobId(), req.getJobAttributes()); + return; + } job = new GridJobWorker( ctx, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index b62c7e1672e83..16c91b86edf53 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -1416,6 +1416,8 @@ private void sendRequest(ComputeJobResult res) { if (loc) ctx.job().processJobExecuteRequest(ctx.discovery().localNode(), req); else { + req.prepareMarshal(marsh); + byte plc; if (internal)