Skip to content

Commit 3ce85db

Browse files
committed
[FLINK-39184][core] Introduce Bitmap type for DataStream API
1 parent 5bf3509 commit 3ce85db

13 files changed

Lines changed: 1067 additions & 0 deletions

File tree

flink-core/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,13 @@ under the License.
145145
<artifactId>flink-shaded-guava</artifactId>
146146
</dependency>
147147

148+
<!-- Bitmap internal implementation -->
149+
<dependency>
150+
<groupId>org.roaringbitmap</groupId>
151+
<artifactId>RoaringBitmap</artifactId>
152+
<version>1.3.0</version>
153+
</dependency>
154+
148155
<!-- ================== test dependencies ================== -->
149156

150157
<dependency>
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.api.common.typeinfo;
20+
21+
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.api.common.serialization.SerializerConfig;
23+
import org.apache.flink.api.common.typeutils.TypeSerializer;
24+
import org.apache.flink.api.common.typeutils.base.BitmapSerializer;
25+
import org.apache.flink.types.bitmap.Bitmap;
26+
27+
/** Type information for {@link Bitmap}. */
28+
@PublicEvolving
29+
public class BitmapTypeInfo extends TypeInformation<Bitmap> {
30+
31+
private static final long serialVersionUID = 1L;
32+
33+
public static final BitmapTypeInfo INSTANCE = new BitmapTypeInfo();
34+
35+
private BitmapTypeInfo() {}
36+
37+
@Override
38+
public boolean isBasicType() {
39+
return false;
40+
}
41+
42+
@Override
43+
public boolean isTupleType() {
44+
return false;
45+
}
46+
47+
@Override
48+
public int getArity() {
49+
return 1;
50+
}
51+
52+
@Override
53+
public int getTotalFields() {
54+
return 1;
55+
}
56+
57+
@Override
58+
public Class<Bitmap> getTypeClass() {
59+
return Bitmap.class;
60+
}
61+
62+
@Override
63+
public boolean isKeyType() {
64+
return true;
65+
}
66+
67+
@Override
68+
public boolean isSortKeyType() {
69+
return false;
70+
}
71+
72+
@Override
73+
public TypeSerializer<Bitmap> createSerializer(SerializerConfig config) {
74+
return BitmapSerializer.INSTANCE;
75+
}
76+
77+
@Override
78+
public String toString() {
79+
return Bitmap.class.getSimpleName();
80+
}
81+
82+
@Override
83+
public boolean equals(Object obj) {
84+
if (obj instanceof BitmapTypeInfo) {
85+
BitmapTypeInfo other = (BitmapTypeInfo) obj;
86+
return other.canEqual(this);
87+
} else {
88+
return false;
89+
}
90+
}
91+
92+
@Override
93+
public int hashCode() {
94+
return BitmapTypeInfo.class.hashCode();
95+
}
96+
97+
@Override
98+
public boolean canEqual(Object obj) {
99+
return obj instanceof BitmapTypeInfo;
100+
}
101+
}

flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.flink.types.Either;
3838
import org.apache.flink.types.Row;
3939
import org.apache.flink.types.Value;
40+
import org.apache.flink.types.bitmap.Bitmap;
4041
import org.apache.flink.types.variant.Variant;
4142

4243
import java.lang.reflect.Field;
@@ -159,6 +160,12 @@ public class Types {
159160

160161
public static final TypeInformation<Variant> VARIANT = VariantTypeInfo.INSTANCE;
161162

163+
/**
164+
* Returns type information for {@link org.apache.flink.types.bitmap.Bitmap}. Does not
165+
* support a null value.
166+
*/
167+
public static final TypeInformation<Bitmap> BITMAP = BitmapTypeInfo.INSTANCE;
168+
162169
// CHECKSTYLE.OFF: MethodName
163170

164171
/**
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.api.common.typeutils.base;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
23+
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
24+
import org.apache.flink.core.memory.DataInputView;
25+
import org.apache.flink.core.memory.DataOutputView;
26+
import org.apache.flink.types.bitmap.Bitmap;
27+
28+
import java.io.IOException;
29+
30+
/** Serializer for {@link Bitmap}. */
31+
@Internal
32+
public class BitmapSerializer extends TypeSerializerSingleton<Bitmap> {
33+
34+
public static final BitmapSerializer INSTANCE = new BitmapSerializer();
35+
36+
@Override
37+
public boolean isImmutableType() {
38+
return false;
39+
}
40+
41+
@Override
42+
public Bitmap createInstance() {
43+
return Bitmap.empty();
44+
}
45+
46+
@Override
47+
public Bitmap copy(Bitmap from) {
48+
return Bitmap.from(from);
49+
}
50+
51+
@Override
52+
public Bitmap copy(Bitmap from, Bitmap reuse) {
53+
return copy(from);
54+
}
55+
56+
@Override
57+
public int getLength() {
58+
return -1;
59+
}
60+
61+
@Override
62+
public void serialize(Bitmap record, DataOutputView target) throws IOException {
63+
byte[] bytes = record.toBytes();
64+
target.writeInt(bytes.length);
65+
target.write(bytes);
66+
}
67+
68+
@Override
69+
public Bitmap deserialize(DataInputView source) throws IOException {
70+
int length = source.readInt();
71+
byte[] bytes = new byte[length];
72+
source.read(bytes);
73+
return Bitmap.fromBytes(bytes);
74+
}
75+
76+
@Override
77+
public Bitmap deserialize(Bitmap reuse, DataInputView source) throws IOException {
78+
return this.deserialize(source);
79+
}
80+
81+
@Override
82+
public void copy(DataInputView source, DataOutputView target) throws IOException {
83+
int length = source.readInt();
84+
target.writeInt(length);
85+
target.write(source, length);
86+
}
87+
88+
@Override
89+
public TypeSerializerSnapshot<Bitmap> snapshotConfiguration() {
90+
return new BitmapSerializerSnapshot();
91+
}
92+
93+
@Internal
94+
public static final class BitmapSerializerSnapshot
95+
extends SimpleTypeSerializerSnapshot<Bitmap> {
96+
/** Constructor to create snapshot from serializer (writing the snapshot). */
97+
public BitmapSerializerSnapshot() {
98+
super(() -> INSTANCE);
99+
}
100+
}
101+
}

flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.flink.api.common.io.InputFormat;
3838
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
3939
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
40+
import org.apache.flink.api.common.typeinfo.BitmapTypeInfo;
4041
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
4142
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
4243
import org.apache.flink.api.common.typeinfo.TypeInfo;
@@ -50,6 +51,7 @@
5051
import org.apache.flink.api.java.typeutils.TypeExtractionUtils.LambdaExecutable;
5152
import org.apache.flink.types.Row;
5253
import org.apache.flink.types.Value;
54+
import org.apache.flink.types.bitmap.Bitmap;
5355
import org.apache.flink.types.variant.Variant;
5456
import org.apache.flink.util.InstantiationUtil;
5557
import org.apache.flink.util.Preconditions;
@@ -1977,6 +1979,11 @@ private <OUT, IN1, IN2> TypeInformation<OUT> privateGetForClass(
19771979
return (TypeInformation<OUT>) VariantTypeInfo.INSTANCE;
19781980
}
19791981

1982+
// check for Bitmap
1983+
if (Bitmap.class.isAssignableFrom(clazz)) {
1984+
return (TypeInformation<OUT>) BitmapTypeInfo.INSTANCE;
1985+
}
1986+
19801987
// check for parameterized Collections, requirement:
19811988
// 1. Interface types: the underlying implementation types are not preserved across
19821989
// serialization

0 commit comments

Comments
 (0)