Skip to content

Commit 4890e21

Browse files
committed
feat: add postpone bucket (bucket=-2) write support for primary-key tables
Postpone bucket mode writes data in KV format without sorting or deduplication, deferring bucket assignment to background compaction. Files are written to `bucket-postpone` directory and are invisible to normal reads until compacted.
1 parent aa19a2c commit 4890e21

9 files changed

Lines changed: 814 additions & 69 deletions

File tree

crates/integrations/datafusion/tests/pk_tables.rs

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1259,3 +1259,124 @@ async fn test_pk_first_row_insert_overwrite() {
12591259
"After second OVERWRITE: still 2 files (no stale level-0 files accumulated)"
12601260
);
12611261
}
1262+
1263+
// ======================= Postpone Bucket (bucket = -2) =======================
1264+
1265+
/// Postpone bucket files are invisible to normal SELECT but visible via scan_all_files.
1266+
#[tokio::test]
1267+
async fn test_postpone_write_invisible_to_select() {
1268+
let (_tmp, catalog) = create_test_env();
1269+
let handler = create_handler(catalog.clone());
1270+
handler
1271+
.sql("CREATE SCHEMA paimon.test_db")
1272+
.await
1273+
.expect("CREATE SCHEMA failed");
1274+
1275+
handler
1276+
.sql(
1277+
"CREATE TABLE paimon.test_db.t_postpone (
1278+
id INT NOT NULL, value INT,
1279+
PRIMARY KEY (id)
1280+
) WITH ('bucket' = '-2')",
1281+
)
1282+
.await
1283+
.unwrap();
1284+
1285+
// Write data
1286+
handler
1287+
.sql("INSERT INTO paimon.test_db.t_postpone VALUES (1, 10), (2, 20), (3, 30)")
1288+
.await
1289+
.unwrap()
1290+
.collect()
1291+
.await
1292+
.unwrap();
1293+
1294+
// scan_all_files should find the postpone file
1295+
let table = catalog
1296+
.get_table(&Identifier::new("test_db", "t_postpone"))
1297+
.await
1298+
.unwrap();
1299+
let plan = table
1300+
.new_read_builder()
1301+
.new_scan()
1302+
.with_scan_all_files()
1303+
.plan()
1304+
.await
1305+
.unwrap();
1306+
let file_count: usize = plan.splits().iter().map(|s| s.data_files().len()).sum();
1307+
assert_eq!(file_count, 1, "scan_all_files should find 1 postpone file");
1308+
1309+
// Normal SELECT should return 0 rows (postpone files are invisible)
1310+
let count = row_count(&handler, "SELECT * FROM paimon.test_db.t_postpone").await;
1311+
assert_eq!(count, 0, "SELECT should return 0 rows for postpone table");
1312+
}
1313+
1314+
/// INSERT OVERWRITE on a postpone table should replace old files with new ones.
1315+
#[tokio::test]
1316+
async fn test_postpone_insert_overwrite() {
1317+
let (_tmp, catalog) = create_test_env();
1318+
let handler = create_handler(catalog.clone());
1319+
handler
1320+
.sql("CREATE SCHEMA paimon.test_db")
1321+
.await
1322+
.expect("CREATE SCHEMA failed");
1323+
1324+
handler
1325+
.sql(
1326+
"CREATE TABLE paimon.test_db.t_postpone_ow (
1327+
id INT NOT NULL, value INT,
1328+
PRIMARY KEY (id)
1329+
) WITH ('bucket' = '-2')",
1330+
)
1331+
.await
1332+
.unwrap();
1333+
1334+
// First commit
1335+
handler
1336+
.sql("INSERT INTO paimon.test_db.t_postpone_ow VALUES (1, 10), (2, 20)")
1337+
.await
1338+
.unwrap()
1339+
.collect()
1340+
.await
1341+
.unwrap();
1342+
1343+
let table = catalog
1344+
.get_table(&Identifier::new("test_db", "t_postpone_ow"))
1345+
.await
1346+
.unwrap();
1347+
let plan = table
1348+
.new_read_builder()
1349+
.new_scan()
1350+
.with_scan_all_files()
1351+
.plan()
1352+
.await
1353+
.unwrap();
1354+
let file_count: usize = plan.splits().iter().map(|s| s.data_files().len()).sum();
1355+
assert_eq!(file_count, 1, "After INSERT: 1 postpone file");
1356+
1357+
// INSERT OVERWRITE should replace old file
1358+
handler
1359+
.sql("INSERT OVERWRITE paimon.test_db.t_postpone_ow VALUES (3, 30)")
1360+
.await
1361+
.unwrap()
1362+
.collect()
1363+
.await
1364+
.unwrap();
1365+
1366+
let table = catalog
1367+
.get_table(&Identifier::new("test_db", "t_postpone_ow"))
1368+
.await
1369+
.unwrap();
1370+
let plan = table
1371+
.new_read_builder()
1372+
.new_scan()
1373+
.with_scan_all_files()
1374+
.plan()
1375+
.await
1376+
.unwrap();
1377+
let file_count: usize = plan.splits().iter().map(|s| s.data_files().len()).sum();
1378+
assert_eq!(
1379+
file_count, 1,
1380+
"After OVERWRITE: only 1 new file (old file deleted)"
1381+
);
1382+
}

crates/paimon/src/spec/core_options.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ const BUCKET_KEY_OPTION: &str = "bucket-key";
2828
const BUCKET_FUNCTION_TYPE_OPTION: &str = "bucket-function.type";
2929
const BUCKET_OPTION: &str = "bucket";
3030
const DEFAULT_BUCKET: i32 = -1;
31+
/// Postpone bucket mode: data is written to `bucket-postpone` directory
32+
/// and is invisible to readers until compaction assigns real bucket numbers.
33+
pub const POSTPONE_BUCKET: i32 = -2;
34+
/// Directory name for postpone bucket files.
35+
pub const POSTPONE_BUCKET_DIR: &str = "bucket-postpone";
3136
const COMMIT_MAX_RETRIES_OPTION: &str = "commit.max-retries";
3237
const COMMIT_TIMEOUT_OPTION: &str = "commit.timeout";
3338
const COMMIT_MIN_RETRY_WAIT_OPTION: &str = "commit.min-retry-wait";
@@ -63,6 +68,16 @@ pub enum MergeEngine {
6368
FirstRow,
6469
}
6570

71+
/// Format the bucket directory name for a given bucket number.
72+
/// Returns `"bucket-postpone"` for `POSTPONE_BUCKET` (-2), otherwise `"bucket-{N}"`.
73+
pub fn bucket_dir_name(bucket: i32) -> String {
74+
if bucket == POSTPONE_BUCKET {
75+
POSTPONE_BUCKET_DIR.to_string()
76+
} else {
77+
format!("bucket-{bucket}")
78+
}
79+
}
80+
6681
/// Typed accessors for common table options.
6782
///
6883
/// This mirrors pypaimon's `CoreOptions` pattern while staying lightweight.

crates/paimon/src/table/data_file_writer.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
use crate::arrow::format::{create_format_writer, FormatFileWriter};
2626
use crate::io::FileIO;
2727
use crate::spec::stats::BinaryTableStats;
28-
use crate::spec::{DataFileMeta, EMPTY_SERIALIZED_ROW};
28+
use crate::spec::{bucket_dir_name, DataFileMeta, EMPTY_SERIALIZED_ROW};
2929
use crate::Result;
3030
use arrow_array::RecordBatch;
3131
use chrono::Utc;
@@ -133,11 +133,13 @@ impl DataFileWriter {
133133
);
134134

135135
let bucket_dir = if self.partition_path.is_empty() {
136-
format!("{}/bucket-{}", self.table_location, self.bucket)
136+
format!("{}/{}", self.table_location, bucket_dir_name(self.bucket))
137137
} else {
138138
format!(
139-
"{}/{}/bucket-{}",
140-
self.table_location, self.partition_path, self.bucket
139+
"{}/{}/{}",
140+
self.table_location,
141+
self.partition_path,
142+
bucket_dir_name(self.bucket)
141143
)
142144
};
143145
self.file_io.mkdirs(&format!("{bucket_dir}/")).await?;

crates/paimon/src/table/kv_file_writer.rs

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,6 @@ pub(crate) struct KeyValueWriteConfig {
7474
pub sequence_field_indices: Vec<usize>,
7575
/// Merge engine for deduplication.
7676
pub merge_engine: MergeEngine,
77-
/// Column index in user schema that provides the row kind value.
78-
/// Resolved from: `rowkind.field` option > `_VALUE_KIND` column > None (all INSERT).
79-
pub value_kind_col_index: Option<usize>,
8077
}
8178

