Skip to content

Commit 30c31a5

Browse files
AVRO-4165: [java] ability to specify AvroEncode on a class (#3425)
1 parent f0d2bd4 commit 30c31a5

7 files changed

Lines changed: 262 additions & 5 deletions

File tree

lang/java/avro/src/main/java/org/apache/avro/reflect/AvroEncode.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.avro.reflect;
1919

2020
import java.lang.annotation.ElementType;
21+
import java.lang.annotation.Inherited;
2122
import java.lang.annotation.Retention;
2223
import java.lang.annotation.RetentionPolicy;
2324
import java.lang.annotation.Target;
@@ -30,7 +31,8 @@
3031
* file. Use of {@link org.apache.avro.io.ValidatingEncoder} is recommended.
3132
*/
3233
@Retention(RetentionPolicy.RUNTIME)
33-
@Target(ElementType.FIELD)
34+
@Inherited
35+
@Target({ ElementType.FIELD, ElementType.TYPE })
3436
public @interface AvroEncode {
3537
Class<? extends CustomEncoding<?>> using();
3638
}

lang/java/avro/src/main/java/org/apache/avro/reflect/FieldAccessReflect.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class FieldAccessReflect extends FieldAccess {
2828

2929
@Override
3030
protected FieldAccessor getAccessor(Field field) {
31-
AvroEncode enc = field.getAnnotation(AvroEncode.class);
31+
AvroEncode enc = ReflectionUtil.getAvroEncode(field);
3232
if (enc != null)
3333
try {
3434
return new ReflectionBasesAccessorCustomEncoded(field, enc.using().getDeclaredConstructor().newInstance());
@@ -47,7 +47,7 @@ public ReflectionBasedAccessor(Field field) {
4747
this.field = field;
4848
this.field.setAccessible(true);
4949
isStringable = field.isAnnotationPresent(Stringable.class);
50-
isCustomEncoded = field.isAnnotationPresent(AvroEncode.class);
50+
isCustomEncoded = ReflectionUtil.getAvroEncode(field) != null;
5151
}
5252

5353
@Override

lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ public class ReflectData extends SpecificData {
6969

7070
private static final String STRING_OUTER_PARENT_REFERENCE = "this$0";
7171

72+
// holds a wrapper so null entries will have a cached value
73+
private final ConcurrentMap<Schema, CustomEncodingWrapper> encoderCache = new ConcurrentHashMap<>();
74+
7275
/**
7376
* Always false since custom coders are not available for {@link ReflectData}.
7477
*/
@@ -864,7 +867,7 @@ private static Field[] getFields(Class<?> recordClass, boolean excludeJava) {
864867

865868
/** Create a schema for a field. */
866869
protected Schema createFieldSchema(Field field, Map<String, Schema> names) {
867-
AvroEncode enc = field.getAnnotation(AvroEncode.class);
870+
AvroEncode enc = ReflectionUtil.getAvroEncode(field);
868871
if (enc != null)
869872
try {
870873
return enc.using().getDeclaredConstructor().newInstance().getSchema();
@@ -1042,4 +1045,36 @@ public Object newRecord(Object old, Schema schema) {
10421045
}
10431046
return super.newRecord(old, schema);
10441047
}
1048+
1049+
public CustomEncoding getCustomEncoding(Schema schema) {
1050+
1051+
return this.encoderCache.computeIfAbsent(schema, this::populateEncoderCache).get();
1052+
}
1053+
1054+
private CustomEncodingWrapper populateEncoderCache(Schema schema) {
1055+
var enc = ReflectionUtil.getAvroEncode(getClass(schema));
1056+
if (enc != null) {
1057+
try {
1058+
return new CustomEncodingWrapper(enc.using().getDeclaredConstructor().newInstance());
1059+
} catch (Exception e) {
1060+
throw new AvroRuntimeException("Could not instantiate custom Encoding");
1061+
}
1062+
}
1063+
return new CustomEncodingWrapper(null);
1064+
}
1065+
1066+
private static class CustomEncodingWrapper {
1067+
1068+
private final CustomEncoding customEncoding;
1069+
1070+
private CustomEncodingWrapper(CustomEncoding customEncoding) {
1071+
this.customEncoding = customEncoding;
1072+
}
1073+
1074+
public CustomEncoding get() {
1075+
return customEncoding;
1076+
}
1077+
1078+
}
1079+
10451080
}

lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ public ReflectDatumReader(ReflectData data) {
7373
super(data);
7474
}
7575

76+
private ReflectData getReflectData() {
77+
return (ReflectData) getSpecificData();
78+
}
79+
7680
@Override
7781
protected Object newArray(Object old, int size, Schema schema) {
7882
Class<?> collectionClass = ReflectData.getClassProp(schema, SpecificData.CLASS_PROP);
@@ -251,6 +255,16 @@ protected Object readBytes(Object old, Schema s, Decoder in) throws IOException
251255
}
252256
}
253257

258+
@Override
259+
protected Object read(Object old, Schema expected, ResolvingDecoder in) throws IOException {
260+
CustomEncoding encoder = getReflectData().getCustomEncoding(expected);
261+
if (encoder != null) {
262+
return encoder.read(old, in);
263+
} else {
264+
return super.read(old, expected, in);
265+
}
266+
}
267+
254268
@Override
255269
protected Object readInt(Object old, Schema expected, Decoder in) throws IOException {
256270
Object value = in.readInt();

lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumWriter.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ protected ReflectDatumWriter(ReflectData reflectData) {
6161
super(reflectData);
6262
}
6363

64+
private ReflectData getReflectData() {
65+
return (ReflectData) getSpecificData();
66+
}
67+
6468
/**
6569
* Called to write a array. May be overridden for alternate array
6670
* representations.
@@ -158,7 +162,13 @@ else if (datum instanceof Map && ReflectData.isNonStringMapSchema(schema)) {
158162
datum = ((Optional) datum).orElse(null);
159163
}
160164
try {
161-
super.write(schema, datum, out);
165+
166+
CustomEncoding encoder = getReflectData().getCustomEncoding(schema);
167+
if (encoder != null) {
168+
encoder.write(datum, out);
169+
} else {
170+
super.write(schema, datum, out);
171+
}
162172
} catch (NullPointerException e) { // improve error message
163173
throw npe(e, " in " + schema.getFullName());
164174
}

lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectionUtil.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.lang.invoke.MethodHandle;
2525
import java.lang.invoke.MethodHandles;
2626
import java.lang.invoke.MethodType;
27+
import java.lang.reflect.Field;
2728
import java.lang.reflect.ParameterizedType;
2829
import java.lang.reflect.Type;
2930
import java.lang.reflect.TypeVariable;
@@ -188,4 +189,19 @@ public static <V, R> Function<V, R> getConstructorAsFunction(Class<V> parameterC
188189
}
189190
}
190191

192+
protected static AvroEncode getAvroEncode(Field field) {
193+
var enc = field.getAnnotation(AvroEncode.class);
194+
if (enc != null) {
195+
return enc;
196+
} else {
197+
return getAvroEncode(field.getType());
198+
}
199+
}
200+
201+
protected static AvroEncode getAvroEncode(Class<?> clazz) {
202+
if (clazz == null) {
203+
return null;
204+
}
205+
return clazz.getAnnotation(AvroEncode.class);
206+
}
191207
}
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* https://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.avro.reflect;
19+
20+
import static org.junit.jupiter.api.Assertions.assertEquals;
21+
22+
import java.io.ByteArrayOutputStream;
23+
import java.io.IOException;
24+
import java.util.Arrays;
25+
26+
import org.apache.avro.AvroTypeException;
27+
import org.apache.avro.Schema;
28+
import org.apache.avro.io.Decoder;
29+
import org.apache.avro.io.DecoderFactory;
30+
import org.apache.avro.io.Encoder;
31+
import org.apache.avro.io.EncoderFactory;
32+
import org.junit.jupiter.api.Test;
33+
34+
public class TestAvroEncode {
35+
EncoderFactory factory = new EncoderFactory();
36+
37+
@Test
38+
void testWithinClass() throws IOException {
39+
40+
var wrapper = new Wrapper(new R1("test"));
41+
42+
var read = readWrite(wrapper);
43+
44+
assertEquals("test", wrapper.getR1().getValue());
45+
assertEquals("test used this", read.getR1().getValue());
46+
}
47+
48+
@Test
49+
void testDirect() throws IOException {
50+
51+
var r1 = new R1("test");
52+
53+
var read = readWrite(r1);
54+
55+
assertEquals("test", r1.getValue());
56+
assertEquals("test used this", read.getValue());
57+
}
58+
59+
@Test
60+
void testFieldAnnotationTakesPrecedence() throws IOException {
61+
62+
var wrapper = new OtherWrapper(new R1("test"));
63+
64+
var read = readWrite(wrapper);
65+
66+
assertEquals("test", wrapper.getR1().getValue());
67+
assertEquals("test used other", read.getR1().getValue());
68+
}
69+
70+
public static class Wrapper {
71+
72+
private R1 r1;
73+
74+
public Wrapper() {
75+
}
76+
77+
public Wrapper(R1 r1) {
78+
this.r1 = r1;
79+
}
80+
81+
public R1 getR1() {
82+
return r1;
83+
}
84+
85+
public void setR1(R1 r1) {
86+
this.r1 = r1;
87+
}
88+
89+
}
90+
91+
public static class OtherWrapper {
92+
@AvroEncode(using = R1EncodingOther.class)
93+
private R1 r1;
94+
95+
public OtherWrapper() {
96+
}
97+
98+
public OtherWrapper(R1 r1) {
99+
this.r1 = r1;
100+
}
101+
102+
public R1 getR1() {
103+
return r1;
104+
}
105+
106+
public void setR1(R1 r1) {
107+
this.r1 = r1;
108+
}
109+
110+
}
111+
112+
@AvroEncode(using = R1Encoding.class)
113+
public static class R1 {
114+
115+
private final String value;
116+
117+
public R1(String value) {
118+
this.value = value;
119+
}
120+
121+
public String getValue() {
122+
return value;
123+
}
124+
125+
}
126+
127+
public static class R1Encoding extends CustomEncoding<R1> {
128+
129+
{
130+
schema = Schema.createRecord("R1", null, null, false,
131+
Arrays.asList(new Schema.Field("value", Schema.create(Schema.Type.STRING), null, null)));
132+
}
133+
134+
@Override
135+
protected void write(Object datum, Encoder out) throws IOException {
136+
if (datum instanceof R1) {
137+
out.writeString(((R1) datum).getValue());
138+
} else {
139+
throw new AvroTypeException("Expected R1, got " + datum.getClass());
140+
}
141+
142+
}
143+
144+
@Override
145+
protected R1 read(Object reuse, Decoder in) throws IOException {
146+
return new R1(in.readString() + " used this");
147+
}
148+
}
149+
150+
public static class R1EncodingOther extends CustomEncoding<R1> {
151+
152+
{
153+
schema = Schema.createRecord("R1", null, null, false,
154+
Arrays.asList(new Schema.Field("value", Schema.create(Schema.Type.STRING), null, null)));
155+
}
156+
157+
@Override
158+
protected void write(Object datum, Encoder out) throws IOException {
159+
if (datum instanceof R1) {
160+
out.writeString(((R1) datum).getValue());
161+
} else {
162+
throw new AvroTypeException("Expected R1, got " + datum.getClass());
163+
}
164+
}
165+
166+
@Override
167+
protected R1 read(Object reuse, Decoder in) throws IOException {
168+
return new R1(in.readString() + " used other");
169+
}
170+
}
171+
172+
<T> T readWrite(T object) throws IOException {
173+
var schema = new ReflectData().getSchema(object.getClass());
174+
ReflectDatumWriter<T> writer = new ReflectDatumWriter<>(schema);
175+
ByteArrayOutputStream out = new ByteArrayOutputStream();
176+
writer.write(object, factory.directBinaryEncoder(out, null));
177+
ReflectDatumReader<T> reader = new ReflectDatumReader<>(schema);
178+
return reader.read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
179+
}
180+
}

0 commit comments

Comments
 (0)