Skip to content

Commit c82ee05

Browse files
authored
Merge pull request #53 from vprus/serialization-streams
Xor: implement stream serialization
2 parents e35ab0d + 42aef91 commit c82ee05

File tree

7 files changed

+209
-6
lines changed

7 files changed

+209
-6
lines changed

fastfilter/src/main/java/org/fastfilter/Filter.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.fastfilter;
22

3+
import java.io.IOException;
4+
import java.io.OutputStream;
35
import java.nio.ByteBuffer;
46

57
/**
@@ -85,4 +87,15 @@ default int getSerializedSize() {
8587
default void serialize(ByteBuffer buffer) {
8688
throw new UnsupportedOperationException();
8789
}
90+
91+
/**
92+
* Serializes the filter state into the provided {@code OutputStream}.
93+
*
94+
* @param out the output stream where the serialized state of the filter will be written
95+
* @throws IOException if writing to the stream fails
96+
* @throws UnsupportedOperationException if the operation is not supported by the filter implementation
97+
*/
98+
default void serialize(OutputStream out) throws IOException {
99+
throw new UnsupportedOperationException();
100+
}
88101
}

fastfilter/src/main/java/org/fastfilter/xor/Xor16.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
package org.fastfilter.xor;
22

3+
import java.io.DataInputStream;
4+
import java.io.DataOutputStream;
5+
import java.io.IOException;
6+
import java.io.InputStream;
7+
import java.io.OutputStream;
38
import java.nio.ByteBuffer;
49

510
import org.fastfilter.Filter;
@@ -199,4 +204,27 @@ public static Xor16 deserialize(ByteBuffer buffer) {
199204

200205
return new Xor16(blockLength, bitCount, seed, fingerprints);
201206
}
207+
208+
public void serialize(OutputStream out) throws IOException {
209+
DataOutputStream dout = new DataOutputStream(out);
210+
dout.writeInt(blockLength);
211+
dout.writeLong(seed);
212+
dout.writeInt(fingerprints.length);
213+
for (final short fp : fingerprints) {
214+
dout.writeShort(fp);
215+
}
216+
}
217+
218+
public static Xor16 deserialize(InputStream in) throws IOException {
219+
DataInputStream din = new DataInputStream(in);
220+
final int blockLength = din.readInt();
221+
final long seed = din.readLong();
222+
final int len = din.readInt();
223+
final short[] fingerprints = new short[len];
224+
for (int i = 0; i < len; i++) {
225+
fingerprints[i] = din.readShort();
226+
}
227+
final int bitCount = len * BITS_PER_FINGERPRINT;
228+
return new Xor16(blockLength, bitCount, seed, fingerprints);
229+
}
202230
}

fastfilter/src/main/java/org/fastfilter/xor/Xor8.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,4 +241,22 @@ public static Xor8 deserialize(ByteBuffer buffer) {
241241

242242
return new Xor8(size, seed, fingerprints);
243243
}
244+
245+
public void serialize(OutputStream out) throws IOException {
246+
DataOutputStream dout = new DataOutputStream(out);
247+
dout.writeInt(size);
248+
dout.writeLong(seed);
249+
dout.writeInt(fingerprints.length);
250+
dout.write(fingerprints);
251+
}
252+
253+
public static Xor8 deserialize(InputStream in) throws IOException {
254+
DataInputStream din = new DataInputStream(in);
255+
final int size = din.readInt();
256+
final long seed = din.readLong();
257+
final int len = din.readInt();
258+
final byte[] fingerprints = new byte[len];
259+
din.readFully(fingerprints);
260+
return new Xor8(size, seed, fingerprints);
261+
}
244262
}

fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse16.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
package org.fastfilter.xor;
22

3+
import java.io.DataInputStream;
4+
import java.io.DataOutputStream;
5+
import java.io.IOException;
6+
import java.io.InputStream;
7+
import java.io.OutputStream;
38
import java.nio.ByteBuffer;
49
import java.util.Arrays;
510
import org.fastfilter.Filter;
@@ -325,4 +330,29 @@ public static XorBinaryFuse16 deserialize(ByteBuffer buffer) {
325330

326331
return new XorBinaryFuse16(segmentCount, segmentLength, seed, fingerprints);
327332
}
333+
334+
public void serialize(OutputStream out) throws IOException {
335+
DataOutputStream dout = new DataOutputStream(out);
336+
dout.writeInt(segmentLength);
337+
dout.writeInt(segmentCountLength);
338+
dout.writeLong(seed);
339+
dout.writeInt(fingerprints.length);
340+
for (final short fp : fingerprints) {
341+
dout.writeShort(fp);
342+
}
343+
}
344+
345+
public static XorBinaryFuse16 deserialize(InputStream in) throws IOException {
346+
DataInputStream din = new DataInputStream(in);
347+
final int segmentLength = din.readInt();
348+
final int segmentCountLength = din.readInt();
349+
final long seed = din.readLong();
350+
final int len = din.readInt();
351+
final short[] fingerprints = new short[len];
352+
for (int i = 0; i < len; i++) {
353+
fingerprints[i] = din.readShort();
354+
}
355+
final int segmentCount = segmentCountLength / segmentLength;
356+
return new XorBinaryFuse16(segmentCount, segmentLength, seed, fingerprints);
357+
}
328358
}

fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse32.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
package org.fastfilter.xor;
22

3+
import java.io.DataInputStream;
4+
import java.io.DataOutputStream;
5+
import java.io.IOException;
6+
import java.io.InputStream;
7+
import java.io.OutputStream;
38
import java.nio.ByteBuffer;
49
import java.util.Arrays;
510

@@ -313,4 +318,29 @@ public static XorBinaryFuse32 deserialize(ByteBuffer buffer) {
313318

314319
return new XorBinaryFuse32(segmentCount, segmentLength, seed, fingerprints);
315320
}
321+
322+
public void serialize(OutputStream out) throws IOException {
323+
DataOutputStream dout = new DataOutputStream(out);
324+
dout.writeInt(segmentLength);
325+
dout.writeInt(segmentCountLength);
326+
dout.writeLong(seed);
327+
dout.writeInt(fingerprints.length);
328+
for (final int fp : fingerprints) {
329+
dout.writeInt(fp);
330+
}
331+
}
332+
333+
public static XorBinaryFuse32 deserialize(InputStream in) throws IOException {
334+
DataInputStream din = new DataInputStream(in);
335+
final int segmentLength = din.readInt();
336+
final int segmentCountLength = din.readInt();
337+
final long seed = din.readLong();
338+
final int len = din.readInt();
339+
final int[] fingerprints = new int[len];
340+
for (int i = 0; i < len; i++) {
341+
fingerprints[i] = din.readInt();
342+
}
343+
final int segmentCount = segmentCountLength / segmentLength;
344+
return new XorBinaryFuse32(segmentCount, segmentLength, seed, fingerprints);
345+
}
316346
}

fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse8.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
package org.fastfilter.xor;
22

3+
import java.io.DataInputStream;
4+
import java.io.DataOutputStream;
5+
import java.io.IOException;
6+
import java.io.InputStream;
7+
import java.io.OutputStream;
38
import java.nio.ByteBuffer;
49
import java.util.Arrays;
510

@@ -321,4 +326,25 @@ public static XorBinaryFuse8 deserialize(ByteBuffer buffer) {
321326

322327
return new XorBinaryFuse8(segmentCount, segmentLength, seed, fingerprints);
323328
}
329+
330+
public void serialize(OutputStream out) throws IOException {
331+
DataOutputStream dout = new DataOutputStream(out);
332+
dout.writeInt(segmentLength);
333+
dout.writeInt(segmentCountLength);
334+
dout.writeLong(seed);
335+
dout.writeInt(fingerprints.length);
336+
dout.write(fingerprints);
337+
}
338+
339+
public static XorBinaryFuse8 deserialize(InputStream in) throws IOException {
340+
DataInputStream din = new DataInputStream(in);
341+
final int segmentLength = din.readInt();
342+
final int segmentCountLength = din.readInt();
343+
final long seed = din.readLong();
344+
final int len = din.readInt();
345+
final byte[] fingerprints = new byte[len];
346+
din.readFully(fingerprints);
347+
final int segmentCount = segmentCountLength / segmentLength;
348+
return new XorBinaryFuse8(segmentCount, segmentLength, seed, fingerprints);
349+
}
324350
}

fastfilter/src/test/java/org/fastfilter/xor/SerializationTest.java

Lines changed: 64 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
import static org.junit.Assert.assertTrue;
66
import static org.junit.Assert.fail;
77

