diff --git a/src/paimon/core/operation/file_store_commit.cpp b/src/paimon/core/operation/file_store_commit.cpp index 9d78b74b..a7675c04 100644 --- a/src/paimon/core/operation/file_store_commit.cpp +++ b/src/paimon/core/operation/file_store_commit.cpp @@ -31,6 +31,7 @@ #include "paimon/core/operation/file_store_commit_impl.h" #include "paimon/core/schema/schema_manager.h" #include "paimon/core/schema/table_schema.h" +#include "paimon/core/table/bucket_mode.h" #include "paimon/core/utils/field_mapping.h" #include "paimon/core/utils/file_store_path_factory.h" #include "paimon/core/utils/snapshot_manager.h" @@ -68,7 +69,12 @@ Result> FileStoreCommit::Create( const auto& schema = table_schema.value(); if (!schema->PrimaryKeys().empty() && ctx->GetOptions().find("enable-pk-commit-in-inte-test") == ctx->GetOptions().end()) { - return Status::NotImplemented("not support pk table commit yet"); + // Postpone bucket mode (bucket=-2) writes all data files to the bucket-postpone/ directory. + // A compaction job will later redistribute files into real buckets. The commit logic + // (manifest and snapshot generation) is the same as append tables, so we allow it. + if (schema->NumBuckets() != BucketModeDefine::POSTPONE_BUCKET) { + return Status::NotImplemented("not support pk table commit yet"); + } } auto opts = schema->Options(); for (const auto& [key, value] : ctx->GetOptions()) { diff --git a/src/paimon/core/operation/file_store_commit_impl_test.cpp b/src/paimon/core/operation/file_store_commit_impl_test.cpp index a3154bfc..b8dda6a9 100644 --- a/src/paimon/core/operation/file_store_commit_impl_test.cpp +++ b/src/paimon/core/operation/file_store_commit_impl_test.cpp @@ -1688,4 +1688,62 @@ TEST_F(FileStoreCommitImplTest, TestObjectStoreAllowedWithRESTCatalogCommit) { ASSERT_FALSE(json.empty()); } +// Verify that FileStoreCommit::Create succeeds for PK tables with postpone bucket mode (bucket=-2) +// without requiring the enable-pk-commit-in-inte-test workaround flag. +TEST_F(FileStoreCommitImplTest, TestPostponeBucketPKTableCommitAllowed) { + auto pk_dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(pk_dir); + std::string pk_root = pk_dir->Str(); + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(pk_root, {})); + ASSERT_OK(catalog->CreateDatabase("db", {}, false)); + + arrow::Schema pk_schema( + {arrow::field("pk", arrow::int32()), arrow::field("val", arrow::utf8())}); + ::ArrowSchema arrow_schema; + ASSERT_TRUE(arrow::ExportSchema(pk_schema, &arrow_schema).ok()); + std::map table_options = {{Options::BUCKET, "-2"}}; + ASSERT_OK(catalog->CreateTable(Identifier("db", "pk_tbl"), &arrow_schema, + /*partition_keys=*/{}, /*primary_keys=*/{"pk"}, table_options, + /*ignore_if_exists=*/false)); + + std::string pk_table_path = PathUtil::JoinPath(pk_root, "db.db/pk_tbl"); + + // Create FileStoreCommit WITHOUT the workaround flag — should succeed for postpone bucket + CommitContextBuilder builder(pk_table_path, "test_user"); + builder.AddOption(Options::FILE_SYSTEM, "local").UseRESTCatalogCommit(true); + ASSERT_OK_AND_ASSIGN(auto commit_context, builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto committer, FileStoreCommit::Create(std::move(commit_context))); + ASSERT_TRUE(committer != nullptr); +} + +// Verify that FileStoreCommit::Create still rejects PK tables with fixed bucket (bucket > 0) +// when the workaround flag is not set. +TEST_F(FileStoreCommitImplTest, TestFixedBucketPKTableCommitRejected) { + auto pk_dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(pk_dir); + std::string pk_root = pk_dir->Str(); + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(pk_root, {})); + ASSERT_OK(catalog->CreateDatabase("db", {}, false)); + + arrow::Schema pk_schema( + {arrow::field("pk", arrow::int32()), arrow::field("val", arrow::utf8())}); + ::ArrowSchema arrow_schema; + ASSERT_TRUE(arrow::ExportSchema(pk_schema, &arrow_schema).ok()); + std::map table_options = {{Options::BUCKET, "4"}}; + ASSERT_OK(catalog->CreateTable(Identifier("db", "pk_tbl_fixed"), &arrow_schema, + /*partition_keys=*/{}, /*primary_keys=*/{"pk"}, table_options, + /*ignore_if_exists=*/false)); + + std::string pk_table_path = PathUtil::JoinPath(pk_root, "db.db/pk_tbl_fixed"); + + CommitContextBuilder builder(pk_table_path, "test_user"); + builder.AddOption(Options::FILE_SYSTEM, "local").UseRESTCatalogCommit(true); + ASSERT_OK_AND_ASSIGN(auto commit_context, builder.Finish()); + auto result = FileStoreCommit::Create(std::move(commit_context)); + ASSERT_FALSE(result.ok()); + ASSERT_TRUE(result.status().IsNotImplemented()); + ASSERT_TRUE(result.status().ToString().find("not support pk table commit") != + std::string::npos); +} + } // namespace paimon::test