Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,20 @@
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage;
import org.apache.ignite.internal.processors.query.calcite.message.CalciteMarshalableMessage;
import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.GridIntIterator;
import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

/** */
public class ColocationGroup implements CalciteMessage {
public class ColocationGroup implements CalciteMarshalableMessage {
/** */
@Order(0)
long[] srcIds;
Expand All @@ -60,7 +61,7 @@ public class ColocationGroup implements CalciteMessage {
private boolean primaryAssignment;

/** Marshalled assignments serialization call holder. */
@Order(value = 2, method = "marshalledAssignments")
@Order(2)
int[] marshalledAssignments;

/** */
Expand Down Expand Up @@ -317,54 +318,49 @@ public int[] partitions(UUID nodeId) {
return MessageType.COLOCATION_GROUP;
}

/** Significantly compacts and fastens UUIDs marshalling. */
public @Nullable int[] marshalledAssignments() {
if (assignments == null || primaryAssignment)
return null;

Map<UUID, Integer> nodeIdxs = new HashMap<>();
/** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
if (assignments == null && !primaryAssignment) {
Map<UUID, Integer> nodeIdxs = new HashMap<>();

for (int i = 0; i < nodeIds.size(); i++)
nodeIdxs.put(nodeIds.get(i), i);
for (int i = 0; i < nodeIds.size(); i++)
nodeIdxs.put(nodeIds.get(i), i);

int bitsPerPart = Integer.SIZE - Integer.numberOfLeadingZeros(nodeIds.size());
int bitsPerPart = Integer.SIZE - Integer.numberOfLeadingZeros(nodeIds.size());

CompactedIntArray.Builder builder = CompactedIntArray.builder(bitsPerPart, assignments.size());
CompactedIntArray.Builder builder = CompactedIntArray.builder(bitsPerPart, assignments.size());

for (List<UUID> assignment : assignments) {
assert F.isEmpty(assignment) || assignment.size() == 1;
for (List<UUID> assignment : assignments) {
assert F.isEmpty(assignment) || assignment.size() == 1;

if (F.isEmpty(assignment))
builder.add(nodeIds.size());
else {
Integer nodeIdx = nodeIdxs.get(assignment.get(0));
if (F.isEmpty(assignment))
builder.add(nodeIds.size());
else {
Integer nodeIdx = nodeIdxs.get(assignment.get(0));

builder.add(nodeIdx);
builder.add(nodeIdx);
}
}
}

return builder.build().buffer();
}

/** Significantly compacts and fastens UUIDs unmarshalling. */
public void marshalledAssignments(@Nullable int[] marshalledAssignments) {
if (F.isEmpty(marshalledAssignments)) {
assignments = null;

return;
marshalledAssignments = builder.build().buffer();
}
}

int bitsPerPart = Integer.SIZE - Integer.numberOfLeadingZeros(nodeIds.size());
/** {@inheritDoc} */
@Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
if (!F.isEmpty(marshalledAssignments)) {
int bitsPerPart = Integer.SIZE - Integer.numberOfLeadingZeros(nodeIds.size());

CompactedIntArray compactedArr = CompactedIntArray.of(bitsPerPart, marshalledAssignments);
CompactedIntArray compactedArr = CompactedIntArray.of(bitsPerPart, marshalledAssignments);

assignments = new ArrayList<>(compactedArr.size());
assignments = new ArrayList<>(compactedArr.size());

for (GridIntIterator iter = compactedArr.iterator(); iter.hasNext(); ) {
int nodeIdx = iter.next();
for (GridIntIterator iter = compactedArr.iterator(); iter.hasNext(); ) {
int nodeIdx = iter.next();

assignments.add(nodeIdx >= nodeIds.size() ? Collections.emptyList() :
Collections.singletonList(nodeIds.get(nodeIdx)));
assignments.add(nodeIdx >= nodeIds.size() ? Collections.emptyList() :
Collections.singletonList(nodeIds.get(nodeIdx)));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@

package org.apache.ignite.internal.cache.query.index;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyType;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
import org.jetbrains.annotations.Nullable;

/** Message wrapper for {@link IndexKeyType}. */
public class IndexKeyTypeMessage implements Message {
public class IndexKeyTypeMessage implements MarshallableMessage {
/** Type code. */
public static final short TYPE_CODE = 516;

Expand All @@ -35,8 +37,8 @@ public class IndexKeyTypeMessage implements Message {
private @Nullable IndexKeyType val;

/** Code. */
@Order(value = 0, method = "code")
byte code = NULL_VALUE_CODE;
@Order(0)
byte code;

/** Empty constructor for {@link GridIoMessageFactory}. */
public IndexKeyTypeMessage() {
Expand All @@ -46,12 +48,11 @@ public IndexKeyTypeMessage() {
/** Constructor. */
public IndexKeyTypeMessage(@Nullable IndexKeyType keyType) {
val = keyType;
code = encode(keyType);
}

/** Constructor. */
public IndexKeyTypeMessage(int keyTypeCode) {
code((byte)keyTypeCode);
code = (byte)keyTypeCode;
}

/** @return Code. */
Expand All @@ -64,7 +65,6 @@ public byte code() {
*/
public void code(byte code) {
this.code = code;
val = decode(code);
}

/** @return Index key type. */
Expand Down Expand Up @@ -94,4 +94,14 @@ private static byte encode(@Nullable IndexKeyType keyType) {
@Override public short directType() {
return TYPE_CODE;
}

/** {@inheritDoc} */
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
code = encode(val);
}

/** {@inheritDoc} */
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
val = decode(code);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.apache.ignite.internal.TxInfo;
import org.apache.ignite.internal.TxInfoSerializer;
import org.apache.ignite.internal.cache.query.index.IndexKeyTypeMessage;
import org.apache.ignite.internal.cache.query.index.IndexKeyTypeMessageSerializer;
import org.apache.ignite.internal.cache.query.index.IndexKeyTypeMessageMarshallableSerializer;
import org.apache.ignite.internal.cache.query.index.IndexQueryResultMeta;
import org.apache.ignite.internal.cache.query.index.IndexQueryResultMetaSerializer;
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition;
Expand Down Expand Up @@ -72,7 +72,7 @@
import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollectionSerializer;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicateAdapter;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicateAdapterSerializer;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicateAdapterMarshallableSerializer;
import org.apache.ignite.internal.processors.cache.CacheEvictionEntry;
import org.apache.ignite.internal.processors.cache.CacheEvictionEntrySerializer;
import org.apache.ignite.internal.processors.cache.CacheInvokeDirectResult;
Expand Down Expand Up @@ -132,7 +132,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequestSerializer;
import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessageSerializer;
import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessageMarshallableSerializer;
import org.apache.ignite.internal.processors.cache.distributed.dht.TransactionAttributesAwareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.TransactionAttributesAwareRequestSerializer;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.AtomicApplicationAttributesAwareRequest;
Expand Down Expand Up @@ -352,26 +352,26 @@
*/
public class GridIoMessageFactory implements MessageFactoryProvider {
/** Custom data marshaller. */
private final Marshaller cstDataMarshall;
private final Marshaller marsh;

/** Class loader for the custom data marshalling. */
private final ClassLoader cstDataMarshallClsLdr;
private final ClassLoader clsLdr;

/**
* @param cstDataMarshall Custom data marshaller.
* @param cstDataMarshallClsLdr Class loader for the custom data marshalling.
* @param marsh Custom data marshaller.
* @param clsLdr Class loader for the custom data marshalling.
*/
public GridIoMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMarshallClsLdr) {
this.cstDataMarshall = cstDataMarshall;
this.cstDataMarshallClsLdr = cstDataMarshallClsLdr;
public GridIoMessageFactory(Marshaller marsh, ClassLoader clsLdr) {
this.marsh = marsh;
this.clsLdr = clsLdr;
}

/** {@inheritDoc} */
@Override public void registerAll(MessageFactory factory) {
// -54 is reserved for SQL.
// We don't use the code‑generated serializer for CompressedMessage - serialization is highly customized.
factory.register(CompressedMessage.TYPE_CODE, CompressedMessage::new);
factory.register((short)-66, ErrorMessage::new, new ErrorMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));
factory.register((short)-66, ErrorMessage::new, new ErrorMessageMarshallableSerializer(marsh, clsLdr));
factory.register((short)-65, TxInfo::new, new TxInfoSerializer());
factory.register((short)-64, TxEntriesInfo::new, new TxEntriesInfoSerializer());
factory.register((short)-63, ExchangeInfo::new, new ExchangeInfoSerializer());
Expand Down Expand Up @@ -473,7 +473,7 @@ public GridIoMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMarsh
factory.register((short)95, DataStreamerEntry::new);
factory.register((short)96, CacheContinuousQueryEntry::new, new CacheContinuousQueryEntrySerializer());
factory.register((short)97, CacheEvictionEntry::new, new CacheEvictionEntrySerializer());
factory.register((short)98, CacheEntryPredicateAdapter::new, new CacheEntryPredicateAdapterSerializer());
factory.register((short)98, CacheEntryPredicateAdapter::new, new CacheEntryPredicateAdapterMarshallableSerializer(marsh, clsLdr));
factory.register((short)100, IgniteTxEntry::new, new IgniteTxEntrySerializer());
factory.register((short)101, TxEntryValueHolder::new, new TxEntryValueHolderSerializer());
factory.register((short)102, CacheVersionedValue::new, new CacheVersionedValueSerializer());
Expand Down Expand Up @@ -507,7 +507,8 @@ public GridIoMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMarsh
factory.register(CacheMetricsMessage.TYPE_CODE, CacheMetricsMessage::new, new CacheMetricsMessageSerializer());
factory.register(NodeMetricsMessage.TYPE_CODE, NodeMetricsMessage::new, new NodeMetricsMessageSerializer());
factory.register(NodeFullMetricsMessage.TYPE_CODE, NodeFullMetricsMessage::new, new NodeFullMetricsMessageSerializer());
factory.register((short)157, PartitionUpdateCountersMessage::new, new PartitionUpdateCountersMessageSerializer());
factory.register((short)157, PartitionUpdateCountersMessage::new,
new PartitionUpdateCountersMessageMarshallableSerializer(marsh, clsLdr));
factory.register((short)162, GenerateEncryptionKeyRequest::new, new GenerateEncryptionKeyRequestSerializer());
factory.register((short)163, GenerateEncryptionKeyResponse::new, new GenerateEncryptionKeyResponseSerializer());
factory.register((short)167, ServiceDeploymentProcessId::new, new ServiceDeploymentProcessIdSerializer());
Expand Down Expand Up @@ -557,7 +558,8 @@ public GridIoMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMarsh
factory.register(IgniteDhtPartitionsToReloadMap.TYPE_CODE, IgniteDhtPartitionsToReloadMap::new,
new IgniteDhtPartitionsToReloadMapSerializer());
factory.register(IntLongMap.TYPE_CODE, IntLongMap::new, new IntLongMapSerializer());
factory.register(IndexKeyTypeMessage.TYPE_CODE, IndexKeyTypeMessage::new, new IndexKeyTypeMessageSerializer());
factory.register(IndexKeyTypeMessage.TYPE_CODE, IndexKeyTypeMessage::new,
new IndexKeyTypeMessageMarshallableSerializer(marsh, clsLdr));
factory.register(GridPartitionStateMap.TYPE_CODE, GridPartitionStateMap::new, new GridPartitionStateMapSerializer());
factory.register(GridDhtPartitionMap.TYPE_CODE, GridDhtPartitionMap::new, new GridDhtPartitionMapSerializer());
factory.register(GridDhtPartitionFullMap.TYPE_CODE, GridDhtPartitionFullMap::new, new GridDhtPartitionFullMapSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,18 +136,18 @@
/** Message factory for discovery messages. */
public class DiscoveryMessageFactory implements MessageFactoryProvider {
/** Custom data marshaller. */
private final Marshaller cstDataMarshall;
private final Marshaller marsh;

/** Class loader for the custom data marshalling. */
private final ClassLoader cstDataMarshallClsLdr;
private final ClassLoader clsLdr;

/**
* @param cstDataMarshall Custom data marshaller.
* @param cstDataMarshallClsLdr Class loader for the custom data marshalling.
* @param marsh Custom data marshaller.
* @param clsLdr Class loader for the custom data marshalling.
*/
public DiscoveryMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMarshallClsLdr) {
this.cstDataMarshall = cstDataMarshall;
this.cstDataMarshallClsLdr = cstDataMarshallClsLdr;
public DiscoveryMessageFactory(Marshaller marsh, ClassLoader clsLdr) {
this.marsh = marsh;
this.clsLdr = clsLdr;
}

/** {@inheritDoc} */
Expand All @@ -163,7 +163,7 @@ public DiscoveryMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMa
factory.register((short)-102, TcpDiscoveryNodeMetricsMessage::new, new TcpDiscoveryNodeMetricsMessageSerializer());
factory.register((short)-101, InetSocketAddressMessage::new, new InetSocketAddressMessageSerializer());
factory.register((short)-100, InetAddressMessage::new, new InetAddressMessageSerializer());
factory.register((short)-66, ErrorMessage::new, new ErrorMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));
factory.register((short)-66, ErrorMessage::new, new ErrorMessageMarshallableSerializer(marsh, clsLdr));

// TcpDiscoveryAbstractMessage
factory.register((short)0, TcpDiscoveryCheckFailedMessage::new, new TcpDiscoveryCheckFailedMessageSerializer());
Expand All @@ -186,9 +186,9 @@ public DiscoveryMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMa
factory.register((short)17, TcpDiscoveryNodeFailedMessage::new, new TcpDiscoveryNodeFailedMessageSerializer());
factory.register((short)18, TcpDiscoveryStatusCheckMessage::new, new TcpDiscoveryStatusCheckMessageSerializer());
factory.register((short)19, TcpDiscoveryNodeAddFinishedMessage::new,
new TcpDiscoveryNodeAddFinishedMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));
new TcpDiscoveryNodeAddFinishedMessageMarshallableSerializer(marsh, clsLdr));
factory.register((short)20, TcpDiscoveryJoinRequestMessage::new,
new TcpDiscoveryJoinRequestMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));
new TcpDiscoveryJoinRequestMessageMarshallableSerializer(marsh, clsLdr));
factory.register((short)21, TcpDiscoveryCustomEventMessage::new, new TcpDiscoveryCustomEventMessageSerializer());
factory.register((short)22, TcpDiscoveryServerOnlyCustomEventMessage::new,
new TcpDiscoveryServerOnlyCustomEventMessageSerializer());
Expand All @@ -198,11 +198,12 @@ public DiscoveryMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMa
factory.register((short)26, DistributedMetaStorageCasMessage::new, new DistributedMetaStorageCasMessageSerializer());
factory.register((short)27, DistributedMetaStorageCasAckMessage::new, new DistributedMetaStorageCasAckMessageSerializer());
factory.register((short)28, TcpDiscoveryClientReconnectMessage::new,
new TcpDiscoveryClientReconnectMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));
new TcpDiscoveryClientReconnectMessageMarshallableSerializer(marsh, clsLdr));

// DiscoveryCustomMessage
factory.register((short)500, CacheStatisticsModeChangeMessage::new, new CacheStatisticsModeChangeMessageSerializer());
factory.register((short)501, SecurityAwareCustomMessageWrapper::new, new SecurityAwareCustomMessageWrapperSerializer());
factory.register((short)501, SecurityAwareCustomMessageWrapper::new,
new SecurityAwareCustomMessageWrapperMarshallableSerializer(marsh, clsLdr));
factory.register((short)502, MetadataRemoveAcceptedMessage::new, new MetadataRemoveAcceptedMessageSerializer());
factory.register((short)503, MetadataRemoveProposedMessage::new, new MetadataRemoveProposedMessageSerializer());
factory.register((short)504, SchemaProposeDiscoveryMessage::new, new SchemaProposeDiscoveryMessageSerializer());
Expand Down
Loading
Loading