Skip to content

[FLINK-39718][pipeline][paimon] Paimon pipeline sink fails with distributed source when target table does not exist#4406

Open
ThorneANN wants to merge 1 commit into
apache:masterfrom
ThorneANN:fix/paimon-hash-function-catalog-access
Open

[FLINK-39718][pipeline][paimon] Paimon pipeline sink fails with distributed source when target table does not exist#4406
ThorneANN wants to merge 1 commit into
apache:masterfrom
ThorneANN:fix/paimon-hash-function-catalog-access

Conversation

@ThorneANN
Copy link
Copy Markdown
Contributor

Problem

When using a distributed pipeline source (e.g., Kafka, any source where isParallelMetadataSource() = true) with a Paimon sink, the job fails with TableNotExistException if the
target table does not already exist in the catalog.

Root cause: In the distributed pipeline topology, DistributedPrePartitionOperator runs before SchemaOperator. When it receives a CreateTableEvent, it immediately calls
PaimonHashFunctionProvider.getHashFunction(), which constructs a PaimonHashFunction. The PaimonHashFunction constructor called catalog.getTable() to determine whether the table is
append-only — but the table hasn't been created yet because MetadataApplier hasn't run.

This does not affect MySQL-CDC pipelines because the regular topology routes events through SchemaOperator first, so the table always exists by the time
RegularPrePartitionOperator creates the hash function.

Fix

PaimonHashFunction queried the catalog for exactly one reason: to distinguish append-only tables (no primary key) from primary-key tables. This information is already available in
the CDC Schema object passed to getHashFunction() via schema.primaryKeys().

Changes:

  • PaimonHashFunction: Replaced the catalog.getTable() call with a check on schema.primaryKeys().isEmpty(). For primary-key tables, a minimal TableSchema is constructed directly
    from the CDC Schema (column types, partition keys, primary keys) to initialize RowAssignerChannelComputer. No catalog access at any stage.
  • PaimonHashFunctionProvider: Removed the Options parameter since it was only used to create a catalog.
  • PaimonDataSink: Updated the provider instantiation accordingly.
  • PaimonHashFunctionTest: Removed all catalog setup/teardown; tests now run fully offline.

@ThorneANN ThorneANN changed the title [FLINK-39718][paimon] Fix PaimonHashFunction failing when target table does not exist [FLINK-39718][pipeline][paimon] Paimon pipeline sink fails with distributed source when target table does not exist May 20, 2026
@taoran92
Copy link
Copy Markdown
Member

@ThorneANN nice job. btw, I think your commit message needs to be formatted, for example, [FLINK-xxx] [module] yyy

@ThorneANN
Copy link
Copy Markdown
Contributor Author

@lvyanquan Hi,can u help me review this pr.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants