diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index eb022f0edb1bd..a3379034634d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -262,6 +262,7 @@ import org.apache.ignite.internal.processors.cache.transactions.TxLocksResponse; import org.apache.ignite.internal.processors.cache.transactions.TxLocksResponseSerializer; import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry; +import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntrySerializer; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionExSerializer; @@ -279,6 +280,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousMessage; import org.apache.ignite.internal.processors.continuous.GridContinuousMessageSerializer; import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry; +import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntrySerializer; import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest; import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequestSerializer; import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse; @@ -470,14 +472,14 @@ public GridIoMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMarsh factory.register((short)92, CacheEntryInfoCollection::new, new CacheEntryInfoCollectionSerializer()); factory.register((short)93, CacheInvokeDirectResult::new, new CacheInvokeDirectResultSerializer()); factory.register((short)94, IgniteTxKey::new, new IgniteTxKeySerializer()); - factory.register((short)95, DataStreamerEntry::new); + factory.register((short)95, DataStreamerEntry::new, new DataStreamerEntrySerializer()); factory.register((short)96, CacheContinuousQueryEntry::new, new CacheContinuousQueryEntrySerializer()); factory.register((short)97, CacheEvictionEntry::new, new CacheEvictionEntrySerializer()); factory.register((short)98, CacheEntryPredicateAdapter::new, new CacheEntryPredicateAdapterSerializer()); factory.register((short)100, IgniteTxEntry::new, new IgniteTxEntrySerializer()); factory.register((short)101, TxEntryValueHolder::new, new TxEntryValueHolderSerializer()); factory.register((short)102, CacheVersionedValue::new, new CacheVersionedValueSerializer()); - factory.register((short)103, GridCacheRawVersionedEntry::new); + factory.register((short)103, GridCacheRawVersionedEntry::new, new GridCacheRawVersionedEntrySerializer()); factory.register((short)104, GridCacheVersionEx::new, new GridCacheVersionExSerializer()); factory.register((short)106, GridQueryCancelRequest::new, new GridQueryCancelRequestSerializer()); factory.register((short)107, GridQueryFailResponse::new, new GridQueryFailResponseSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index bb3ed8c4e82a6..b13161079eb1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -5900,8 +5900,6 @@ else if (ttl == CU.TTL_NOT_CHANGED) 0, ver.conflictVersion()); - e.prepareDirectMarshal(ctx.cacheObjectContext()); - col.add(e); if (col.size() == ldr.perNodeBufferSize()) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java index ccb93f176dc20..88141816abfe9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java @@ -17,50 +17,29 @@ package org.apache.ignite.internal.processors.cache.version; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.nio.ByteBuffer; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.GridDirectTransient; -import org.apache.ignite.internal.IgniteCodeGeneratingFail; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; /** * Raw versioned entry. */ -@IgniteCodeGeneratingFail -public class GridCacheRawVersionedEntry extends DataStreamerEntry implements - GridCacheVersionedEntry, GridCacheVersionable, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Key bytes. */ - @GridDirectTransient - private byte[] keyBytes; - - /** Value bytes. */ - private byte[] valBytes; - +public class GridCacheRawVersionedEntry extends DataStreamerEntry implements GridCacheVersionedEntry, GridCacheVersionable { /** TTL. */ - private long ttl; + @Order(0) + long ttl; /** Expire time. */ - private long expireTime; + @Order(1) + long expireTime; /** Version. */ - private GridCacheVersion ver; + @Order(2) + GridCacheVersion ver; /** * {@code Externalizable} support. @@ -92,28 +71,6 @@ public GridCacheRawVersionedEntry(KeyCacheObject key, this.ver = ver; } - /** - * Constructor used in receiver hub where marshalled key and value are available and we do not want to - * unmarshal value. - * - * @param keyBytes Key. - * @param valBytes Value bytes. - * @param expireTime Expire time. - * @param ttl TTL. - * @param ver Version. - */ - public GridCacheRawVersionedEntry(byte[] keyBytes, - byte[] valBytes, - long ttl, - long expireTime, - GridCacheVersion ver) { - this.keyBytes = keyBytes; - this.valBytes = valBytes; - this.ttl = ttl; - this.expireTime = expireTime; - this.ver = ver; - } - /** {@inheritDoc} */ @Override public K key() { assert key != null : "Entry is being improperly processed."; @@ -156,162 +113,13 @@ public GridCacheRawVersionedEntry(byte[] keyBytes, return ver; } - /** - * Perform internal unmarshal of this entry. It must be performed after entry is deserialized and before - * its restored key/value are needed. - * - * @param ctx Context. - * @param marsh Marshaller. - * @throws IgniteCheckedException If failed. - */ - public void unmarshal(CacheObjectContext ctx, Marshaller marsh) throws IgniteCheckedException { - unmarshalKey(ctx, marsh); - - if (val == null && valBytes != null) { - val = U.unmarshal(marsh, valBytes, U.resolveClassLoader(null, ctx.classLoader())); - - val.finishUnmarshal(ctx, null); - } - } - - /** - * Perform internal key unmarshal of this entry. It must be performed after entry is deserialized and before - * its restored key/value are needed. - * - * @param ctx Context. - * @param marsh Marshaller. - * @throws IgniteCheckedException If failed. - */ - public void unmarshalKey(CacheObjectContext ctx, Marshaller marsh) throws IgniteCheckedException { - if (key == null) { - assert keyBytes != null; - - key = U.unmarshal(marsh, keyBytes, U.resolveClassLoader(null, ctx.classLoader())); - - key.finishUnmarshal(ctx, null); - } - } - - /** - * @param ctx Context. - * @throws IgniteCheckedException If failed. - */ - public void prepareDirectMarshal(CacheObjectContext ctx) throws IgniteCheckedException { - key.prepareMarshal(ctx); - - if (val != null) - val.prepareMarshal(ctx); - } - /** {@inheritDoc} */ @Override public short directType() { return 103; } - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!super.readFrom(buf, reader)) - return false; - - switch (reader.state()) { - case 2: - expireTime = reader.readLong(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 3: - ttl = reader.readLong(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 4: - valBytes = reader.readByteArray(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 5: - ver = reader.readMessage(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!super.writeTo(buf, writer)) - return false; - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 2: - if (!writer.writeLong(expireTime)) - return false; - - writer.incrementState(); - - case 3: - if (!writer.writeLong(ttl)) - return false; - - writer.incrementState(); - - case 4: - if (!writer.writeByteArray(valBytes)) - return false; - - writer.incrementState(); - - case 5: - if (!writer.writeMessage(ver)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - assert false; - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - assert false; - } - /** {@inheritDoc} */ @Override public String toString() { - return S.toString(GridCacheRawVersionedEntry.class, this, - "keyBytesLen", keyBytes != null ? keyBytes.length : "n/a", - "valBytesLen", valBytes != null ? valBytes.length : "n/a", - "super", super.toString()); + return S.toString(GridCacheRawVersionedEntry.class, this, "super", super.toString()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerEntry.java index 8a8c54603dc92..91f6998b446aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerEntry.java @@ -17,28 +17,28 @@ package org.apache.ignite.internal.processors.datastreamer; -import java.nio.ByteBuffer; import java.util.Map; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * */ public class DataStreamerEntry implements Map.Entry, Message { /** */ + @Order(0) @GridToStringInclude - protected KeyCacheObject key; + public KeyCacheObject key; /** */ + @Order(1) @GridToStringInclude - protected CacheObject val; + public CacheObject val; /** * @@ -95,61 +95,6 @@ public Map.Entry toEntry(final GridCacheContext ctx, final boolean }; } - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeKeyCacheObject(key)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeCacheObject(val)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - switch (reader.state()) { - case 0: - key = reader.readKeyCacheObject(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - val = reader.readCacheObject(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return true; - } - /** {@inheritDoc} */ @Override public short directType() { return 95; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java index 08c4380fcd1bb..8f5dc19e20fcd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java @@ -77,8 +77,6 @@ public class IgniteDrDataStreamerCacheUpdater implements StreamReceiver entry0 : col) { GridCacheRawVersionedEntry entry = (GridCacheRawVersionedEntry)entry0; - entry.unmarshal(cacheObjCtx, ctx.marshaller()); - KeyCacheObject key = entry.getKey(); // Ensure that receiver to not receive special-purpose values for TTL and expire time.