8+
import java.io.ByteArrayInputStream;
9+
import java.io.ByteArrayOutputStream;
10+
import java.io.IOException;
11+
import java.io.InputStream;
812
import java.nio.ByteBuffer;
913
import java.util.List;
1014
import java.util.function.Function;
@@ -20,28 +24,36 @@ public class SerializationTest {
2024
private final String filterName;
2125
private final Function<long[], Filter> constructor;
2226
private final Function<ByteBuffer, Filter> deserializer;
27+
private final StreamDeserializer streamDeserializer;
2328

2429
public SerializationTest(String filterName,
2530
Function<long[], Filter> constructor,
26-
Function<ByteBuffer, Filter> deserializer) {
31+
Function<ByteBuffer, Filter> deserializer,
32+
StreamDeserializer streamDeserializer) {
2733
this.filterName = filterName;
2834
this.constructor = constructor;
2935
this.deserializer = deserializer;
36+
this.streamDeserializer = streamDeserializer;
3037
}
3138

3239
@Parameters(name = "{0}")
3340
public static List<Object[]> filters() {
3441
return List.of(
3542
new Object[] {"Xor8", (Function<long[], Filter>) Xor8::construct,
36-
(Function<ByteBuffer, Filter>) Xor8::deserialize},
43+
(Function<ByteBuffer, Filter>) Xor8::deserialize,
44+
(StreamDeserializer) Xor8::deserialize},
3745
new Object[] {"Xor16", (Function<long[], Filter>) Xor16::construct,
38-
(Function<ByteBuffer, Filter>) Xor16::deserialize},
46+
(Function<ByteBuffer, Filter>) Xor16::deserialize,
47+
(StreamDeserializer) Xor16::deserialize},
3948
new Object[] {"XorBinaryFuse8", (Function<long[], Filter>) XorBinaryFuse8::construct,
40-
(Function<ByteBuffer, Filter>) XorBinaryFuse8::deserialize},
49+
(Function<ByteBuffer, Filter>) XorBinaryFuse8::deserialize,
50+
(StreamDeserializer) XorBinaryFuse8::deserialize},
4151
new Object[] {"XorBinaryFuse16", (Function<long[], Filter>) XorBinaryFuse16::construct,
42-
(Function<ByteBuffer, Filter>) XorBinaryFuse16::deserialize},
52+
(Function<ByteBuffer, Filter>) XorBinaryFuse16::deserialize,
53+
(StreamDeserializer) XorBinaryFuse16::deserialize},
4354
new Object[] {"XorBinaryFuse32", (Function<long[], Filter>) XorBinaryFuse32::construct,
44-
(Function<ByteBuffer, Filter>) XorBinaryFuse32::deserialize}
55+
(Function<ByteBuffer, Filter>) XorBinaryFuse32::deserialize,
56+
(StreamDeserializer) XorBinaryFuse32::deserialize}
4557
);
4658
}
4759

@@ -85,6 +97,52 @@ public void shouldSerializeAndDeserializeMediumFilter() {
8597
assertFalse("Key 1500L should not be in " + filterName + " filter", deserializedFilter.mayContain(1500L));
8698
}
8799

100+
@Test
101+
public void shouldSerializeAndDeserializeMediumFilterFromStream() throws IOException {
102+
// Arrange
103+
final var keys = new long[]{100L, 200L, 300L, 400L, 500L, 600L, 700L, 800L, 900L, 1000L};
104+
final var originalFilter = constructor.apply(keys);
105+
final var buffer = ByteBuffer.allocate(originalFilter.getSerializedSize());
106+
107+
// Act
108+
originalFilter.serialize(buffer);
109+
final var input = new ByteArrayInputStream(buffer.array());
110+
final var deserializedFilter = streamDeserializer.deserialize(input);
111+
112+
// Assert
113+
for (final long key : keys) {
114+
assertTrue("Key " + key + " should be present in deserialized " + filterName + " filter",
115+
deserializedFilter.mayContain(key));
116+
}
117+
assertFalse("Key 50L should not be in " + filterName + " filter", deserializedFilter.mayContain(50L));
118+
assertFalse("Key 1500L should not be in " + filterName + " filter", deserializedFilter.mayContain(1500L));
119+
}
120+
121+
@Test
122+
public void shouldSerializeToStreamAndDeserializeFromByteBuffer() throws IOException {
123+
// Arrange
124+
final var keys = new long[]{10L, 20L, 30L, 40L, 50L, 60L, 70L, 80L};
125+
final var originalFilter = constructor.apply(keys);
126+
final var out = new ByteArrayOutputStream();
127+
128+
// Act
129+
originalFilter.serialize(out);
130+
final var buffer = ByteBuffer.wrap(out.toByteArray());
131+
final var deserializedFilter = deserializer.apply(buffer);
132+
133+
// Assert
134+
for (final long key : keys) {
135+
assertTrue("Key " + key + " should be present in deserialized " + filterName + " filter",
136+
deserializedFilter.mayContain(key));
137+
}
138+
assertFalse("Key 15L should not be in " + filterName + " filter", deserializedFilter.mayContain(15L));
139+
}
140+
141+
@FunctionalInterface
142+
private interface StreamDeserializer {
143+
Filter deserialize(InputStream in) throws IOException;
144+
}
145+
88146
@Test
89147
public void shouldSerializeAndDeserializeLargeFilter() {
90148
// Arrange

0 commit comments

Comments
 (0)