[Bug] [CDC] Cannot instantiate user function. #4549

@J1031

Description

Search before asking

  • I searched the issues and found no similar issues.

What happened

I tested Dinky with the following Docker images, and both failed:
dinkydocker/dinky-standalone-server:1.2.5-flink1.19
dinkydocker/dinky-standalone-server:1.2.5-flink1.20

I used a YARN session and uploaded the Flink JARs and the CDC JARs to HDFS. The Flink JARs are from the Docker container's /opt/flink directory. The CDC JARs are from Maven, at the following addresses:

https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-1.19/26.0.0/flink-doris-connector-1.19-26.0.0.jar

https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.5.0/flink-sql-connector-mysql-cdc-3.5.0.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.5.0/flink-cdc-pipeline-connector-mysql-3.5.0.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-doris/3.5.0/flink-cdc-pipeline-connector-doris-3.5.0.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-paimon/3.5.0/flink-cdc-pipeline-connector-paimon-3.5.0.jar

The Flink errors are as follows:

2026-04-02 10:19:11
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:417)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:869)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:836)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:732)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:825)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:732)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:825)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:732)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:202)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:731)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:713)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.userFunction of type org.apache.flink.api.common.functions.Function in instance of org.apache.flink.streaming.api.operators.StreamFlatMap
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:494)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:478)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:473)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:428)
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:401)
	... 16 more
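
One possible cause of a `SerializedLambda` `ClassCastException` like the one above is a class-loading conflict, for example the same classes being shipped in more than one JAR. This is an assumption and is not confirmed for this report. The following is a minimal diagnostic sketch, assuming the uploaded JARs are also available in a local directory; the helper name `find_duplicate_classes` and the `org/apache/flink/` prefix filter are illustrative and not part of Dinky or Flink.

```python
import sys
import zipfile
from collections import defaultdict
from pathlib import Path


def find_duplicate_classes(jar_paths, prefix=""):
    """Return {class entry: [jar names]} for .class files present in more than one JAR.

    `prefix` optionally restricts the scan to one package path,
    e.g. "org/apache/flink/".
    """
    owners = defaultdict(list)
    for jar in jar_paths:
        with zipfile.ZipFile(jar) as zf:
            # A set guards against an entry being listed twice in the same JAR.
            for name in sorted(set(zf.namelist())):
                if name.endswith(".class") and name.startswith(prefix):
                    owners[name].append(Path(jar).name)
    return {cls: jars for cls, jars in owners.items() if len(jars) > 1}


if __name__ == "__main__":
    # Usage: python find_dups.py <local directory containing the JARs>
    jars = sorted(Path(sys.argv[1]).glob("*.jar"))
    for cls, names in sorted(find_duplicate_classes(jars, "org/apache/flink/").items()):
        print(cls, "->", ", ".join(names))
```

If the script reports overlapping classes, that is only a hint about where to look. It does not prove that the overlap is the cause of the failure.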

What you expected to happen

The Flink job runs successfully.

How to reproduce

  1. Run the Docker container:
docker run -p 8888:8888 -d --name dinky dinkydocker/dinky-standalone-server:1.2.5-flink1.19

  2. Upload /opt/flink/lib, /opt/flink/plugins and the CDC JARs to HDFS.
  3. Start the YARN session.
  4. Configure the Dinky Flink instance with the YARN session's JobManager address.
  5. Execute the following SQL:
EXECUTE CDCSOURCE demo_doris
WITH
(
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = '12345',
    'checkpoint' = '10000',
    'scan.startup.mode' = 'initial',
    'parallelism' = '1',
    'table-name' = 'test\.student,test\.score',
    'server-time-zone' = 'Asia/Shanghai',

    'sink.connector' = 'doris',
    'sink.fenodes' = '127.0.0.1:8030',
    'sink.username' = 'root',
    'sink.password' = '123456',

    'sink.table.prefix' = 'ODS_',
    'sink.table.upper' = 'true',
    'sink.table.identifier' = '#{schemaName}.#{tableName}'
);

Anything else

No response

Version

dev

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Labels

Bug, Waiting for reply
