Skip to content

Commit bfa46ef

Browse files
committed
[client] Support log scanner scanning to Arrow record batches
1 parent 57cb378 commit bfa46ef

19 files changed

Lines changed: 1419 additions & 458 deletions

File tree

fluss-client/pom.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,20 @@
4949
<version>${project.version}</version>
5050
</dependency>
5151

52+
<dependency>
53+
<groupId>org.apache.arrow</groupId>
54+
<artifactId>arrow-vector</artifactId>
55+
<version>${arrow.version}</version>
56+
<scope>provided</scope>
57+
</dependency>
58+
59+
<dependency>
60+
<groupId>org.apache.arrow</groupId>
61+
<artifactId>arrow-memory-netty</artifactId>
62+
<version>${arrow.version}</version>
63+
<scope>provided</scope>
64+
</dependency>
65+
5266
<!-- test dependency -->
5367
<dependency>
5468
<groupId>org.apache.fluss</groupId>

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
2121
import org.apache.fluss.client.table.scanner.batch.BatchScanner;
22+
import org.apache.fluss.client.table.scanner.log.ArrowLogScanner;
2223
import org.apache.fluss.client.table.scanner.log.LogScanner;
2324
import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
2425
import org.apache.fluss.metadata.TableBucket;
@@ -74,6 +75,13 @@ public interface Scan {
7475
*/
7576
<T> TypedLogScanner<T> createTypedLogScanner(Class<T> pojoClass);
7677

78+
/**
79+
* Creates an {@link ArrowLogScanner} to continuously read log data as Arrow batches.
80+
*
81+
* <p>Note: this API doesn't support being pre-configured with {@link #limit(int)}.
82+
*/
83+
ArrowLogScanner createArrowLogScanner();
84+
7785
/**
7886
* Creates a {@link BatchScanner} to read current data in the given table bucket for this scan.
7987
*

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.fluss.client.table.scanner.batch.CompositeBatchScanner;
2525
import org.apache.fluss.client.table.scanner.batch.KvSnapshotBatchScanner;
2626
import org.apache.fluss.client.table.scanner.batch.LimitBatchScanner;
27+
import org.apache.fluss.client.table.scanner.log.ArrowLogScanner;
28+
import org.apache.fluss.client.table.scanner.log.ArrowLogScannerImpl;
2729
import org.apache.fluss.client.table.scanner.log.LogScanner;
2830
import org.apache.fluss.client.table.scanner.log.LogScannerImpl;
2931
import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
@@ -125,6 +127,25 @@ public <T> TypedLogScanner<T> createTypedLogScanner(Class<T> pojoClass) {
125127
return new TypedLogScannerImpl<>(base, pojoClass, tableInfo, projectedColumns);
126128
}
127129

130+
@Override
131+
public ArrowLogScanner createArrowLogScanner() {
132+
if (limit != null) {
133+
throw new UnsupportedOperationException(
134+
String.format(
135+
"ArrowLogScanner doesn't support limit pushdown. Table: %s, requested limit: %d",
136+
tableInfo.getTablePath(), limit));
137+
}
138+
139+
return new ArrowLogScannerImpl(
140+
conn.getConfiguration(),
141+
tableInfo,
142+
conn.getMetadataUpdater(),
143+
conn.getClientMetricGroup(),
144+
conn.getOrCreateRemoteFileDownloader(),
145+
projectedColumns,
146+
schemaGetter);
147+
}
148+
128149
@Override
129150
public BatchScanner createBatchScanner(TableBucket tableBucket) {
130151
if (limit == null) {
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.table.scanner.log;
19+
20+
import org.apache.fluss.annotation.Internal;
21+
import org.apache.fluss.client.metadata.MetadataUpdater;
22+
import org.apache.fluss.config.ConfigOptions;
23+
import org.apache.fluss.config.Configuration;
24+
import org.apache.fluss.exception.AuthorizationException;
25+
import org.apache.fluss.exception.FetchException;
26+
import org.apache.fluss.metadata.TableBucket;
27+
import org.apache.fluss.metadata.TablePath;
28+
import org.apache.fluss.rpc.protocol.ApiError;
29+
import org.apache.fluss.rpc.protocol.Errors;
30+
31+
import org.slf4j.Logger;
32+
33+
import javax.annotation.Nullable;
34+
import javax.annotation.concurrent.ThreadSafe;
35+
36+
import java.util.ArrayList;
37+
import java.util.HashMap;
38+
import java.util.List;
39+
import java.util.Map;
40+
41+
/** Shared implementation for polling completed fetches into scanner results. */
42+
@ThreadSafe
43+
@Internal
44+
abstract class AbstractLogFetchCollector<T, R> {
45+
protected final Logger log;
46+
protected final TablePath tablePath;
47+
protected final LogScannerStatus logScannerStatus;
48+
private final int maxPollRecords;
49+
private final MetadataUpdater metadataUpdater;
50+
51+
protected AbstractLogFetchCollector(
52+
Logger log,
53+
TablePath tablePath,
54+
LogScannerStatus logScannerStatus,
55+
Configuration conf,
56+
MetadataUpdater metadataUpdater) {
57+
this.log = log;
58+
this.tablePath = tablePath;
59+
this.logScannerStatus = logScannerStatus;
60+
this.maxPollRecords = conf.getInt(ConfigOptions.CLIENT_SCANNER_LOG_MAX_POLL_RECORDS);
61+
this.metadataUpdater = metadataUpdater;
62+
}
63+
64+
public R collectFetch(final LogFetchBuffer logFetchBuffer) {
65+
Map<TableBucket, List<T>> fetched = new HashMap<>();
66+
int recordsRemaining = maxPollRecords;
67+
68+
try {
69+
while (recordsRemaining > 0) {
70+
CompletedFetch nextInLineFetch = logFetchBuffer.nextInLineFetch();
71+
if (nextInLineFetch == null || nextInLineFetch.isConsumed()) {
72+
CompletedFetch completedFetch = logFetchBuffer.peek();
73+
if (completedFetch == null) {
74+
break;
75+
}
76+
77+
if (!completedFetch.isInitialized()) {
78+
try {
79+
logFetchBuffer.setNextInLineFetch(initialize(completedFetch));
80+
} catch (Exception e) {
81+
if (fetched.isEmpty() && completedFetch.sizeInBytes == 0) {
82+
logFetchBuffer.poll();
83+
}
84+
throw e;
85+
}
86+
} else {
87+
logFetchBuffer.setNextInLineFetch(completedFetch);
88+
}
89+
90+
logFetchBuffer.poll();
91+
} else {
92+
List<T> records = fetchRecords(nextInLineFetch, recordsRemaining);
93+
if (!records.isEmpty()) {
94+
TableBucket tableBucket = nextInLineFetch.tableBucket;
95+
List<T> currentRecords = fetched.get(tableBucket);
96+
if (currentRecords == null) {
97+
fetched.put(tableBucket, records);
98+
} else {
99+
List<T> mergedRecords =
100+
new ArrayList<>(records.size() + currentRecords.size());
101+
mergedRecords.addAll(currentRecords);
102+
mergedRecords.addAll(records);
103+
fetched.put(tableBucket, mergedRecords);
104+
}
105+
106+
recordsRemaining -= recordCount(records);
107+
}
108+
}
109+
}
110+
} catch (FetchException e) {
111+
if (fetched.isEmpty()) {
112+
throw e;
113+
}
114+
}
115+
116+
return toResult(fetched);
117+
}
118+
119+
@Nullable
120+
CompletedFetch initialize(CompletedFetch completedFetch) {
121+
TableBucket tb = completedFetch.tableBucket;
122+
ApiError error = completedFetch.error;
123+
124+
try {
125+
if (error.isSuccess()) {
126+
return handleInitializeSuccess(completedFetch);
127+
} else {
128+
handleInitializeErrors(completedFetch, error.error(), error.messageWithFallback());
129+
return null;
130+
}
131+
} finally {
132+
if (error.isFailure()) {
133+
logScannerStatus.moveBucketToEnd(tb);
134+
}
135+
}
136+
}
137+
138+
private @Nullable CompletedFetch handleInitializeSuccess(CompletedFetch completedFetch) {
139+
TableBucket tb = completedFetch.tableBucket;
140+
long fetchOffset = completedFetch.nextFetchOffset();
141+
142+
Long offset = logScannerStatus.getBucketOffset(tb);
143+
if (offset == null) {
144+
log.debug(
145+
"Discarding stale fetch response for bucket {} since the expected offset is null which means the bucket has been unsubscribed.",
146+
tb);
147+
return null;
148+
}
149+
if (offset != fetchOffset) {
150+
log.warn(
151+
"Discarding stale fetch response for bucket {} since its offset {} does not match the expected offset {}.",
152+
tb,
153+
fetchOffset,
154+
offset);
155+
return null;
156+
}
157+
158+
long highWatermark = completedFetch.highWatermark;
159+
if (highWatermark >= 0) {
160+
log.trace("Updating high watermark for bucket {} to {}.", tb, highWatermark);
161+
logScannerStatus.updateHighWatermark(tb, highWatermark);
162+
}
163+
164+
completedFetch.setInitialized();
165+
return completedFetch;
166+
}
167+
168+
private void handleInitializeErrors(
169+
CompletedFetch completedFetch, Errors error, String errorMessage) {
170+
TableBucket tb = completedFetch.tableBucket;
171+
long fetchOffset = completedFetch.nextFetchOffset();
172+
if (error == Errors.NOT_LEADER_OR_FOLLOWER
173+
|| error == Errors.LOG_STORAGE_EXCEPTION
174+
|| error == Errors.KV_STORAGE_EXCEPTION
175+
|| error == Errors.STORAGE_EXCEPTION
176+
|| error == Errors.FENCED_LEADER_EPOCH_EXCEPTION) {
177+
log.debug(
178+
"Error in fetch for bucket {}: {}:{}",
179+
tb,
180+
error.exceptionName(),
181+
error.exception(errorMessage));
182+
metadataUpdater.checkAndUpdateMetadata(tablePath, tb);
183+
} else if (error == Errors.UNKNOWN_TABLE_OR_BUCKET_EXCEPTION) {
184+
log.warn("Received unknown table or bucket error in fetch for bucket {}", tb);
185+
metadataUpdater.checkAndUpdateMetadata(tablePath, tb);
186+
} else if (error == Errors.LOG_OFFSET_OUT_OF_RANGE_EXCEPTION) {
187+
throw new FetchException(
188+
String.format(
189+
"The fetching offset %s is out of range: %s",
190+
fetchOffset, error.exception(errorMessage)));
191+
} else if (error == Errors.AUTHORIZATION_EXCEPTION) {
192+
throw new AuthorizationException(errorMessage);
193+
} else if (error == Errors.UNKNOWN_SERVER_ERROR) {
194+
log.warn(
195+
"Unknown server error while fetching offset {} for bucket {}: {}",
196+
fetchOffset,
197+
tb,
198+
error.exception(errorMessage));
199+
} else if (error == Errors.CORRUPT_MESSAGE) {
200+
throw new FetchException(
201+
String.format(
202+
"Encountered corrupt message when fetching offset %s for bucket %s: %s",
203+
fetchOffset, tb, error.exception(errorMessage)));
204+
} else {
205+
throw new FetchException(
206+
String.format(
207+
"Unexpected error code %s while fetching at offset %s from bucket %s: %s",
208+
error, fetchOffset, tb, error.exception(errorMessage)));
209+
}
210+
}
211+
212+
protected abstract List<T> fetchRecords(CompletedFetch nextInLineFetch, int maxRecords);
213+
214+
protected abstract int recordCount(List<T> fetchedRecords);
215+
216+
protected abstract R toResult(Map<TableBucket, List<T>> fetchedRecords);
217+
}

0 commit comments

Comments
 (0)