Skip to content

Commit 8e06337

Browse files
[FLUSS] Add AggMode support for undo recovery in aggregation tables
This commit introduces AggMode (Aggregation Mode) to control how the server handles data aggregation when writing to tables that use the aggregation merge engine.

Key changes:

1. New AggMode enum with three modes:
   - AGGREGATE (default): Data is aggregated through the server-side merge engine
   - OVERWRITE: Bypasses the merge engine and directly replaces values (for undo recovery)
   - LOCAL_AGGREGATE: Reserved for future client-side pre-aggregation
2. Client-side changes:
   - Upsert interface: Added aggregationMode(AggMode) method for the fluent API
   - UpsertWriterImpl: Propagates aggMode through WriteRecord
   - KvWriteBatch: Validates aggMode consistency within a batch
   - ClientRpcMessageUtils: Validates aggMode consistency across batches
   - WriteRecord: Added aggMode field for upsert/delete operations
3. Server-side changes:
   - KvTablet: Pre-creates overwriteRowMerger for OVERWRITE mode
   - putAsLeader(): Selects the appropriate RowMerger based on aggMode
   - Replica/ReplicaManager: Propagates aggMode through the call chain
   - TabletService: Extracts aggMode from PutKvRequest
4. Protocol changes:
   - FlussApi.proto: Added optional agg_mode field to PutKvRequest
5. Test coverage:
   - KvTabletAggModeTest: 9 tests covering OVERWRITE mode scenarios
   - KvWriteBatchTest: 3 tests for aggMode consistency validation
   - ClientRpcMessageUtilsTest: 4 tests for multi-batch aggMode validation

This feature enables the Flink connector to perform undo recovery by restoring exact historical values during checkpoint failover, bypassing the merge engine.
1 parent 864bbe6 commit 8e06337

18 files changed

Lines changed: 1191 additions & 28 deletions

File tree

fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.fluss.client.write.WriterClient;
2121
import org.apache.fluss.metadata.TableInfo;
2222
import org.apache.fluss.metadata.TablePath;
23+
import org.apache.fluss.rpc.protocol.AggMode;
2324
import org.apache.fluss.types.RowType;
2425

2526
import javax.annotation.Nullable;
@@ -32,22 +33,24 @@ public class TableUpsert implements Upsert {
3233
private final TablePath tablePath;
3334
private final TableInfo tableInfo;
3435
private final WriterClient writerClient;
35-
3636
private final @Nullable int[] targetColumns;
37+
private final AggMode aggMode;
3738

3839
public TableUpsert(TablePath tablePath, TableInfo tableInfo, WriterClient writerClient) {
39-
this(tablePath, tableInfo, writerClient, null);
40+
this(tablePath, tableInfo, writerClient, null, AggMode.AGGREGATE);
4041
}
4142

4243
private TableUpsert(
4344
TablePath tablePath,
4445
TableInfo tableInfo,
4546
WriterClient writerClient,
46-
@Nullable int[] targetColumns) {
47+
@Nullable int[] targetColumns,
48+
AggMode aggMode) {
4749
this.tablePath = tablePath;
4850
this.tableInfo = tableInfo;
4951
this.writerClient = writerClient;
5052
this.targetColumns = targetColumns;
53+
this.aggMode = aggMode;
5154
}
5255

5356
@Override
@@ -68,7 +71,7 @@ public Upsert partialUpdate(@Nullable int[] targetColumns) {
6871
}
6972
}
7073
}
71-
return new TableUpsert(tablePath, tableInfo, writerClient, targetColumns);
74+
return new TableUpsert(tablePath, tableInfo, writerClient, targetColumns, this.aggMode);
7275
}
7376

7477
@Override
@@ -91,9 +94,15 @@ public Upsert partialUpdate(String... targetColumnNames) {
9194
return partialUpdate(targetColumns);
9295
}
9396

97+
@Override
98+
public Upsert aggregationMode(AggMode mode) {
99+
checkNotNull(mode, "aggregation mode");
100+
return new TableUpsert(tablePath, tableInfo, writerClient, this.targetColumns, mode);
101+
}
102+
94103
@Override
95104
public UpsertWriter createWriter() {
96-
return new UpsertWriterImpl(tablePath, tableInfo, targetColumns, writerClient);
105+
return new UpsertWriterImpl(tablePath, tableInfo, targetColumns, writerClient, aggMode);
97106
}
98107

99108
@Override

fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.client.table.writer;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.rpc.protocol.AggMode;
2122

2223
import javax.annotation.Nullable;
2324

@@ -26,7 +27,8 @@
2627
* Table.
2728
*
2829
* <p>{@link Upsert} objects are immutable and can be shared between threads. Refinement methods,
29-
* like {@link #partialUpdate}, create new Upsert instances.
30+
* like {@link #partialUpdate(int[])} and {@link #aggregationMode(AggMode)}, create new Upsert
31+
* instances.
3032
*
3133
* @since 0.6
3234
*/
@@ -56,9 +58,44 @@ public interface Upsert {
5658
*/
5759
Upsert partialUpdate(String... targetColumnNames);
5860

61+
/**
62+
* Specifies the aggregation mode for the UpsertWriter and returns a new Upsert instance.
63+
*
64+
* <p>This method controls how the created UpsertWriter handles data aggregation:
65+
*
66+
* <ul>
67+
* <li>{@link AggMode#AGGREGATE} (default): Data is aggregated through server-side merge
68+
* engine
69+
* <li>{@link AggMode#OVERWRITE}: Data directly overwrites values, bypassing merge engine (for
70+
* undo recovery)
71+
* <li>{@link AggMode#LOCAL_AGGREGATE}: Client-side pre-aggregation before server-side
72+
* aggregation (reserved for future implementation, not yet supported)
73+
* </ul>
74+
*
75+
* <p>Example usage:
76+
*
77+
* <pre>{@code
78+
* // Normal aggregation mode (default)
79+
* UpsertWriter normalWriter = table.newUpsert()
80+
* .aggregationMode(AggMode.AGGREGATE)
81+
* .createWriter();
82+
*
83+
* // Overwrite mode for undo recovery
84+
* UpsertWriter undoWriter = table.newUpsert()
85+
* .aggregationMode(AggMode.OVERWRITE)
86+
* .createWriter();
87+
* }</pre>
88+
*
89+
* @param mode the aggregation mode
90+
* @return a new Upsert instance with the specified aggregation mode
91+
* @since 0.9
92+
*/
93+
Upsert aggregationMode(AggMode mode);
94+
5995
/**
6096
* Create a new {@link UpsertWriter} using {@code InternalRow} with the optional {@link
61-
* #partialUpdate(String...)} information to upsert and delete data to a Primary Key Table.
97+
* #partialUpdate(String...)} and {@link #aggregationMode(AggMode)} information to upsert and
98+
* delete data to a Primary Key Table.
6299
*/
63100
UpsertWriter createWriter();
64101

fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.fluss.row.encode.KeyEncoder;
3232
import org.apache.fluss.row.encode.RowEncoder;
3333
import org.apache.fluss.row.indexed.IndexedRow;
34+
import org.apache.fluss.rpc.protocol.AggMode;
3435
import org.apache.fluss.types.RowType;
3536

3637
import javax.annotation.Nullable;
@@ -56,11 +57,25 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
5657
private final FieldGetter[] fieldGetters;
5758
private final TableInfo tableInfo;
5859

60+
/**
61+
* The aggregation mode for this writer. This controls how the server handles data aggregation.
62+
*/
63+
private final AggMode aggMode;
64+
5965
UpsertWriterImpl(
6066
TablePath tablePath,
6167
TableInfo tableInfo,
6268
@Nullable int[] partialUpdateColumns,
6369
WriterClient writerClient) {
70+
this(tablePath, tableInfo, partialUpdateColumns, writerClient, AggMode.AGGREGATE);
71+
}
72+
73+
UpsertWriterImpl(
74+
TablePath tablePath,
75+
TableInfo tableInfo,
76+
@Nullable int[] partialUpdateColumns,
77+
WriterClient writerClient,
78+
AggMode aggMode) {
6479
super(tablePath, tableInfo, writerClient);
6580
RowType rowType = tableInfo.getRowType();
6681
sanityCheck(
@@ -83,7 +98,16 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
8398
this.writeFormat = WriteFormat.fromKvFormat(this.kvFormat);
8499
this.rowEncoder = RowEncoder.create(kvFormat, rowType);
85100
this.fieldGetters = InternalRow.createFieldGetters(rowType);
101+
86102
this.tableInfo = tableInfo;
103+
104+
// LOCAL_AGGREGATE is reserved for future implementation
105+
if (aggMode == AggMode.LOCAL_AGGREGATE) {
106+
throw new UnsupportedOperationException(
107+
"LOCAL_AGGREGATE mode is not yet supported. "
108+
+ "Please use AGGREGATE or OVERWRITE mode.");
109+
}
110+
this.aggMode = aggMode;
87111
}
88112

89113
private static void sanityCheck(
@@ -168,7 +192,8 @@ public CompletableFuture<UpsertResult> upsert(InternalRow row) {
168192
key,
169193
bucketKey,
170194
writeFormat,
171-
targetColumns);
195+
targetColumns,
196+
aggMode);
172197
return send(record).thenApply(ignored -> UPSERT_SUCCESS);
173198
}
174199

@@ -191,7 +216,8 @@ public CompletableFuture<DeleteResult> delete(InternalRow row) {
191216
key,
192217
bucketKey,
193218
writeFormat,
194-
targetColumns);
219+
targetColumns,
220+
aggMode);
195221
return send(record).thenApply(ignored -> DELETE_SUCCESS);
196222
}
197223

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
7575
import org.apache.fluss.rpc.messages.ProduceLogRequest;
7676
import org.apache.fluss.rpc.messages.PutKvRequest;
77+
import org.apache.fluss.rpc.protocol.AggMode;
7778
import org.apache.fluss.utils.json.DataTypeJsonSerde;
7879
import org.apache.fluss.utils.json.JsonSerdeUtils;
7980

@@ -133,11 +134,12 @@ public static PutKvRequest makePutKvRequest(
133134
.setTimeoutMs(maxRequestTimeoutMs);
134135
// All batches in the list must share the same target columns; if they differ,
135136
// we currently throw an exception.
136-
int[] targetColumns =
137-
((KvWriteBatch) readyWriteBatches.get(0).writeBatch()).getTargetColumns();
137+
KvWriteBatch firstBatch = (KvWriteBatch) readyWriteBatches.get(0).writeBatch();
138+
int[] targetColumns = firstBatch.getTargetColumns();
139+
AggMode aggMode = firstBatch.getAggMode();
138140
for (int i = 1; i < readyWriteBatches.size(); i++) {
139-
int[] currentBatchTargetColumns =
140-
((KvWriteBatch) readyWriteBatches.get(i).writeBatch()).getTargetColumns();
141+
KvWriteBatch currentBatch = (KvWriteBatch) readyWriteBatches.get(i).writeBatch();
142+
int[] currentBatchTargetColumns = currentBatch.getTargetColumns();
141143
if (!Arrays.equals(targetColumns, currentBatchTargetColumns)) {
142144
throw new IllegalStateException(
143145
String.format(
@@ -146,10 +148,21 @@ public static PutKvRequest makePutKvRequest(
146148
Arrays.toString(targetColumns),
147149
Arrays.toString(currentBatchTargetColumns)));
148150
}
151+
// Validate aggMode consistency across batches
152+
if (currentBatch.getAggMode() != aggMode) {
153+
throw new IllegalStateException(
154+
String.format(
155+
"All the write batches to make put kv request should have the same aggMode, "
156+
+ "but got %s and %s.",
157+
aggMode, currentBatch.getAggMode()));
158+
}
149159
}
150160
if (targetColumns != null) {
151161
request.setTargetColumns(targetColumns);
152162
}
163+
// Set aggMode on the request; the server reads it from PutKvRequest to select the merge behavior
164+
request.setAggMode(aggMode.getProtoValue());
165+
153166
readyWriteBatches.forEach(
154167
readyBatch -> {
155168
TableBucket tableBucket = readyBatch.tableBucket();

fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.fluss.row.BinaryRow;
2929
import org.apache.fluss.row.InternalRow;
3030
import org.apache.fluss.rpc.messages.PutKvRequest;
31+
import org.apache.fluss.rpc.protocol.AggMode;
3132

3233
import javax.annotation.Nullable;
3334
import javax.annotation.concurrent.NotThreadSafe;
@@ -51,6 +52,7 @@ public class KvWriteBatch extends WriteBatch {
5152
private final KvRecordBatchBuilder recordsBuilder;
5253
private final @Nullable int[] targetColumns;
5354
private final int schemaId;
55+
private final AggMode aggMode;
5456

5557
public KvWriteBatch(
5658
int bucketId,
@@ -60,13 +62,15 @@ public KvWriteBatch(
6062
int writeLimit,
6163
AbstractPagedOutputView outputView,
6264
@Nullable int[] targetColumns,
65+
AggMode aggMode,
6366
long createdMs) {
6467
super(bucketId, physicalTablePath, createdMs);
6568
this.outputView = outputView;
6669
this.recordsBuilder =
6770
KvRecordBatchBuilder.builder(schemaId, writeLimit, outputView, kvFormat);
6871
this.targetColumns = targetColumns;
6972
this.schemaId = schemaId;
73+
this.aggMode = aggMode;
7074
}
7175

7276
@Override
@@ -94,6 +98,15 @@ public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws
9498
Arrays.toString(targetColumns)));
9599
}
96100

101+
// Validate aggMode consistency - records with different aggMode cannot be batched together
102+
if (writeRecord.getAggMode() != this.aggMode) {
103+
throw new IllegalStateException(
104+
String.format(
105+
"Cannot mix records with different aggMode in the same batch. "
106+
+ "Batch aggMode: %s, Record aggMode: %s",
107+
this.aggMode, writeRecord.getAggMode()));
108+
}
109+
97110
byte[] key = writeRecord.getKey();
98111
checkNotNull(key, "key must be not null for kv record");
99112
checkNotNull(callback, "write callback must be not null");
@@ -113,6 +126,11 @@ public int[] getTargetColumns() {
113126
return targetColumns;
114127
}
115128

129+
@Override
130+
public AggMode getAggMode() {
131+
return aggMode;
132+
}
133+
116134
@Override
117135
public BytesView build() {
118136
try {
@@ -163,6 +181,7 @@ public void abortRecordAppends() {
163181
recordsBuilder.abort();
164182
}
165183

184+
@Override
166185
public void resetWriterState(long writerId, int batchSequence) {
167186
super.resetWriterState(writerId, batchSequence);
168187
recordsBuilder.resetWriterState(writerId, batchSequence);

fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,6 +617,7 @@ private WriteBatch createWriteBatch(
617617
outputView.getPreAllocatedSize(),
618618
outputView,
619619
writeRecord.getTargetColumns(),
620+
writeRecord.getAggMode(),
620621
clock.milliseconds());
621622

622623
case ARROW_LOG:

fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.fluss.memory.MemorySegmentPool;
2323
import org.apache.fluss.metadata.PhysicalTablePath;
2424
import org.apache.fluss.record.bytesview.BytesView;
25+
import org.apache.fluss.rpc.protocol.AggMode;
2526

2627
import org.slf4j.Logger;
2728
import org.slf4j.LoggerFactory;
@@ -120,6 +121,15 @@ public abstract boolean tryAppend(WriteRecord writeRecord, WriteCallback callbac
120121

121122
public abstract void abortRecordAppends();
122123

124+
/**
125+
* Get the aggregation mode for this batch (only applicable to KV batches).
126+
*
127+
* @return the aggregation mode, defaults to AGGREGATE for log batches
128+
*/
129+
public AggMode getAggMode() {
130+
return AggMode.AGGREGATE;
131+
}
132+
123133
public boolean hasBatchSequence() {
124134
return batchSequence() != NO_BATCH_SEQUENCE;
125135
}

0 commit comments

Comments
 (0)