Skip to content

Commit d6ce1a7

Browse files
committed
IGNITE-27853 Refactor GridCacheRawVersionedEntry and DataStreamerEntry
1 parent a31202e commit d6ce1a7

5 files changed

Lines changed: 18 additions & 266 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,7 @@
262262
import org.apache.ignite.internal.processors.cache.transactions.TxLocksResponse;
263263
import org.apache.ignite.internal.processors.cache.transactions.TxLocksResponseSerializer;
264264
import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
265+
import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntrySerializer;
265266
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
266267
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
267268
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionExSerializer;
@@ -279,6 +280,7 @@
279280
import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
280281
import org.apache.ignite.internal.processors.continuous.GridContinuousMessageSerializer;
281282
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
283+
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntrySerializer;
282284
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
283285
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequestSerializer;
284286
import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse;
@@ -454,14 +456,14 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
454456
factory.register((short)92, CacheEntryInfoCollection::new, new CacheEntryInfoCollectionSerializer());
455457
factory.register((short)93, CacheInvokeDirectResult::new, new CacheInvokeDirectResultSerializer());
456458
factory.register((short)94, IgniteTxKey::new, new IgniteTxKeySerializer());
457-
factory.register((short)95, DataStreamerEntry::new);
459+
factory.register((short)95, DataStreamerEntry::new, new DataStreamerEntrySerializer());
458460
factory.register((short)96, CacheContinuousQueryEntry::new, new CacheContinuousQueryEntrySerializer());
459461
factory.register((short)97, CacheEvictionEntry::new, new CacheEvictionEntrySerializer());
460462
factory.register((short)98, CacheEntryPredicateAdapter::new, new CacheEntryPredicateAdapterSerializer());
461463
factory.register((short)100, IgniteTxEntry::new, new IgniteTxEntrySerializer());
462464
factory.register((short)101, TxEntryValueHolder::new, new TxEntryValueHolderSerializer());
463465
factory.register((short)102, CacheVersionedValue::new, new CacheVersionedValueSerializer());
464-
factory.register((short)103, GridCacheRawVersionedEntry::new);
466+
factory.register((short)103, GridCacheRawVersionedEntry::new, new GridCacheRawVersionedEntrySerializer());
465467
factory.register((short)104, GridCacheVersionEx::new, new GridCacheVersionExSerializer());
466468
factory.register((short)106, GridQueryCancelRequest::new, new GridQueryCancelRequestSerializer());
467469
factory.register((short)107, GridQueryFailResponse::new, new GridQueryFailResponseSerializer());

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5900,8 +5900,6 @@ else if (ttl == CU.TTL_NOT_CHANGED)
59005900
0,
59015901
ver.conflictVersion());
59025902

5903-
e.prepareDirectMarshal(ctx.cacheObjectContext());
5904-
59055903
col.add(e);
59065904

59075905
if (col.size() == ldr.perNodeBufferSize()) {

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java

Lines changed: 9 additions & 200 deletions
Original file line numberDiff line numberDiff line change
@@ -17,50 +17,30 @@
1717

1818
package org.apache.ignite.internal.processors.cache.version;
1919

20-
import java.io.Externalizable;
21-
import java.io.IOException;
22-
import java.io.ObjectInput;
23-
import java.io.ObjectOutput;
24-
import java.nio.ByteBuffer;
25-
import org.apache.ignite.IgniteCheckedException;
26-
import org.apache.ignite.internal.GridDirectTransient;
27-
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
20+
import org.apache.ignite.internal.Order;
2821
import org.apache.ignite.internal.processors.cache.CacheObject;
29-
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
3022
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
3123
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
3224
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
3325
import org.apache.ignite.internal.util.typedef.internal.S;
34-
import org.apache.ignite.internal.util.typedef.internal.U;
35-
import org.apache.ignite.marshaller.Marshaller;
36-
import org.apache.ignite.plugin.extensions.communication.MessageReader;
37-
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
3826
import org.jetbrains.annotations.Nullable;
3927

4028
/**
4129
* Raw versioned entry.
4230
*/
43-
@IgniteCodeGeneratingFail
4431
public class GridCacheRawVersionedEntry<K, V> extends DataStreamerEntry implements
45-
GridCacheVersionedEntry<K, V>, GridCacheVersionable, Externalizable {
46-
/** */
47-
private static final long serialVersionUID = 0L;
48-
49-
/** Key bytes. */
50-
@GridDirectTransient
51-
private byte[] keyBytes;
52-
53-
/** Value bytes. */
54-
private byte[] valBytes;
55-
32+
GridCacheVersionedEntry<K, V>, GridCacheVersionable {
5633
/** TTL. */
57-
private long ttl;
34+
@Order(0)
35+
long ttl;
5836

5937
/** Expire time. */
60-
private long expireTime;
38+
@Order(1)
39+
long expireTime;
6140

6241
/** Version. */
63-
private GridCacheVersion ver;
42+
@Order(2)
43+
GridCacheVersion ver;
6444

6545
/**
6646
* {@code Externalizable} support.
@@ -92,28 +72,6 @@ public GridCacheRawVersionedEntry(KeyCacheObject key,
9272
this.ver = ver;
9373
}
9474

95-
/**
96-
* Constructor used in receiver hub where marshalled key and value are available and we do not want to
97-
* unmarshal value.
98-
*
99-
* @param keyBytes Key.
100-
* @param valBytes Value bytes.
101-
* @param expireTime Expire time.
102-
* @param ttl TTL.
103-
* @param ver Version.
104-
*/
105-
public GridCacheRawVersionedEntry(byte[] keyBytes,
106-
byte[] valBytes,
107-
long ttl,
108-
long expireTime,
109-
GridCacheVersion ver) {
110-
this.keyBytes = keyBytes;
111-
this.valBytes = valBytes;
112-
this.ttl = ttl;
113-
this.expireTime = expireTime;
114-
this.ver = ver;
115-
}
116-
11775
/** {@inheritDoc} */
11876
@Override public K key() {
11977
assert key != null : "Entry is being improperly processed.";
@@ -156,162 +114,13 @@ public GridCacheRawVersionedEntry(byte[] keyBytes,
156114
return ver;
157115
}
158116

159-
/**
160-
* Perform internal unmarshal of this entry. It must be performed after entry is deserialized and before
161-
* its restored key/value are needed.
162-
*
163-
* @param ctx Context.
164-
* @param marsh Marshaller.
165-
* @throws IgniteCheckedException If failed.
166-
*/
167-
public void unmarshal(CacheObjectContext ctx, Marshaller marsh) throws IgniteCheckedException {
168-
unmarshalKey(ctx, marsh);
169-
170-
if (val == null && valBytes != null) {
171-
val = U.unmarshal(marsh, valBytes, U.resolveClassLoader(null, ctx.classLoader()));
172-
173-
val.finishUnmarshal(ctx, null);
174-
}
175-
}
176-
177-
/**
178-
* Perform internal key unmarshal of this entry. It must be performed after entry is deserialized and before
179-
* its restored key/value are needed.
180-
*
181-
* @param ctx Context.
182-
* @param marsh Marshaller.
183-
* @throws IgniteCheckedException If failed.
184-
*/
185-
public void unmarshalKey(CacheObjectContext ctx, Marshaller marsh) throws IgniteCheckedException {
186-
if (key == null) {
187-
assert keyBytes != null;
188-
189-
key = U.unmarshal(marsh, keyBytes, U.resolveClassLoader(null, ctx.classLoader()));
190-
191-
key.finishUnmarshal(ctx, null);
192-
}
193-
}
194-
195-
/**
196-
* @param ctx Context.
197-
* @throws IgniteCheckedException If failed.
198-
*/
199-
public void prepareDirectMarshal(CacheObjectContext ctx) throws IgniteCheckedException {
200-
key.prepareMarshal(ctx);
201-
202-
if (val != null)
203-
val.prepareMarshal(ctx);
204-
}
205-
206117
/** {@inheritDoc} */
207118
@Override public short directType() {
208119
return 103;
209120
}
210121

211-
/** {@inheritDoc} */
212-
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
213-
reader.setBuffer(buf);
214-
215-
if (!super.readFrom(buf, reader))
216-
return false;
217-
218-
switch (reader.state()) {
219-
case 2:
220-
expireTime = reader.readLong();
221-
222-
if (!reader.isLastRead())
223-
return false;
224-
225-
reader.incrementState();
226-
227-
case 3:
228-
ttl = reader.readLong();
229-
230-
if (!reader.isLastRead())
231-
return false;
232-
233-
reader.incrementState();
234-
235-
case 4:
236-
valBytes = reader.readByteArray();
237-
238-
if (!reader.isLastRead())
239-
return false;
240-
241-
reader.incrementState();
242-
243-
case 5:
244-
ver = reader.readMessage();
245-
246-
if (!reader.isLastRead())
247-
return false;
248-
249-
reader.incrementState();
250-
251-
}
252-
253-
return true;
254-
}
255-
256-
/** {@inheritDoc} */
257-
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
258-
writer.setBuffer(buf);
259-
260-
if (!super.writeTo(buf, writer))
261-
return false;
262-
263-
if (!writer.isHeaderWritten()) {
264-
if (!writer.writeHeader(directType()))
265-
return false;
266-
267-
writer.onHeaderWritten();
268-
}
269-
270-
switch (writer.state()) {
271-
case 2:
272-
if (!writer.writeLong(expireTime))
273-
return false;
274-
275-
writer.incrementState();
276-
277-
case 3:
278-
if (!writer.writeLong(ttl))
279-
return false;
280-
281-
writer.incrementState();
282-
283-
case 4:
284-
if (!writer.writeByteArray(valBytes))
285-
return false;
286-
287-
writer.incrementState();
288-
289-
case 5:
290-
if (!writer.writeMessage(ver))
291-
return false;
292-
293-
writer.incrementState();
294-
295-
}
296-
297-
return true;
298-
}
299-
300-
/** {@inheritDoc} */
301-
@Override public void writeExternal(ObjectOutput out) throws IOException {
302-
assert false;
303-
}
304-
305-
/** {@inheritDoc} */
306-
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
307-
assert false;
308-
}
309-
310122
/** {@inheritDoc} */
311123
@Override public String toString() {
312-
return S.toString(GridCacheRawVersionedEntry.class, this,
313-
"keyBytesLen", keyBytes != null ? keyBytes.length : "n/a",
314-
"valBytesLen", valBytes != null ? valBytes.length : "n/a",
315-
"super", super.toString());
124+
return S.toString(GridCacheRawVersionedEntry.class, this, "super", super.toString());
316125
}
317126
}

modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerEntry.java

Lines changed: 5 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,28 @@
1717

1818
package org.apache.ignite.internal.processors.datastreamer;
1919

20-
import java.nio.ByteBuffer;
2120
import java.util.Map;
21+
import org.apache.ignite.internal.Order;
2222
import org.apache.ignite.internal.processors.cache.CacheObject;
2323
import org.apache.ignite.internal.processors.cache.GridCacheContext;
2424
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
2525
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
2626
import org.apache.ignite.internal.util.typedef.internal.S;
2727
import org.apache.ignite.plugin.extensions.communication.Message;
28-
import org.apache.ignite.plugin.extensions.communication.MessageReader;
29-
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
3028

3129
/**
3230
*
3331
*/
3432
public class DataStreamerEntry implements Map.Entry<KeyCacheObject, CacheObject>, Message {
3533
/** */
34+
@Order(0)
3635
@GridToStringInclude
37-
protected KeyCacheObject key;
36+
public KeyCacheObject key;
3837

3938
/** */
39+
@Order(1)
4040
@GridToStringInclude
41-
protected CacheObject val;
41+
public CacheObject val;
4242

4343
/**
4444
*
@@ -95,61 +95,6 @@ public <K, V> Map.Entry<K, V> toEntry(final GridCacheContext ctx, final boolean
9595
};
9696
}
9797

98-
/** {@inheritDoc} */
99-
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
100-
writer.setBuffer(buf);
101-
102-
if (!writer.isHeaderWritten()) {
103-
if (!writer.writeHeader(directType()))
104-
return false;
105-
106-
writer.onHeaderWritten();
107-
}
108-
109-
switch (writer.state()) {
110-
case 0:
111-
if (!writer.writeKeyCacheObject(key))
112-
return false;
113-
114-
writer.incrementState();
115-
116-
case 1:
117-
if (!writer.writeCacheObject(val))
118-
return false;
119-
120-
writer.incrementState();
121-
122-
}
123-
124-
return true;
125-
}
126-
127-
/** {@inheritDoc} */
128-
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
129-
reader.setBuffer(buf);
130-
131-
switch (reader.state()) {
132-
case 0:
133-
key = reader.readKeyCacheObject();
134-
135-
if (!reader.isLastRead())
136-
return false;
137-
138-
reader.incrementState();
139-
140-
case 1:
141-
val = reader.readCacheObject();
142-
143-
if (!reader.isLastRead())
144-
return false;
145-
146-
reader.incrementState();
147-
148-
}
149-
150-
return true;
151-
}
152-
15398
/** {@inheritDoc} */
15499
@Override public short directType() {
155100
return 95;

modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,6 @@ public class IgniteDrDataStreamerCacheUpdater implements StreamReceiver<KeyCache
7777
for (Map.Entry<KeyCacheObject, CacheObject> entry0 : col) {
7878
GridCacheRawVersionedEntry entry = (GridCacheRawVersionedEntry)entry0;
7979

80-
entry.unmarshal(cacheObjCtx, ctx.marshaller());
81-
8280
KeyCacheObject key = entry.getKey();
8381

8482
// Ensure that receiver to not receive special-purpose values for TTL and expire time.

0 commit comments

Comments
 (0)