diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index cc0ab3e112b7d..f6b51fabbcde3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -666,7 +666,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(PartitionHashRecord.class); withNoSchema(TransactionsHashRecord.class); - // [13400 - 13600]: Operation context messages. + // [13400 - 13500]: Operation context messages. msgIdx = 13400; withNoSchema(DistributedOperationContextMessage.class); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 3acf503561ad8..bdbc375fc9ff3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -102,6 +102,7 @@ import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings; import org.apache.ignite.internal.processors.tracing.Span; import org.apache.ignite.internal.processors.tracing.SpanTags; +import org.apache.ignite.internal.thread.context.DistributedOperationContextManager; import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet; import org.apache.ignite.internal.util.IgniteUtils; @@ -455,10 +456,14 @@ public void resetMetrics() { ioMetric.register(RCVD_BYTES_CNT, spi::getReceivedBytesCount, "Received bytes count."); - getSpi().setListener(commLsnr = new CommunicationListenerEx() { + getSpi().setListener(commLsnr = new CommunicationListenerEx<>() { @Override public void onMessage(UUID nodeId, Object msg, IgniteRunnable msgC) { try { - onMessage0(nodeId, (GridIoMessage)msg, msgC); + GridIoMessage msg0 = (GridIoMessage)msg; + + try (Scope ignored = DistributedOperationContextManager.instance().restoreDistributedAttributes(msg0.opCtxMsg)) { + onMessage0(nodeId, msg0, msgC); + } } catch (ClassCastException ignored) { U.error(log, "Communication manager received message of unknown type (will ignore): " + @@ -2037,16 +2042,22 @@ private long getInverseConnectionWaitTimeout() { long timeout, boolean skipOnTimeout ) { + GridIoMessage res; + if (ctx.security().enabled()) { UUID secSubjId = null; if (!ctx.security().isDefaultContext()) secSubjId = ctx.security().securityContext().subject().id(); - return new GridIoSecurityAwareMessage(secSubjId, plc, topic, msg, ordered, timeout, skipOnTimeout); + res = new GridIoSecurityAwareMessage(secSubjId, plc, topic, msg, ordered, timeout, skipOnTimeout); } + else + res = new GridIoMessage(plc, topic, msg, ordered, timeout, skipOnTimeout); + + res.opCtxMsg = DistributedOperationContextManager.instance().collectDistributedAttributes(); - return new GridIoMessage(plc, topic, msg, ordered, timeout, skipOnTimeout); + return res; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java index 8cc6c106cf22e..cb09122415def 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.managers.communication; +import org.apache.ignite.internal.DistributedOperationContextMessage; import org.apache.ignite.internal.ExecutorAwareMessage; import org.apache.ignite.internal.GridTopicMessage; import org.apache.ignite.internal.Order; @@ -64,6 +65,11 @@ public class GridIoMessage implements Message, SpanTransport { @Order(6) byte[] span; + /** Effective operation context attributes. */ + @Order(7) + @GridToStringInclude + public @Nullable DistributedOperationContextMessage opCtxMsg; + /** * Default constructor. */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 3078e20a0e9fc..f9ab18727eada 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -616,7 +616,7 @@ private void dumpInfo(StringBuilder sb, UUID dstNodeId) { ctxInitLatch, client, igniteExSupplier, - new CommunicationListener() { + new CommunicationListener<>() { @Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) { notifyListener(nodeId, msg, msgC); } @@ -651,7 +651,7 @@ private void dumpInfo(StringBuilder sb, UUID dstNodeId) { getWorkersRegistry(ignite), ignite instanceof IgniteEx ? ((IgniteEx)ignite).context().metric() : null, this::createTcpClient, - new CommunicationListenerEx() { + new CommunicationListenerEx<>() { @Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) { notifyListener(nodeId, msg, msgC); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java index bdf84b743ad2d..232912c2feb83 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -38,8 +39,12 @@ import java.util.function.Supplier; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.managers.communication.GridMessageListener; +import org.apache.ignite.internal.managers.communication.IgniteIoTestMessage; import org.apache.ignite.internal.managers.discovery.CustomEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; @@ -911,6 +916,114 @@ else if (grid(i0).localNode().order() == 1) assertTrue(clientLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); } + /** */ + @Test + public void testSendAttributesByCommunication() throws Exception { + byte attrId1 = 0; + byte attrId2 = DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT - 1; + + InetSocketAddressMessage dfltDistrAttr1Val = new InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80); + GridCacheVersion dfltDistrAttr2Val = new GridCacheVersion(1, 1, 1); + + // Local attribute 1. + OperationContextAttribute.newInstance(1000); + + // Distributed attribute 1. + OperationContextAttribute dAttr1 = DistributedOperationContextManager.instance() + .createDistributedAttribute(attrId1, dfltDistrAttr1Val); + + // Local attribute 2. + OperationContextAttribute.newInstance("locaAttr2"); + + // Distributed attribute 2. + OperationContextAttribute dAttr2 = DistributedOperationContextManager.instance() + .createDistributedAttribute(attrId2, dfltDistrAttr2Val); + + startGrids(2); + startClientGrid(2); + + CountDownLatch coordLatch = new CountDownLatch(2); + CountDownLatch srvrLatch = new CountDownLatch(4); + CountDownLatch clientLatch = new CountDownLatch(2); + + InetSocketAddressMessage valToSend1 = new InetSocketAddressMessage(dfltDistrAttr1Val.address(), 443); + GridCacheVersion valToSend2 = new GridCacheVersion(2, 2, 2); + + for (int i = 0; i < G.allGrids().size(); ++i) { + int i0 = i; + + grid(i).context().io().addMessageListener(GridTopic.TOPIC_IO_TEST, new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { + if (msg instanceof IgniteIoTestMessage) { + InetSocketAddressMessage receivedVal1 = OperationContext.get(dAttr1); + GridCacheVersion receivedVal2 = OperationContext.get(dAttr2); + + assertNotNull(receivedVal1); + assertNotNull(receivedVal2); + + assertFalse(dfltDistrAttr1Val.port() == receivedVal1.port()); + assertEquals(receivedVal1.port(), valToSend1.port()); + assertEquals(receivedVal1.address(), valToSend1.address()); + + assertEquals(receivedVal2, valToSend2); + + if (grid(i0).localNode().isClient()) + clientLatch.countDown(); + else if (grid(i0).localNode().order() == 1) + coordLatch.countDown(); + else + srvrLatch.countDown(); + } + } + }); + } + + assertFalse(valToSend1.equals(dfltDistrAttr1Val)); + assertFalse(valToSend2.equals(dfltDistrAttr2Val)); + + // From the coordinator to a server. + try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) { + io(0).sendIoTest(node(1), null, false); + io(0).sendIoTest(node(1), null, true); + } + + assertTrue(waitForCondition(() -> coordLatch.getCount() == 2, getTestTimeout())); + + // From a server to the coordinator. + try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) { + io(1).sendIoTest(node(0), null, false); + io(1).sendIoTest(node(0), null, true); + } + + assertTrue(coordLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); + + // From a client to a server. + try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) { + io(2).sendIoTest(node(1), null, false); + io(2).sendIoTest(node(1), null, true); + } + + assertTrue(srvrLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); + + // From a server to a client. + try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) { + io(1).sendIoTest(node(2), null, false); + io(1).sendIoTest(node(2), null, true); + } + + assertTrue(clientLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); + } + + /** @return a {@link ClusterNode} with {@link ClusterNode#isLocal()} == {@code false} to avoid some asserts/checks. */ + private ClusterNode node(int nodeIdx) { + return grid(0).cluster().node(grid(nodeIdx).localNode().id()); + } + + /** */ + private GridIoManager io(int nodeIdx) { + return grid(nodeIdx).context().io(); + } + /** */ private void doContextAwareExecutorServiceTest(ExecutorService pool) throws Exception { CountDownLatch poolUnblockedLatch = blockPool(pool);