What happened?
The debeziumRecordInstant() method in KafkaConnectUtils throws a NullPointerException when processing Debezium DELETE events because it doesn't handle the case where valueSchema is null.
Steps to Reproduce
- Set up a Debezium pipeline using DebeziumIO.read()
- Perform a DELETE operation on a record in the source database
- The pipeline crashes with a NullPointerException
Expected Behavior
The method should handle DELETE operations gracefully by extracting the timestamp from the sourceOffset metadata, specifically from the ts_usec field.
Actual Behavior
NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Schema.type()" because the return value
of "org.apache.kafka.connect.source.SourceRecord.valueSchema()" is null
at org.apache.beam.io.debezium.KafkaConnectUtils.debeziumRecordInstant(KafkaConnectUtils.java:83)
Root Cause
For DELETE operations, Debezium sets value and valueSchema to null, but the timestamp information is available in sourceOffset.ts_usec:
SourceRecord{
sourcePartition={server=beam-debezium-connector},
sourceOffset={lsn_proc=42600784, messageType=DELETE, lsn_commit=42600728, lsn=42600784, txId=1073, ts_usec=1772459804615258}
ConnectRecord{..., value=null, valueSchema=null, ...}
}
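A possible shape for the fallback, as a minimal sketch rather than the actual fix: when valueSchema is null, read ts_usec from the source offset and convert it from microseconds since the epoch to an instant. The class name OffsetTimestamps and the method fromSourceOffset are hypothetical, the sketch takes the offset as a plain Map so it runs without Kafka Connect on the classpath, and it uses java.time.Instant, which may not match the return type of debeziumRecordInstant().

```java
import java.time.Instant;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, not part of Beam: illustrates the ts_usec fallback only.
public final class OffsetTimestamps {
  private OffsetTimestamps() {}

  /**
   * Returns the "ts_usec" entry of a source offset as an Instant.
   * Yields Optional.empty() when the offset is null or the entry is
   * missing or not numeric, so the caller decides how to fail.
   */
  public static Optional<Instant> fromSourceOffset(Map<String, ?> sourceOffset) {
    if (sourceOffset == null) {
      return Optional.empty();
    }
    Object tsUsec = sourceOffset.get("ts_usec");
    if (!(tsUsec instanceof Number)) {
      return Optional.empty();
    }
    // Assumption: ts_usec is microseconds since the Unix epoch.
    long micros = ((Number) tsUsec).longValue();
    long seconds = Math.floorDiv(micros, 1_000_000L);
    long nanos = Math.floorMod(micros, 1_000_000L) * 1_000L;
    return Optional.of(Instant.ofEpochSecond(seconds, nanos));
  }
}
```

In debeziumRecordInstant(), a check such as `record.valueSchema() == null` could route DELETE events to a helper like this with `record.sourceOffset()` before the existing schema-based logic runs. Returning an Optional keeps the "no timestamp available" case explicit instead of hiding it behind another NullPointerException.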
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components