Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@
import org.apache.ignite.internal.processors.cache.transactions.TxLocksResponse;
import org.apache.ignite.internal.processors.cache.transactions.TxLocksResponseSerializer;
import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntrySerializer;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionExSerializer;
Expand All @@ -279,6 +280,7 @@
import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
import org.apache.ignite.internal.processors.continuous.GridContinuousMessageSerializer;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntrySerializer;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequestSerializer;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse;
Expand Down Expand Up @@ -470,14 +472,14 @@ public GridIoMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMarsh
factory.register((short)92, CacheEntryInfoCollection::new, new CacheEntryInfoCollectionSerializer());
factory.register((short)93, CacheInvokeDirectResult::new, new CacheInvokeDirectResultSerializer());
factory.register((short)94, IgniteTxKey::new, new IgniteTxKeySerializer());
factory.register((short)95, DataStreamerEntry::new);
factory.register((short)95, DataStreamerEntry::new, new DataStreamerEntrySerializer());
factory.register((short)96, CacheContinuousQueryEntry::new, new CacheContinuousQueryEntrySerializer());
factory.register((short)97, CacheEvictionEntry::new, new CacheEvictionEntrySerializer());
factory.register((short)98, CacheEntryPredicateAdapter::new, new CacheEntryPredicateAdapterSerializer());
factory.register((short)100, IgniteTxEntry::new, new IgniteTxEntrySerializer());
factory.register((short)101, TxEntryValueHolder::new, new TxEntryValueHolderSerializer());
factory.register((short)102, CacheVersionedValue::new, new CacheVersionedValueSerializer());
factory.register((short)103, GridCacheRawVersionedEntry::new);
factory.register((short)103, GridCacheRawVersionedEntry::new, new GridCacheRawVersionedEntrySerializer());
factory.register((short)104, GridCacheVersionEx::new, new GridCacheVersionExSerializer());
factory.register((short)106, GridQueryCancelRequest::new, new GridQueryCancelRequestSerializer());
factory.register((short)107, GridQueryFailResponse::new, new GridQueryFailResponseSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5900,8 +5900,6 @@ else if (ttl == CU.TTL_NOT_CHANGED)
0,
ver.conflictVersion());

e.prepareDirectMarshal(ctx.cacheObjectContext());

col.add(e);

if (col.size() == ldr.perNodeBufferSize()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,50 +17,29 @@

package org.apache.ignite.internal.processors.cache.version;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;

/**
* Raw versioned entry.
*/
@IgniteCodeGeneratingFail
public class GridCacheRawVersionedEntry<K, V> extends DataStreamerEntry implements
GridCacheVersionedEntry<K, V>, GridCacheVersionable, Externalizable {
/** */
private static final long serialVersionUID = 0L;

/** Key bytes. */
@GridDirectTransient
private byte[] keyBytes;

/** Value bytes. */
private byte[] valBytes;

public class GridCacheRawVersionedEntry<K, V> extends DataStreamerEntry implements GridCacheVersionedEntry<K, V>, GridCacheVersionable {
/** TTL. */
private long ttl;
@Order(0)
long ttl;

/** Expire time. */
private long expireTime;
@Order(1)
long expireTime;

/** Version. */
private GridCacheVersion ver;
@Order(2)
GridCacheVersion ver;

/**
* {@code Externalizable} support.
Expand Down Expand Up @@ -92,28 +71,6 @@ public GridCacheRawVersionedEntry(KeyCacheObject key,
this.ver = ver;
}

/**
* Constructor used in receiver hub where marshalled key and value are available and we do not want to
* unmarshal value.
*
* @param keyBytes Key.
* @param valBytes Value bytes.
* @param expireTime Expire time.
* @param ttl TTL.
* @param ver Version.
*/
public GridCacheRawVersionedEntry(byte[] keyBytes,
byte[] valBytes,
long ttl,
long expireTime,
GridCacheVersion ver) {
this.keyBytes = keyBytes;
this.valBytes = valBytes;
this.ttl = ttl;
this.expireTime = expireTime;
this.ver = ver;
}

/** {@inheritDoc} */
@Override public K key() {
assert key != null : "Entry is being improperly processed.";
Expand Down Expand Up @@ -156,162 +113,13 @@ public GridCacheRawVersionedEntry(byte[] keyBytes,
return ver;
}

/**
* Perform internal unmarshal of this entry. It must be performed after entry is deserialized and before
* its restored key/value are needed.
*
* @param ctx Context.
* @param marsh Marshaller.
* @throws IgniteCheckedException If failed.
*/
public void unmarshal(CacheObjectContext ctx, Marshaller marsh) throws IgniteCheckedException {
unmarshalKey(ctx, marsh);

if (val == null && valBytes != null) {
val = U.unmarshal(marsh, valBytes, U.resolveClassLoader(null, ctx.classLoader()));

val.finishUnmarshal(ctx, null);
}
}

/**
* Perform internal key unmarshal of this entry. It must be performed after entry is deserialized and before
* its restored key/value are needed.
*
* @param ctx Context.
* @param marsh Marshaller.
* @throws IgniteCheckedException If failed.
*/
public void unmarshalKey(CacheObjectContext ctx, Marshaller marsh) throws IgniteCheckedException {
if (key == null) {
assert keyBytes != null;

key = U.unmarshal(marsh, keyBytes, U.resolveClassLoader(null, ctx.classLoader()));

key.finishUnmarshal(ctx, null);
}
}

/**
* @param ctx Context.
* @throws IgniteCheckedException If failed.
*/
public void prepareDirectMarshal(CacheObjectContext ctx) throws IgniteCheckedException {
key.prepareMarshal(ctx);

if (val != null)
val.prepareMarshal(ctx);
}

/** {@inheritDoc} */
@Override public short directType() {
return 103;
}

/** {@inheritDoc} */
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
reader.setBuffer(buf);

if (!super.readFrom(buf, reader))
return false;

switch (reader.state()) {
case 2:
expireTime = reader.readLong();

if (!reader.isLastRead())
return false;

reader.incrementState();

case 3:
ttl = reader.readLong();

if (!reader.isLastRead())
return false;

reader.incrementState();

case 4:
valBytes = reader.readByteArray();

if (!reader.isLastRead())
return false;

reader.incrementState();

case 5:
ver = reader.readMessage();

if (!reader.isLastRead())
return false;

reader.incrementState();

}

return true;
}

/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);

if (!super.writeTo(buf, writer))
return false;

if (!writer.isHeaderWritten()) {
if (!writer.writeHeader(directType()))
return false;

writer.onHeaderWritten();
}

switch (writer.state()) {
case 2:
if (!writer.writeLong(expireTime))
return false;

writer.incrementState();

case 3:
if (!writer.writeLong(ttl))
return false;

writer.incrementState();

case 4:
if (!writer.writeByteArray(valBytes))
return false;

writer.incrementState();

case 5:
if (!writer.writeMessage(ver))
return false;

writer.incrementState();

}

return true;
}

/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
assert false;
}