8279
impl KeyValueFileWriter {
@@ -200,23 +197,8 @@ impl KeyValueFileWriter {
200197
let min_key = self.extract_key_binary_row(&combined, first_row)?;
201198
let max_key = self.extract_key_binary_row(&combined, last_row)?;
202199

203-
// Build physical schema (thin-mode): [_SEQUENCE_NUMBER, _VALUE_KIND, all_user_cols...]
204-
let user_fields = user_schema.fields();
205-
let mut physical_fields: Vec<Arc<ArrowField>> = Vec::new();
206-
physical_fields.push(Arc::new(ArrowField::new(
207-
SEQUENCE_NUMBER_FIELD_NAME,
208-
ArrowDataType::Int64,
209-
false,
210-
)));
211-
physical_fields.push(Arc::new(ArrowField::new(
212-
VALUE_KIND_FIELD_NAME,
213-
ArrowDataType::Int8,
214-
false,
215-
)));
216-
for field in user_fields.iter() {
217-
physical_fields.push(field.clone());
218-
}
219-
let physical_schema = Arc::new(ArrowSchema::new(physical_fields));
200+
// Build physical schema and open writer.
201+
let physical_schema = build_physical_schema(&user_schema);
220202

221203
// Open parquet writer.
222204
let file_name = format!(
@@ -262,8 +244,13 @@ impl KeyValueFileWriter {
262244
},
263245
)?,
264246
);
265-
// Value kind column.
266-
match self.config.value_kind_col_index {
247+
// Value kind column — resolve from batch schema.
248+
let vk_idx = combined
249+
.schema()
250+
.fields()
251+
.iter()
252+
.position(|f| f.name() == crate::spec::VALUE_KIND_FIELD_NAME);
253+
match vk_idx {
267254
Some(vk_idx) => {
268255
physical_columns.push(
269256
arrow_select::take::take(
@@ -282,8 +269,11 @@ impl KeyValueFileWriter {
282269
physical_columns.push(Arc::new(Int8Array::from(vec![0i8; chunk_len])));
283270
}
284271
}
285-
// All user columns.
272+
// All user columns (skip _VALUE_KIND if present — already handled above).
286273
for idx in 0..combined.num_columns() {
274+
if Some(idx) == vk_idx {
275+
continue;
276+
}
287277
physical_columns.push(
288278
arrow_select::take::take(combined.column(idx).as_ref(), &chunk_indices, None)
289279
.map_err(|e| crate::Error::DataInvalid {
@@ -459,3 +449,24 @@ impl KeyValueFileWriter {
459449
Ok(builder.build_serialized())
460450
}
461451
}
452+
453+
/// Build the physical schema: [_SEQUENCE_NUMBER, _VALUE_KIND, user_cols (excluding _VALUE_KIND)...]
454+
pub(crate) fn build_physical_schema(user_schema: &ArrowSchema) -> Arc<ArrowSchema> {
455+
let mut physical_fields: Vec<Arc<ArrowField>> = Vec::new();
456+
physical_fields.push(Arc::new(ArrowField::new(
457+
SEQUENCE_NUMBER_FIELD_NAME,
458+
ArrowDataType::Int64,
459+
false,
460+
)));
461+
physical_fields.push(Arc::new(ArrowField::new(
462+
VALUE_KIND_FIELD_NAME,
463+
ArrowDataType::Int8,
464+
false,
465+
)));
466+
for field in user_schema.fields().iter() {
467+
if field.name() != VALUE_KIND_FIELD_NAME {
468+
physical_fields.push(field.clone());
469+
}
470+
}
471+
Arc::new(ArrowSchema::new(physical_fields))
472+
}

crates/paimon/src/table/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ mod full_text_search_builder;
2929
pub(crate) mod global_index_scanner;
3030
mod kv_file_reader;
3131
mod kv_file_writer;
32+
mod postpone_file_writer;
3233
mod read_builder;
3334
pub(crate) mod rest_env;
3435
pub(crate) mod row_id_predicate;

0 commit comments

Comments
 (0)