diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 446546b8962e3..e6816345a54e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -1204,8 +1204,6 @@ private void onChannelOpened0(UUID rmtNodeId, GridIoMessage initMsg, Channel cha if (topicOrd >= 0) initMsg.topic(GridTopic.fromOrdinal(topicOrd)); - else - initMsg.finishUnmarshal(marsh, U.resolveClassLoader(ctx.config())); } byte plc = initMsg.policy(); @@ -1253,8 +1251,6 @@ private void onMessage0(UUID nodeId, GridIoMessage msg, IgniteRunnable msgC) { if (topicOrd >= 0) msg.topic(GridTopic.fromOrdinal(topicOrd)); - else - msg.finishUnmarshal(marsh, U.resolveClassLoader(ctx.config())); } if (!started) { @@ -1980,9 +1976,6 @@ private IgniteInternalFuture openChannel( ); try { - if (topicOrd < 0) - ioMsg.prepareMarshal(marsh); - return ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).openChannel(node, ioMsg); } catch (IgniteSpiException e) { @@ -2054,9 +2047,6 @@ else if (async) ackC.apply(null); } else { - if (topicOrd < 0) - ioMsg.prepareMarshal(marsh); - try { if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi) getTcpCommunicationSpi().sendMessage(node, ioMsg, ackC); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java index b42b8df962a7e..31e5649bddce4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java @@ -27,13 +27,14 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** * Wrapper for all grid messages. */ -public class GridIoMessage implements Message, SpanTransport { +public class GridIoMessage implements MarshallableMessage, SpanTransport { /** */ public static final Integer STRIPE_DISABLED_PART = Integer.MIN_VALUE; @@ -216,19 +217,14 @@ public int partition() { return null; } - /** - * @param marsh Marshaller. - */ - public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { if (topic != null && topicBytes == null) topicBytes = U.marshal(marsh, topic); } - /** - * @param marsh Marshaller. - * @param ldr Class loader. - */ - public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { if (topicBytes != null && topic == null) { topic = U.unmarshal(marsh, topicBytes, ldr); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index eb022f0edb1bd..3922c3b861a6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -54,7 +54,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBeanSerializer; import org.apache.ignite.internal.managers.deployment.GridDeploymentRequest; -import org.apache.ignite.internal.managers.deployment.GridDeploymentRequestSerializer; +import org.apache.ignite.internal.managers.deployment.GridDeploymentRequestMarshallableSerializer; import org.apache.ignite.internal.managers.deployment.GridDeploymentResponse; import org.apache.ignite.internal.managers.deployment.GridDeploymentResponseSerializer; import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyRequest; @@ -62,7 +62,7 @@ import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse; import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponseSerializer; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage; -import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessageSerializer; +import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessageMarshallableSerializer; import org.apache.ignite.internal.processors.authentication.UserAuthenticateRequestMessage; import org.apache.ignite.internal.processors.authentication.UserAuthenticateRequestMessageSerializer; import org.apache.ignite.internal.processors.authentication.UserAuthenticateResponseMessage; @@ -403,12 +403,15 @@ public GridIoMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMarsh factory.register((short)5, GridTaskCancelRequest::new, new GridTaskCancelRequestSerializer()); factory.register((short)6, GridTaskSessionRequest::new, new GridTaskSessionRequestSerializer()); factory.register((short)7, GridCheckpointRequest::new, new GridCheckpointRequestSerializer()); - factory.register((short)8, GridIoMessage::new, new GridIoMessageSerializer()); + factory.register((short)8, GridIoMessage::new, + new GridIoMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr)); factory.register((short)9, GridIoUserMessage::new, new GridIoUserMessageSerializer()); factory.register((short)10, GridDeploymentInfoBean::new, new GridDeploymentInfoBeanSerializer()); - factory.register((short)11, GridDeploymentRequest::new, new GridDeploymentRequestSerializer()); + factory.register((short)11, GridDeploymentRequest::new, + new GridDeploymentRequestMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr)); factory.register((short)12, GridDeploymentResponse::new, new GridDeploymentResponseSerializer()); - factory.register((short)13, GridEventStorageMessage::new, new GridEventStorageMessageSerializer()); + factory.register((short)13, GridEventStorageMessage::new, + new GridEventStorageMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr)); factory.register((short)16, GridCacheTxRecoveryRequest::new, new GridCacheTxRecoveryRequestSerializer()); factory.register((short)17, GridCacheTxRecoveryResponse::new, new GridCacheTxRecoveryResponseSerializer()); factory.register((short)18, IndexQueryResultMeta::new, new IndexQueryResultMetaSerializer()); @@ -515,7 +518,8 @@ public GridIoMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMarsh factory.register((short)169, ServiceSingleNodeDeploymentResult::new, new ServiceSingleNodeDeploymentResultSerializer()); factory.register(GridQueryKillRequest.TYPE_CODE, GridQueryKillRequest::new, new GridQueryKillRequestSerializer()); factory.register(GridQueryKillResponse.TYPE_CODE, GridQueryKillResponse::new, new GridQueryKillResponseSerializer()); - factory.register(GridIoSecurityAwareMessage.TYPE_CODE, GridIoSecurityAwareMessage::new, new GridIoSecurityAwareMessageSerializer()); + factory.register(GridIoSecurityAwareMessage.TYPE_CODE, GridIoSecurityAwareMessage::new, + new GridIoSecurityAwareMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr)); factory.register(SessionChannelMessage.TYPE_CODE, SessionChannelMessage::new, new SessionChannelMessageSerializer()); factory.register(SingleNodeMessage.TYPE_CODE, SingleNodeMessage::new); factory.register((short)177, TcpInverseConnectionResponseMessage::new, new TcpInverseConnectionResponseMessageSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java index 6de75ab20899c..899075f44487a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java @@ -41,7 +41,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteNotPeerDeployable; import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.Message; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; @@ -72,9 +71,6 @@ class GridDeploymentCommunication { /** */ private final GridBusyLock busyLock = new GridBusyLock(); - /** */ - private final Marshaller marsh; - /** * Creates new instance of deployment communication. * @@ -92,8 +88,6 @@ class GridDeploymentCommunication { processDeploymentRequest(nodeId, msg); } }; - - marsh = ctx.marshaller(); } /** @@ -186,18 +180,6 @@ private void processResourceRequest(UUID nodeId, GridDeploymentRequest req) { if (log.isDebugEnabled()) log.debug("Received peer class/resource loading request [originatingNodeId=" + nodeId + ", req=" + req + ']'); - if (req.responseTopic() == null) { - try { - req.finishUnmarshal(marsh, U.resolveClassLoader(ctx.config())); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to process deployment request (will ignore) [" + - "originatingNodeId=" + nodeId + ", req=" + req + ']', e); - - return; - } - } - GridDeploymentResponse res = new GridDeploymentResponse(); GridDeployment dep = ctx.deploy().getDeployment(req.classLoaderId()); @@ -417,9 +399,6 @@ GridDeploymentResponse sendResourceRequest(final String rsrcName, IgniteUuid cls long start = U.currentTimeMillis(); - if (req.responseTopic() != null && !ctx.localNodeId().equals(dstNode.id())) - req.prepareMarshal(marsh); - ctx.io().sendToGridTopic(dstNode, TOPIC_CLASSLOAD, req, GridIoPolicy.P2P_POOL); if (log.isDebugEnabled()) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentRequest.java index 00890882e0590..ac9e8fe499196 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentRequest.java @@ -26,12 +26,12 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; /** * Deployment request. */ -public class GridDeploymentRequest implements Message { +public class GridDeploymentRequest implements MarshallableMessage { /** Response topic. Response should be sent back to this topic. */ private Object resTopic; @@ -134,19 +134,14 @@ public void nodeIds(Collection nodeIds) { this.nodeIds = nodeIds; } - /** - * @param marsh Marshaller. - */ - public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { if (resTopic != null && resTopicBytes == null) resTopicBytes = U.marshal(marsh, resTopic); } - /** - * @param marsh Marshaller. - * @param ldr Class loader. - */ - public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { if (resTopicBytes != null && resTopic == null) { resTopic = U.unmarshal(marsh, resTopicBytes, ldr); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java index fcfa3e5b37d36..38a773de2bb4b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java @@ -1025,15 +1025,6 @@ private List query(IgnitePredicate p, Collection nodes, GridTopic topi if (locNode != null) ctx.io().sendToGridTopic(locNode, topic, msg, plc); - if (!rmtNodes.isEmpty()) { - msg.prepareMarshal(marsh); - + if (!rmtNodes.isEmpty()) ctx.io().sendToGridTopic(rmtNodes, topic, msg, plc); - } } /** @@ -1216,7 +1204,7 @@ private class RequestListener implements GridMessageListener { throw new IgniteDeploymentCheckedException("Failed to obtain deployment for event filter " + "(is peer class loading turned on?): " + req); - req.finishUnmarshal(marsh, U.resolveClassLoader(ctx.config()), U.resolveClassLoader(dep.classLoader(), ctx.config())); + req.finishUnmarshalFilters(marsh, U.resolveClassLoader(dep.classLoader(), ctx.config())); filter = (IgnitePredicate)req.filter(); @@ -1252,9 +1240,6 @@ private class RequestListener implements GridMessageListener { if (log.isDebugEnabled()) log.debug("Sending event query response to node [nodeId=" + nodeId + "res=" + res + ']'); - if (!ctx.localNodeId().equals(nodeId)) - res.prepareMarshal(marsh); - ctx.io().sendToCustomTopic(node, req.responseTopic(), res, PUBLIC_POOL); } catch (ClusterTopologyCheckedException e) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java index dade283bc0d1f..0f38f3fc18eba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java @@ -32,13 +32,13 @@ import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; import org.jetbrains.annotations.Nullable; /** * Event storage message. */ -public class GridEventStorageMessage implements Message { +public class GridEventStorageMessage implements MarshallableMessage { /** */ private Object resTopic; @@ -205,10 +205,8 @@ public void loaderParticipants(@Nullable Map ldrParties) { return ErrorMessage.error(errMsg); } - /** - * @param marsh Marshaller. - */ - public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { if (resTopic != null && resTopicBytes == null) resTopicBytes = U.marshal(marsh, resTopic); @@ -219,24 +217,14 @@ public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { evtsBytes = U.marshal(marsh, evts); } - /** - * @param marsh Marshaller. - * @param ldr Class loader. - * @param filterClsLdr Class loader for filter. - */ - public void finishUnmarshal(Marshaller marsh, ClassLoader ldr, ClassLoader filterClsLdr) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { if (resTopicBytes != null && resTopic == null) { resTopic = U.unmarshal(marsh, resTopicBytes, ldr); resTopicBytes = null; } - if (filterBytes != null && filter == null && filterClsLdr != null) { - filter = U.unmarshal(marsh, filterBytes, filterClsLdr); - - filterBytes = null; - } - if (evtsBytes != null && evts == null) { evts = U.unmarshal(marsh, evtsBytes, ldr); @@ -244,6 +232,18 @@ public void finishUnmarshal(Marshaller marsh, ClassLoader ldr, ClassLoader filte } } + /** + * @param marsh Marshaller. + * @param filterClsLdr Class loader for filter. + */ + public void finishUnmarshalFilters(Marshaller marsh, ClassLoader filterClsLdr) throws IgniteCheckedException { + if (filterBytes != null && filter == null) { + filter = U.unmarshal(marsh, filterBytes, filterClsLdr); + + filterBytes = null; + } + } + /** {@inheritDoc} */ @Override public short directType() { return 13; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/DeploymentRequestOfUnknownClassProcessingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/DeploymentRequestOfUnknownClassProcessingTest.java index 356e81477b3de..c338150a95acb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/DeploymentRequestOfUnknownClassProcessingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/DeploymentRequestOfUnknownClassProcessingTest.java @@ -127,8 +127,6 @@ public void testResponseReceivingOnDeploymentRequestOfUnknownClass() throws Exce GridDeploymentRequest req = new GridDeploymentRequest(TEST_TOPIC_NAME, locDep.classLoaderId(), UNKNOWN_CLASS_NAME, false); - req.prepareMarshal(locNode.context().marshaller()); - locNode.context().io().sendToGridTopic(remNode.localNode(), TOPIC_CLASSLOAD, req, GridIoPolicy.P2P_POOL); // Сhecks that the expected response has been received. diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PHotRedeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PHotRedeploymentSelfTest.java index 70db1390a1698..2cc6e5b9dee9e 100644 --- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PHotRedeploymentSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PHotRedeploymentSelfTest.java @@ -122,18 +122,6 @@ private void processTestClassLoaderHotRedeployment(DeploymentMode depMode) throw assert taskCls1.getClassLoader() != taskCls2.getClassLoader(); assert taskCls1 != taskCls2; -// final AtomicBoolean undeployed = new AtomicBoolean(false); -// -// grid2.events().localListen(new GridLocalEventListener() { -// @Override public void onEvent(GridEvent evt) { -// if (evt.type() == EVT_TASK_UNDEPLOYED) { -// assert ((GridDeploymentEvent)evt).alias().equals(TASK_NAME); -// -// undeployed.set(true); -// } -// } -// }, EVT_TASK_UNDEPLOYED); - ignite2.compute().localDeployTask(taskCls1, taskCls1.getClassLoader()); Integer res1 = ignite1.compute().execute(taskCls1, Collections.singletonList(ignite2.cluster().localNode().id())); @@ -149,10 +137,6 @@ private void processTestClassLoaderHotRedeployment(DeploymentMode depMode) throw info("Result2: " + res2); assert !res1.equals(res2); - -// Thread.sleep(P2P_TIMEOUT * 2); -// -// assert undeployed.get(); } finally { stopGrid(2);