/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
assert false;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheRawVersionedEntry.class, this,
"keyBytesLen", keyBytes != null ? keyBytes.length : "n/a",
"valBytesLen", valBytes != null ? valBytes.length : "n/a",
"super", super.toString());
return S.toString(GridCacheRawVersionedEntry.class, this, "super", super.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,28 @@

package org.apache.ignite.internal.processors.datastreamer;

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;

/**
*
*/
public class DataStreamerEntry implements Map.Entry<KeyCacheObject, CacheObject>, Message {
/** */
@Order(0)
@GridToStringInclude
protected KeyCacheObject key;
public KeyCacheObject key;

/** */
@Order(1)
@GridToStringInclude
protected CacheObject val;
public CacheObject val;

/**
*
Expand Down Expand Up @@ -95,61 +95,6 @@ public <K, V> Map.Entry<K, V> toEntry(final GridCacheContext ctx, final boolean
};
}

/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);

if (!writer.isHeaderWritten()) {
if (!writer.writeHeader(directType()))
return false;

writer.onHeaderWritten();
}

switch (writer.state()) {
case 0:
if (!writer.writeKeyCacheObject(key))
return false;

writer.incrementState();

case 1:
if (!writer.writeCacheObject(val))
return false;

writer.incrementState();

}

return true;
}

/** {@inheritDoc} */
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
reader.setBuffer(buf);

switch (reader.state()) {
case 0:
key = reader.readKeyCacheObject();

if (!reader.isLastRead())
return false;

reader.incrementState();

case 1:
val = reader.readCacheObject();

if (!reader.isLastRead())
return false;

reader.incrementState();

}

return true;
}

/** {@inheritDoc} */
@Override public short directType() {
return 95;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,11 @@
if (log.isDebugEnabled())
log.debug("Running DR put job [nodeId=" + ctx.localNodeId() + ", cacheName=" + cacheName + ']');

CacheObjectContext cacheObjCtx = cache.context().cacheObjectContext();

Check warning on line 75 in modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this useless assignment to local variable "cacheObjCtx".

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZz1wjYuuzevaEjDf89X&open=AZz1wjYuuzevaEjDf89X&pullRequest=12746

Check warning on line 75 in modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this unused "cacheObjCtx" local variable.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZz1wjYuuzevaEjDf89Y&open=AZz1wjYuuzevaEjDf89Y&pullRequest=12746

for (Map.Entry<KeyCacheObject, CacheObject> entry0 : col) {
GridCacheRawVersionedEntry entry = (GridCacheRawVersionedEntry)entry0;

entry.unmarshal(cacheObjCtx, ctx.marshaller());

KeyCacheObject key = entry.getKey();

// Ensure that receiver to not receive special-purpose values for TTL and expire time.
Expand Down
Loading