From b187ab6eb97da9497d0c58ca3027fb793f0ffccb Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Thu, 23 Apr 2026 16:42:17 +0300 Subject: [PATCH] IGNITE-28594 Use Message DTO for DiscoveryDataBag#JoiningNodeDiscoveryData --- .../ignite/internal/CoreMessagesProvider.java | 19 +++- .../ignite/internal/GridPluginComponent.java | 2 +- .../encryption/GridEncryptionManager.java | 45 +-------- .../encryption/NodeEncryptionKeys.java | 69 ++++++++++++++ .../cache/ValidationOnNodeJoinUtils.java | 2 +- .../cache/binary/BinaryMetadataTransport.java | 20 +--- .../binary/BinaryMetadataVersionInfo.java | 31 +++---- .../binary/BinaryMetadataVersionsData.java | 39 ++++++++ .../CacheObjectBinaryProcessorImpl.java | 17 ++-- .../cluster/GridClusterStateProcessor.java | 37 +------- .../continuous/GridContinuousProcessor.java | 5 +- .../GridMarshallerMappingProcessor.java | 6 +- .../processors/marshaller/MappedName.java | 13 ++- .../marshaller/MarshallerMappingsData.java | 40 ++++++++ .../DistributedMetaStorageImpl.java | 53 ++--------- .../plugin/IgnitePluginProcessor.java | 2 +- .../processors/query/GridQueryProcessor.java | 21 ++--- .../processors/query/InlineSizesData.java | 39 ++++++++ .../service/IgniteServiceProcessor.java | 18 +++- .../spi/discovery/DiscoveryDataBag.java | 30 ++++-- .../ignite/spi/discovery/ObjectData.java | 82 ++++++++++++++++ .../ignite/spi/discovery/tcp/ServerImpl.java | 17 +--- .../spi/discovery/tcp/TcpDiscoverySpi.java | 8 +- .../tcp/internal/DiscoveryDataPacket.java | 93 +++---------------- .../TcpDiscoveryNodeAddedMessage.java | 9 -- .../resources/META-INF/classnames.properties | 2 +- .../DistributedMetaStoragePersistentTest.java | 11 +-- .../zk/internal/DiscoveryMessageParser.java | 13 ++- .../zk/internal/ZkJoiningNodeData.java | 6 +- .../zk/internal/ZookeeperDiscoveryImpl.java | 10 +- 30 files changed, 418 insertions(+), 341 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/NodeEncryptionKeys.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionsData.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java create mode 100644 modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index f1f7883ea1961..460cb7bcc1043 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse; import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted; import org.apache.ignite.internal.managers.encryption.MasterKeyChangeRequest; +import org.apache.ignite.internal.managers.encryption.NodeEncryptionKeys; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage; import org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider; import org.apache.ignite.internal.processors.authentication.User; @@ -71,6 +72,7 @@ import org.apache.ignite.internal.processors.cache.WalStateFinishMessage; import org.apache.ignite.internal.processors.cache.WalStateProposeMessage; import org.apache.ignite.internal.processors.cache.binary.BinaryMetadataVersionInfo; +import org.apache.ignite.internal.processors.cache.binary.BinaryMetadataVersionsData; import org.apache.ignite.internal.processors.cache.binary.MetadataRemoveAcceptedMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataRemoveProposedMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage; @@ -193,15 +195,18 @@ import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry; import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest; import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse; +import org.apache.ignite.internal.processors.marshaller.MappedName; import org.apache.ignite.internal.processors.marshaller.MappingAcceptedMessage; import org.apache.ignite.internal.processors.marshaller.MappingProposedMessage; import org.apache.ignite.internal.processors.marshaller.MarshallerMappingItem; +import org.apache.ignite.internal.processors.marshaller.MarshallerMappingsData; import org.apache.ignite.internal.processors.marshaller.MissingMappingRequestMessage; import org.apache.ignite.internal.processors.marshaller.MissingMappingResponseMessage; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasAckMessage; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasMessage; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateAckMessage; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateMessage; +import org.apache.ignite.internal.processors.query.InlineSizesData; import org.apache.ignite.internal.processors.query.QueryField; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse; @@ -250,6 +255,7 @@ import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage; import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage; import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage; +import org.apache.ignite.spi.discovery.ObjectData; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage; @@ -351,6 +357,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(GridCacheVersion.class); withNoSchema(GridCacheVersionEx.class); withNoSchema(WALPointer.class); + withNoSchemaResolvedClassLoader(ObjectData.class); // [5700 - 5900]: Discovery originated messages. msgIdx = 5700; @@ -572,6 +579,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(StatisticsResponse.class); withNoSchema(CacheContinuousQueryBatchAck.class); withSchema(CacheContinuousQueryEntry.class); + withNoSchema(InlineSizesData.class); // [11200 - 11300]: Compute, distributed process messages. msgIdx = 11200; @@ -636,7 +644,10 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(MetadataRequestMessage.class); withNoSchema(MetadataResponseMessage.class); withNoSchema(MarshallerMappingItem.class); - withNoSchema(BinaryMetadataVersionInfo.class); + withSchemaResolvedClassLoader(BinaryMetadataVersionInfo.class); + withNoSchema(BinaryMetadataVersionsData.class); + withNoSchema(MappedName.class); + withNoSchema(MarshallerMappingsData.class); // [12400 - 12500]: Encryption messages. msgIdx = 12400; @@ -645,6 +656,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(ChangeCacheEncryptionRequest.class); withNoSchema(MasterKeyChangeRequest.class); withNoSchema(GroupKeyEncrypted.class); + withNoSchema(NodeEncryptionKeys.class); // [13000 - 13300]: Control, configuration, diagnostincs and other messages. msgIdx = 13000; @@ -677,6 +689,11 @@ private void withNoSchemaResolvedClassLoader(Class cls) { register(cls, dfltMarsh, resolvedClsLdr); } + /** Registers message using {@link #schemaAwareMarsh} and {@link #resolvedClsLdr}. */ + private void withSchemaResolvedClassLoader(Class cls) { + register(cls, schemaAwareMarsh, resolvedClsLdr); + } + /** Registers message using incrementing {@link #msgIdx} as the message id/type. */ private void register(Class cls, Marshaller marsh, ClassLoader clsLrd) { register(factory, cls, msgIdx++, marsh, clsLrd); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java index 56f292f407b4d..f0e1a7c7b627a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java @@ -115,7 +115,7 @@ public PluginProvider plugin() { @Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node, JoiningNodeDiscoveryData discoData) { try { - Map map = (Map)discoData.joiningNodeData(); + Map map = discoData.joiningNodeData(); if (map != null) plugin.validateNewNode(node, map.get(plugin.name())); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java index e20b1effd18bc..19c58d5d7b8cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java @@ -431,9 +431,9 @@ public void onLocalJoin() { "Cache group key change is in progress! Node join is rejected."); } - NodeEncryptionKeys nodeEncKeys = (NodeEncryptionKeys)discoData.joiningNodeData(); + NodeEncryptionKeys nodeEncKeys = discoData.joiningNodeData(); - if (!discoData.hasJoiningNodeData() || nodeEncKeys == null) { + if (nodeEncKeys == null) { return new IgniteNodeValidationResult(ctx.localNodeId(), "Joining node doesn't have encryption data [node=" + node.id() + "]", "Joining node doesn't have encryption data."); @@ -522,7 +522,7 @@ public void onLocalJoin() { /** {@inheritDoc} */ @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) { - NodeEncryptionKeys nodeEncryptionKeys = (NodeEncryptionKeys)data.joiningNodeData(); + NodeEncryptionKeys nodeEncryptionKeys = data.joiningNodeData(); if (nodeEncryptionKeys == null || nodeEncryptionKeys.newKeys == null || ctx.clientNode()) return; @@ -1748,45 +1748,6 @@ private String decryptKeyName(byte[] data) { }); } - /** */ - protected static class NodeEncryptionKeys implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - NodeEncryptionKeys( - HashMap> knownKeysWithIds, - Map newKeys, - byte[] masterKeyDigest - ) { - this.newKeys = newKeys; - this.masterKeyDigest = masterKeyDigest; - - if (F.isEmpty(knownKeysWithIds)) - return; - - // To be able to join the old cluster. - knownKeys = U.newHashMap(knownKeysWithIds.size()); - - for (Map.Entry> entry : knownKeysWithIds.entrySet()) - knownKeys.put(entry.getKey(), entry.getValue().get(0).key()); - - this.knownKeysWithIds = knownKeysWithIds; - } - - /** Known i.e. stored in {@code ReadWriteMetastorage} keys from node (in compatible format). */ - Map knownKeys; - - /** New keys i.e. keys for a local statically configured caches. */ - Map newKeys; - - /** Master key digest. */ - byte[] masterKeyDigest; - - /** Known i.e. stored in {@code ReadWriteMetastorage} keys from node. */ - Map> knownKeysWithIds; - } - /** */ private class GenerateEncryptionKeyFuture extends GridFutureAdapter, byte[]>> { /** */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/NodeEncryptionKeys.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/NodeEncryptionKeys.java new file mode 100644 index 0000000000000..c2583de3b04fe --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/NodeEncryptionKeys.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.encryption; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** */ +public class NodeEncryptionKeys implements Message { + /** Known i.e. stored in {@code ReadWriteMetastorage} keys from node (in compatible format). */ + @Order(0) + Map knownKeys; + + /** New keys i.e. keys for a local statically configured caches. */ + @Order(1) + Map newKeys; + + /** Master key digest. */ + @Order(2) + byte[] masterKeyDigest; + + /** Known i.e. stored in {@code ReadWriteMetastorage} keys from node. */ + @Order(3) + Map> knownKeysWithIds; + + /** */ + public NodeEncryptionKeys() {} + + /** */ + NodeEncryptionKeys( + HashMap> knownKeysWithIds, + Map newKeys, + byte[] masterKeyDigest + ) { + this.newKeys = newKeys; + this.masterKeyDigest = masterKeyDigest; + + if (F.isEmpty(knownKeysWithIds)) + return; + + // To be able to join the old cluster. + knownKeys = U.newHashMap(knownKeysWithIds.size()); + + for (Map.Entry> entry : knownKeysWithIds.entrySet()) + knownKeys.put(entry.getKey(), entry.getValue().get(0).key()); + + this.knownKeysWithIds = knownKeysWithIds; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java index 4f72dc259d8a8..ac8d97b2a127c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java @@ -124,7 +124,7 @@ public class ValidationOnNodeJoinUtils { Function cacheDescProvider ) { if (discoData.hasJoiningNodeData() && discoData.joiningNodeData() instanceof CacheJoinNodeDiscoveryData) { - CacheJoinNodeDiscoveryData nodeData = (CacheJoinNodeDiscoveryData)discoData.joiningNodeData(); + CacheJoinNodeDiscoveryData nodeData = discoData.joiningNodeData(); boolean isGridActive = ctx.state().clusterState().active(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java index 0cfb2d96768b9..5b36542f1d6c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java @@ -841,15 +841,6 @@ private final class MetadataRequestListener implements GridMessageListener { MetadataResponseMessage resp = new MetadataResponseMessage(typeId); - if (metaVerInfo != null) { - try { - metaVerInfo.marshalMetadata(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to marshal binary metadata for [typeId=" + typeId + ']', e); - } - } - resp.metadataVersionInfo(metaVerInfo); try { @@ -890,16 +881,9 @@ private final class MetadataResponseListener implements GridMessageListener { return; } - try { - metaVerInfo.unmarshalMetadata(); - - casBinaryMetadata(typeId, metaVerInfo); + casBinaryMetadata(typeId, metaVerInfo); - fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1)); - } - catch (IgniteCheckedException e) { - fut.onDone(MetadataUpdateResult.createFailureResult(new BinaryObjectException(e))); - } + fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1)); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionInfo.java index 1c71892fd1795..9fe6d19da4421 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionInfo.java @@ -18,12 +18,11 @@ import java.io.Serializable; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.MarshallableMessage; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.binary.BinaryMetadata; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.plugin.extensions.communication.Message; - -import static org.apache.ignite.marshaller.Marshallers.jdk; +import org.apache.ignite.marshaller.Marshaller; /** * Wrapper for {@link BinaryMetadata} which is stored in metadata local cache on each node. @@ -31,7 +30,7 @@ * The version refers solely to the internal protocol for updating BinaryMetadata and is unknown externally. * It can be updated dynamically from different nodes and threads on the same node. */ -public final class BinaryMetadataVersionInfo implements Serializable, Message { +public final class BinaryMetadataVersionInfo implements Serializable, MarshallableMessage { /** */ private static final long serialVersionUID = 0L; @@ -130,24 +129,16 @@ boolean removing() { return removing; } - /** - * Marshals binary metadata to byte array. - * - * @throws IgniteCheckedException If failed. - */ - public void marshalMetadata() throws IgniteCheckedException { - if (metadataBytes == null) - metadataBytes = U.marshal(jdk(), metadata); + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + if (metadata != null) + metadataBytes = U.marshal(marsh, metadata); } - /** - * Unmarshals binary metadata from byte array. - * - * @throws IgniteCheckedException If failed. - */ - public void unmarshalMetadata() throws IgniteCheckedException { - if (metadata == null && metadataBytes != null) { - metadata = U.unmarshal(jdk(), metadataBytes, U.gridClassLoader()); + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + if (metadataBytes != null) { + metadata = U.unmarshal(marsh, metadataBytes, clsLdr); // It is not required anymore. metadataBytes = null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionsData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionsData.java new file mode 100644 index 0000000000000..706d2c26f1caa --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionsData.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.binary; + +import java.util.Map; +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** */ +public class BinaryMetadataVersionsData implements Message { + /** */ + @Order(0) + Map data; + + /** */ + public BinaryMetadataVersionsData() {} + + /** + * @param data Data. + */ + public BinaryMetadataVersionsData(Map data) { + this.data = Map.copyOf(data); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index a0682d41b4174..8867e576ec8af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -1394,7 +1394,7 @@ private int partition(CacheObjectContext ctx, @Nullable GridCacheContext cctx, O if ((res = validateBinaryConfiguration(rmtNode)) != null) return res; - return validateBinaryMetadata(rmtNode.id(), (Map)discoData.joiningNodeData()); + return validateBinaryMetadata(rmtNode.id(), discoData.joiningNodeData()); } /** */ @@ -1418,11 +1418,11 @@ private IgniteNodeValidationResult validateBinaryConfiguration(ClusterNode rmtNo } /** */ - private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId, Map newNodeMeta) { + private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId, BinaryMetadataVersionsData newNodeMeta) { if (newNodeMeta == null) return null; - for (Map.Entry metaEntry : newNodeMeta.entrySet()) { + for (Map.Entry metaEntry : newNodeMeta.data.entrySet()) { if (!metadataLocCache.containsKey(metaEntry.getKey())) continue; @@ -1470,24 +1470,19 @@ private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId, Map res = U.newHashMap(metadataLocCache.size()); - - for (Map.Entry e : metadataLocCache.entrySet()) - res.put(e.getKey(), e.getValue()); - - dataBag.addJoiningNodeData(BINARY_PROC.ordinal(), (Serializable)res); + dataBag.addJoiningNodeData(BINARY_PROC.ordinal(), new BinaryMetadataVersionsData(metadataLocCache)); } /** {@inheritDoc} */ @Override public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) { - Map newNodeMeta = (Map)data.joiningNodeData(); + BinaryMetadataVersionsData newNodeMeta = data.joiningNodeData(); if (newNodeMeta == null) return; UUID joiningNode = data.joiningNodeId(); - for (Map.Entry metaEntry : newNodeMeta.entrySet()) { + for (Map.Entry metaEntry : newNodeMeta.data.entrySet()) { if (metadataLocCache.containsKey(metaEntry.getKey())) { BinaryMetadataVersionInfo locMetaVerInfo = metadataLocCache.get(metaEntry.getKey()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index 4640faf6d8e35..337b4f51a70c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -925,14 +925,7 @@ protected IgniteCheckedException concurrentStateChangeError(ClusterState state, /** {@inheritDoc} */ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) { - try { - byte[] marshalledState = marsh.marshal(globalState); - - dataBag.addJoiningNodeData(discoveryDataType().ordinal(), marshalledState); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + dataBag.addJoiningNodeData(discoveryDataType().ordinal(), globalState); } /** {@inheritDoc} */ @@ -953,20 +946,7 @@ protected IgniteCheckedException concurrentStateChangeError(ClusterState state, return; } - DiscoveryDataClusterState joiningNodeState = null; - - try { - if (joiningNodeData.joiningNodeData() != null) - joiningNodeState = marsh.unmarshal( - (byte[])joiningNodeData.joiningNodeData(), - U.resolveClassLoader(ctx.config()) - ); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to unmarshal disco data from joining node: " + joiningNodeData.joiningNodeId()); - - return; - } + DiscoveryDataClusterState joiningNodeState = joiningNodeData.joiningNodeData(); BaselineTopologyHistory historyToSend = null; @@ -1251,18 +1231,7 @@ public IgniteInternalFuture changeGlobalState( return null; } - DiscoveryDataClusterState joiningNodeState; - - try { - joiningNodeState = marsh.unmarshal((byte[])discoData.joiningNodeData(), U.resolveClassLoader(ctx.config())); - } - catch (IgniteCheckedException e) { - String msg = "Error on unmarshalling discovery data " + - "from node " + node.consistentId() + ": " + e.getMessage() + - "; node is not allowed to join"; - - return new IgniteNodeValidationResult(node.id(), msg); - } + DiscoveryDataClusterState joiningNodeState = discoData.joiningNodeData(); if (joiningNodeState == null || joiningNodeState.baselineTopology() == null) return null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 7509a88fa178a..30bc8175de3d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -521,8 +521,7 @@ private Map copyLocalInfos(Map l if (immutableDiscoCustomMsg) { if (data.hasJoiningNodeData()) { - ContinuousRoutinesJoiningNodeDiscoveryData nodeData = (ContinuousRoutinesJoiningNodeDiscoveryData) - data.joiningNodeData(); + ContinuousRoutinesJoiningNodeDiscoveryData nodeData = data.joiningNodeData(); for (ContinuousRoutineInfo routineInfo : nodeData.startedRoutines) { routinesInfo.addRoutineInfo(routineInfo); @@ -533,7 +532,7 @@ private Map copyLocalInfos(Map l } else { if (data.hasJoiningNodeData()) - onDiscoveryDataReceivedMutable((DiscoveryData)data.joiningNodeData()); + onDiscoveryDataReceivedMutable(data.joiningNodeData()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java index f6895734545a5..8946672364edb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java @@ -326,7 +326,7 @@ private final class MappingAcceptedListener implements CustomEventListener> mappings = (List>)data.joiningNodeData(); + MarshallerMappingsData mappingsData = data.joiningNodeData(); - processIncomingMappings(mappings); + processIncomingMappings(mappingsData.mappings); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java index eae07d2d01b8b..4a000fadc231e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java @@ -19,20 +19,27 @@ import java.io.Serializable; import java.util.Objects; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Contains mapped class name and boolean flag showing whether this mapping was accepted by other nodes or not. */ -public final class MappedName implements Serializable { +public final class MappedName implements Serializable, Message { /** */ private static final long serialVersionUID = 0L; /** */ - private final String clsName; + @Order(0) + String clsName; /** */ - private final boolean accepted; + @Order(1) + boolean accepted; + + /** */ + public MappedName() {} /** * @param clsName Class name. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java new file mode 100644 index 0000000000000..2207b1c21f47c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.marshaller; + +import java.util.List; +import java.util.Map; +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** */ +public class MarshallerMappingsData implements Message { + /** */ + @Order(0) + List> mappings; + + /** */ + public MarshallerMappingsData() {} + + /** + * @param mappings Mappings. + */ + public MarshallerMappingsData(List> mappings) { + this.mappings = mappings; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java index 2e24715108b7b..6e67b52250e51 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java @@ -571,14 +571,9 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) { EMPTY_ARRAY ); - try { - dataBag.addJoiningNodeData(COMPONENT_ID, marshaller.marshal(data)); + dataBag.addJoiningNodeData(COMPONENT_ID, data); - return; - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + return; } Serializable data = new DistributedMetaStorageJoiningNodeData( @@ -587,12 +582,7 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) { histCache.toArray() ); - try { - dataBag.addJoiningNodeData(COMPONENT_ID, marshaller.marshal(data)); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + dataBag.addJoiningNodeData(COMPONENT_ID, data); } finally { lock.readLock().unlock(); @@ -640,10 +630,10 @@ private int getBaselineTopologyId() { try { DistributedMetaStorageVersion locVer = ver; - DistributedMetaStorageJoiningNodeData joiningData = getJoiningNodeData(discoData); + DistributedMetaStorageJoiningNodeData joiningData = discoData.joiningNodeData(); if (joiningData == null) { - String errorMsg = "Cannot unmarshal joining node data"; + String errorMsg = "Empty joining node data"; return new IgniteNodeValidationResult(node.id(), errorMsg); } @@ -774,10 +764,7 @@ private String validatePayload(DistributedMetaStorageJoiningNodeData joiningData if (!discoData.hasJoiningNodeData()) return; - DistributedMetaStorageJoiningNodeData joiningData = getJoiningNodeData(discoData); - - if (joiningData == null) - return; + DistributedMetaStorageJoiningNodeData joiningData = discoData.joiningNodeData(); DistributedMetaStorageVersion remoteVer = joiningData.ver; @@ -832,10 +819,7 @@ private String validatePayload(DistributedMetaStorageJoiningNodeData joiningData if (!discoData.hasJoiningNodeData()) return; - DistributedMetaStorageJoiningNodeData joiningData = getJoiningNodeData(discoData); - - if (joiningData == null) - return; + DistributedMetaStorageJoiningNodeData joiningData = discoData.joiningNodeData(); DistributedMetaStorageVersion remoteVer = joiningData.ver; @@ -880,29 +864,6 @@ private String validatePayload(DistributedMetaStorageJoiningNodeData joiningData } } - /** - * Retrieve joining node data from discovery data. It is expected that it is present as a {@code byte[]} object. - * - * @param discoData Joining node discovery data. - * @return Unmarshalled data or null if unmarshalling failed. - */ - @Nullable private DistributedMetaStorageJoiningNodeData getJoiningNodeData( - JoiningNodeDiscoveryData discoData - ) { - byte[] data = (byte[])discoData.joiningNodeData(); - - assert data != null; - - try { - return marshaller.unmarshal(data, U.gridClassLoader()); - } - catch (IgniteCheckedException e) { - log.error("Unable to unmarshal joinging node data for distributed metastorage component.", e); - - return null; - } - } - /** {@inheritDoc} */ @Override public void onDisconnected(IgniteFuture reconnectFut) { assert isClient; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java index 6a8fbf2962ebb..59bcabd2bfa6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java @@ -194,7 +194,7 @@ private Serializable getDiscoveryData(UUID joiningNodeId) { /** {@inheritDoc} */ @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) { if (data.hasJoiningNodeData()) { - Map pluginsData = (Map)data.joiningNodeData(); + Map pluginsData = data.joiningNodeData(); applyPluginsData(data.joiningNodeId(), pluginsData); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 6df09f83c413c..854e8a17e5880 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -496,28 +496,19 @@ public void onCacheReconnect() throws IgniteCheckedException { /** {@inheritDoc} */ @Override public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) { - if (data.hasJoiningNodeData() && data.joiningNodeData() instanceof Map) { - Map nodeSpecificDataMap = (Map)data.joiningNodeData(); + Object joiningNodeData = data.joiningNodeData(); - if (nodeSpecificDataMap.containsKey(INLINE_SIZES_DISCO_BAG_KEY)) { - Serializable serializable = nodeSpecificDataMap.get(INLINE_SIZES_DISCO_BAG_KEY); + if (joiningNodeData instanceof InlineSizesData) { + Map joiningNodeIndexesInlineSize = ((InlineSizesData)joiningNodeData).sizes; - assert serializable instanceof Map : serializable; - - Map joiningNodeIndexesInlineSize = (Map)serializable; - - checkInlineSizes(secondaryIndexesInlineSize(), joiningNodeIndexesInlineSize, data.joiningNodeId()); - } + checkInlineSizes(secondaryIndexesInlineSize(), joiningNodeIndexesInlineSize, data.joiningNodeId()); } } /** {@inheritDoc} */ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) { - HashMap dataMap = new HashMap<>(); - - dataMap.put(INLINE_SIZES_DISCO_BAG_KEY, collectSecondaryIndexesInlineSize()); - - dataBag.addJoiningNodeData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), dataMap); + dataBag.addJoiningNodeData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), + new InlineSizesData(secondaryIndexesInlineSize())); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java new file mode 100644 index 0000000000000..eb3813501f670 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import java.util.Map; +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** */ +public class InlineSizesData implements Message { + /** */ + @Order(0) + Map sizes; + + /** */ + public InlineSizesData() {} + + /** + * @param sizes Inline sizes. + */ + public InlineSizesData(Map sizes) { + this.sizes = sizes; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java index 86fa2f2cea977..e7f59549ea4ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java @@ -418,7 +418,9 @@ private void cancelDeployedServices() { if (data.joiningNodeData() == null) return null; - List svcs = ((ServiceProcessorJoinNodeDiscoveryData)data.joiningNodeData()).services(); + ServiceProcessorJoinNodeDiscoveryData srvcProcData = data.joiningNodeData(); + + List svcs = srvcProcData.services(); if (ctx.security().enabled()) { SecurityException err = checkDeployPermissionDuringJoin(node, svcs); @@ -429,7 +431,10 @@ private void cancelDeployedServices() { for (ServiceInfo svc : svcs) { try { - unmarshalNodeFilterIfNeeded(svc.configuration()); + // Returned value is ignored, because we only need to check possibility of marshalling. + // We don't need to save node filter in lazy configuration at this moment. + // Filter will be unmarhshalled and saved during adding node to topology (see #onGridDataReceived). + unmarshalNodeFilter(svc.configuration()); } catch (IgniteCheckedException e) { return new IgniteNodeValidationResult(node.id(), "Node join is rejected [joiningNodeId=" + node.id() + @@ -445,7 +450,7 @@ private void cancelDeployedServices() { if (data.joiningNodeData() == null) return; - ServiceProcessorJoinNodeDiscoveryData joinData = (ServiceProcessorJoinNodeDiscoveryData)data.joiningNodeData(); + ServiceProcessorJoinNodeDiscoveryData joinData = data.joiningNodeData(); for (ServiceInfo desc : joinData.services()) { assert desc.topologySnapshot().isEmpty(); @@ -1442,12 +1447,17 @@ private void unmarshalNodeFilterIfNeeded(LazyServiceConfiguration cfg) throws Ig if (cfg.getNodeFilter() != null) return; + cfg.setNodeFilter(unmarshalNodeFilter(cfg)); + } + + /** @param cfg Lazy service configuration. */ + private IgnitePredicate unmarshalNodeFilter(LazyServiceConfiguration cfg) throws IgniteCheckedException { GridDeployment dep = ctx.deploy().getDeployment(cfg.serviceClassName()); ClassLoader clsLdr = U.resolveClassLoader(dep != null ? dep.classLoader() : null, ctx.config()); try { - cfg.setNodeFilter(U.unmarshal(marsh, cfg.nodeFilterBytes(), clsLdr)); + return U.unmarshal(marsh, cfg.nodeFilterBytes(), clsLdr); } catch (IgniteCheckedException e) { throw new IgniteCheckedException("Failed to unmarshal class of service node filter [cfg=" + cfg + ']', e); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java index 28fc5f1a7b841..58e41738265d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java @@ -24,6 +24,7 @@ import java.util.UUID; import org.apache.ignite.internal.GridComponent; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** @@ -44,8 +45,11 @@ public interface JoiningNodeDiscoveryData { /** @return Whether joining node provided discovery data. */ boolean hasJoiningNodeData(); - /** @return Joining node data. */ - Serializable joiningNodeData(); + /** + * @param Data type. + * @return Joining node data. + */ + T joiningNodeData(); } /** @@ -80,8 +84,10 @@ private final class JoiningNodeDiscoveryDataImpl implements JoiningNodeDiscovery } /** {@inheritDoc} */ - @Override @Nullable public Serializable joiningNodeData() { - return joiningNodeData.get(cmpId); + @Override @Nullable public T joiningNodeData() { + Message dataMsg = joiningNodeData.get(cmpId); + + return dataMsg instanceof ObjectData ? ObjectData.unwrap(dataMsg) : (T)dataMsg; } /** @@ -158,7 +164,7 @@ private void reinitNodeSpecData(int cmpId) { private Set cmnDataInitializedCmps; /** */ - private Map joiningNodeData = new HashMap<>(); + private Map joiningNodeData = new HashMap<>(); /** */ private Map commonData = new HashMap<>(); @@ -237,9 +243,17 @@ public JoiningNodeDiscoveryData newJoinerDiscoveryData(int cmpId) { /** * @param cmpId Component ID. - * @param data Data. + * @param data Serializable data. */ public void addJoiningNodeData(Integer cmpId, Serializable data) { + joiningNodeData.put(cmpId, new ObjectData(data)); + } + + /** + * @param cmpId Component ID. + * @param data Message data. + */ + public void addJoiningNodeData(Integer cmpId, Message data) { joiningNodeData.put(cmpId, data); } @@ -275,7 +289,7 @@ public boolean commonDataCollectedFor(Integer cmpId) { /** * @param joinNodeData Joining node data. */ - public void joiningNodeData(Map joinNodeData) { + public void joiningNodeData(Map joinNodeData) { joiningNodeData.putAll(joinNodeData); } @@ -294,7 +308,7 @@ public void nodeSpecificData(Map> nodeSpecData) } /** @return Discovery data for each Ignite component that is sent to the cluster nodes by joining node. */ - public Map joiningNodeData() { + public Map joiningNodeData() { return joiningNodeData; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java new file mode 100644 index 0000000000000..f9da59bffe415 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery; + +import java.io.Serializable; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.MarshallableMessage; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.jetbrains.annotations.Nullable; + +/** Wrapper message for serializable data. */ +public class ObjectData implements MarshallableMessage { + /** */ + @GridToStringInclude + private Serializable data; + + /** */ + @GridToStringExclude + @Order(0) + byte[] dataBytes; + + /** */ + public ObjectData() {} + + /** + * @param data Original data. + */ + public ObjectData(Serializable data) { + this.data = data; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + if (data != null) + dataBytes = U.marshal(marsh, data); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + if (dataBytes != null) { + data = U.unmarshal(marsh, dataBytes, clsLdr); + + dataBytes = null; + } + } + + /** + * @param msg Message. + * @param Type of data. + * + * @return Original data unwrapped from a message. + */ + public static T unwrap(@Nullable Message msg) { + return msg != null ? (T)(((ObjectData)msg).data) : null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ObjectData.class, this); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 2d2480f01f3cb..69447b686e4d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1917,7 +1917,6 @@ private void clearNodeAddedMessage(TcpDiscoveryAbstractMessage msg) { nodeAddedMsg.topology(null); nodeAddedMsg.topologyHistory(null); nodeAddedMsg.messages(null); - nodeAddedMsg.clearUnmarshalledDiscoveryData(); } } @@ -4338,19 +4337,9 @@ else if (log.isDebugEnabled()) err = spi.getSpiContext().validateNode(node); if (err == null) { - try { - DiscoveryDataBag data = msg.gridDiscoveryData().unmarshalJoiningNodeData( - spi.marshaller(), - U.resolveClassLoader(spi.ignite().configuration()), - false, - log - ); - - err = spi.getSpiContext().validateNode(node, data); - } - catch (IgniteCheckedException e) { - err = new IgniteNodeValidationResult(node.id(), e.getMessage()); - } + DiscoveryDataBag data = msg.gridDiscoveryData().bagWithJoiningNodeData(); + + err = spi.getSpiContext().validateNode(node, data); } if (err != null) { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 2ae75539a1c50..0ca3130575ef7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -2075,11 +2075,7 @@ DiscoveryDataPacket collectExchangeData(DiscoveryDataPacket dataPacket) { //marshall collected bag into packet, return packet if (dataPacket.joiningNodeId().equals(locNode.id())) - dataPacket.marshalJoiningNodeData( - dataBag, - marshaller(), - ignite.configuration().getNetworkCompressionLevel(), - log); + dataPacket.addJoiningNodeData(dataBag); else dataPacket.marshalGridNodeData( dataBag, @@ -2116,7 +2112,7 @@ protected void onExchange(DiscoveryDataPacket dataPacket, ClassLoader clsLdr) { } } else - dataBag = dataPacket.unmarshalJoiningNodeDataSilently(marshaller(), clsLdr, locNode.clientRouterNodeId() != null, log); + dataBag = dataPacket.bagWithJoiningNodeData(); exchange.onExchange(dataBag); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java index 4922e7c0e86c3..4ded165df7082 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java @@ -24,10 +24,11 @@ import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.Compress; import org.apache.ignite.internal.GridComponent; import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; @@ -53,10 +54,8 @@ public class DiscoveryDataPacket implements Serializable, Message { /** */ @Order(1) - Map joiningNodeData = new HashMap<>(); - - /** */ - private transient Map unmarshalledJoiningNodeData; + @Compress + Map joiningNodeData = new HashMap<>(); /** */ @Order(2) @@ -114,12 +113,10 @@ public void marshalGridNodeData(DiscoveryDataBag bag, UUID nodeId, Marshaller ma /** * @param bag Bag. - * @param marsh Marsh. - * @param log Logger. */ - public void marshalJoiningNodeData(DiscoveryDataBag bag, Marshaller marsh, - int compressionLevel, IgniteLogger log) { - marshalData(bag.joiningNodeData(), joiningNodeData, marsh, compressionLevel, log); + public void addJoiningNodeData(DiscoveryDataBag bag) { + if (!F.isEmpty(bag.joiningNodeData())) + joiningNodeData.putAll(bag.joiningNodeData()); } /** @@ -161,67 +158,13 @@ public DiscoveryDataBag unmarshalGridData( } /** - * @param marsh Marsh. - * @param clsLdr Class loader. - * @param clientNode Client node. - * @param log Logger. - * @throws IgniteCheckedException If unmarshalling failed. + * @return Data bag with joining node data. */ - public DiscoveryDataBag unmarshalJoiningNodeData( - Marshaller marsh, - ClassLoader clsLdr, - boolean clientNode, - IgniteLogger log - ) throws IgniteCheckedException { - return unmarshalJoiningNodeData(marsh, clsLdr, clientNode, log, true); - } - - /** - * @param marsh Marsh. - * @param clsLdr Class loader. - * @param clientNode Client node. - * @param log Logger. - */ - public DiscoveryDataBag unmarshalJoiningNodeDataSilently( - Marshaller marsh, - ClassLoader clsLdr, - boolean clientNode, - IgniteLogger log - ) { - try { - return unmarshalJoiningNodeData(marsh, clsLdr, clientNode, log, false); - } - catch (IgniteCheckedException impossible) { - assert false : impossible; - - log.error("Failed to unmarshal joining node data", impossible); - - throw new IgniteException(impossible); - } - } - - /** - * @param marsh Marsh. - * @param clsLdr Class loader. - * @param clientNode Client node. - * @param log Logger. - * @param panic Throw unmarshalling if {@code true}. - * @throws IgniteCheckedException If {@code panic} is {@code true} and unmarshalling failed. - */ - private DiscoveryDataBag unmarshalJoiningNodeData( - Marshaller marsh, - ClassLoader clsLdr, - boolean clientNode, - IgniteLogger log, - boolean panic - ) throws IgniteCheckedException { + public DiscoveryDataBag bagWithJoiningNodeData() { DiscoveryDataBag dataBag = new DiscoveryDataBag(joiningNodeId, joiningNodeClient); - if (joiningNodeData != null && !joiningNodeData.isEmpty()) { - unmarshalledJoiningNodeData = unmarshalData(joiningNodeData, marsh, clsLdr, clientNode, log, panic); - - dataBag.joiningNodeData(unmarshalledJoiningNodeData); - } + if (!F.isEmpty(joiningNodeData)) + dataBag.joiningNodeData(joiningNodeData); return dataBag; } @@ -230,7 +173,7 @@ private DiscoveryDataBag unmarshalJoiningNodeData( * */ public boolean hasJoiningNodeData() { - return joiningNodeData != null && !joiningNodeData.isEmpty(); + return !F.isEmpty(joiningNodeData); } /** @@ -443,8 +386,8 @@ private void filterDuplicatedData(Map discoData) { public DiscoveryDataBag bagForDataCollection() { DiscoveryDataBag dataBag = new DiscoveryDataBag(joiningNodeId, commonData.keySet(), joiningNodeClient); - if (unmarshalledJoiningNodeData != null) - dataBag.joiningNodeData(unmarshalledJoiningNodeData); + if (joiningNodeData != null) + dataBag.joiningNodeData(joiningNodeData); return dataBag; } @@ -455,12 +398,4 @@ public DiscoveryDataBag bagForDataCollection() { public void joiningNodeClient(boolean joiningNodeClient) { this.joiningNodeClient = joiningNodeClient; } - - /** - * Clears {@link #unmarshalledJoiningNodeData} - */ - public void clearUnmarshalledJoiningNodeData() { - unmarshalledJoiningNodeData = null; - } - } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index ed717fcbbc070..b810ac6b32220 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -195,15 +195,6 @@ public void clearDiscoveryData() { dataPacket = null; } - /** - * Clears unmarshalled discovery data to minimize message size. - * These data are used only on "collect" stage and are not part of persistent state. - */ - public void clearUnmarshalledDiscoveryData() { - if (dataPacket != null) - dataPacket.clearUnmarshalledJoiningNodeData(); - } - /** @return First grid node start time. */ public long gridStartTime() { return gridStartTime; diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 954755b375ad7..f6acd55898484 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -737,7 +737,7 @@ org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyRequest org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse org.apache.ignite.internal.managers.encryption.GridEncryptionManager$EmptyResult org.apache.ignite.internal.managers.encryption.GridEncryptionManager$MasterKeyChangeRequest -org.apache.ignite.internal.managers.encryption.GridEncryptionManager$NodeEncryptionKeys +org.apache.ignite.internal.managers.encryption.NodeEncryptionKeys org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage org.apache.ignite.internal.managers.indexing.GridIndexingManager$1 diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java index 965e75e26cb8d..120c347932635 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.metastorage; -import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -26,7 +25,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl; -import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange; @@ -537,7 +536,7 @@ public void testVerFromDiscoveryClusterData() throws Exception { DiscoverySpiDataExchange exchange = GridTestUtils.getFieldValue(spi, TcpDiscoverySpi.class, "exchange"); - List> dataBags = new ArrayList<>(); + List> dataBags = new ArrayList<>(); spi.setDataExchange(new DiscoverySpiDataExchange() { @Override public DiscoveryDataBag collect(DiscoveryDataBag dataBag) { @@ -555,11 +554,9 @@ public void testVerFromDiscoveryClusterData() throws Exception { assertEquals(1, dataBags.size()); - byte[] joiningNodeDataMarshalled = (byte[])dataBags.get(0).get(META_STORAGE.ordinal()); + Object joiningNodeData = dataBags.get(0).get(META_STORAGE.ordinal()); - assertNotNull(joiningNodeDataMarshalled); - - Object joiningNodeData = TEST_JDK_MARSHALLER.unmarshal(joiningNodeDataMarshalled, U.gridClassLoader()); + assertNotNull(joiningNodeData); Object[] hist = GridTestUtils.getFieldValue(joiningNodeData, "hist"); diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java index 6512c2ac9a42b..d29a73f7039f4 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java @@ -31,7 +31,6 @@ import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageSerializer; import org.apache.ignite.spi.IgniteSpiException; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType; @@ -51,11 +50,11 @@ public DiscoveryMessageParser(MessageFactory msgFactory) { } /** Marshals discovery message to bytes array. */ - public byte[] marshalZip(DiscoverySpiCustomMessage msg) { + public byte[] marshalZip(Message msg) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); try (DeflaterOutputStream out = new DeflaterOutputStream(baos)) { - serializeMessage((Message)msg, out); + serializeMessage(msg, out); } catch (Exception e) { throw new IgniteSpiException("Failed to serialize message: " + msg, e); @@ -65,12 +64,12 @@ public byte[] marshalZip(DiscoverySpiCustomMessage msg) { } /** Unmarshals discovery message from bytes array. */ - public DiscoverySpiCustomMessage unmarshalZip(byte[] bytes) { + public T unmarshalZip(byte[] bytes) { try ( ByteArrayInputStream bais = new ByteArrayInputStream(bytes); InflaterInputStream in = new InflaterInputStream(bais) ) { - return (DiscoverySpiCustomMessage)deserializeMessage(in); + return deserializeMessage(in); } catch (Exception e) { throw new IgniteSpiException("Failed to deserialize message.", e); @@ -99,7 +98,7 @@ private void serializeMessage(Message m, OutputStream out) throws IOException { } /** */ - private Message deserializeMessage(InputStream in) throws IOException { + private T deserializeMessage(InputStream in) throws IOException { DirectMessageReader msgReader = new DirectMessageReader(msgFactory, null); ByteBuffer msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE); @@ -127,6 +126,6 @@ private Message deserializeMessage(InputStream in) throws IOException { } while (!finished); - return msg; + return (T)msg; } } diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java index ff8311d071ba8..f2b5ac463685d 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java @@ -38,7 +38,7 @@ class ZkJoiningNodeData implements Serializable { /** */ @GridToStringInclude - private Map discoData; + private Map discoData; /** * @param partCnt Number of parts in multi-parts message. @@ -51,7 +51,7 @@ class ZkJoiningNodeData implements Serializable { * @param node Node. * @param discoData Discovery data. */ - ZkJoiningNodeData(ZookeeperClusterNode node, Map discoData) { + ZkJoiningNodeData(ZookeeperClusterNode node, Map discoData) { assert node != null && node.id() != null : node; assert discoData != null; @@ -76,7 +76,7 @@ ZookeeperClusterNode node() { /** * @return Discovery data. */ - Map discoveryData() { + Map discoveryData() { return discoData; } diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 7abe3ddf1ed20..0d0531d1a9a2c 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -69,6 +69,7 @@ import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.LT; @@ -804,7 +805,8 @@ private void joinTopology(@Nullable ZkRuntimeState prevState) throws Interrupted exchange.collect(discoDataBag); - ZkJoiningNodeData joinData = new ZkJoiningNodeData(locNode, discoDataBag.joiningNodeData()); + ZkJoiningNodeData joinData = new ZkJoiningNodeData(locNode, + new HashMap<>(F.viewReadOnly(discoDataBag.joiningNodeData(), msgParser::marshalZip))); byte[] joinDataBytes; @@ -2070,7 +2072,7 @@ private ZkNodeValidateResult validateJoiningNode(ZkJoiningNodeData joiningNodeDa if (err == null) { DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(node.id(), joiningNodeData.node().isClient()); - joiningNodeBag.joiningNodeData(joiningNodeData.discoveryData()); + joiningNodeBag.joiningNodeData(F.viewReadOnly(joiningNodeData.discoveryData(), msgParser::unmarshalZip)); err = spi.getSpiContext().validateNode(node, joiningNodeBag); } @@ -2237,7 +2239,7 @@ private void addJoinedNode( DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(nodeId, joiningNodeData.node().isClient()); - joiningNodeBag.joiningNodeData(joiningNodeData.discoveryData()); + joiningNodeBag.joiningNodeData(F.viewReadOnly(joiningNodeData.discoveryData(), msgParser::unmarshalZip)); exchange.onExchange(joiningNodeBag); @@ -2873,7 +2875,7 @@ private boolean processBulkJoin(ZkDiscoveryEventsData evtsData, ZkDiscoveryNodeJ DiscoveryDataBag dataBag = new DiscoveryDataBag(joinedEvtData.nodeId, joiningData.node().isClient()); - dataBag.joiningNodeData(joiningData.discoveryData()); + dataBag.joiningNodeData(F.viewReadOnly(joiningData.discoveryData(), msgParser::unmarshalZip)); exchange.onExchange(dataBag); }