Description
This article explains how to troubleshoot the following user-actionable error with a fully managed JDBC Sink (OracleDatabaseSink/MySqlSink) connector on Confluent Cloud:
"The connector is configured with 'delete.enabled=true' and 'pk.mode=record_key', which requires records with a non-null key and non-null Struct or primitive key schema. However, the connector encountered records with a null key and null key schema. To use the connector with the same configuration, please consider using SMTs to filter out those records, or use SMTs like ValueToKey to add non-null key to records."
Applies To
JDBC Sink (OracleDatabaseSink/MySqlSink) Connector
Confluent Cloud
Cause
This issue occurs when the connector encounters records with a null key and null key schema in the presence of the following configuration:
"pk.mode": "record_key",
"pk.fields": "xxx"
With this configuration, the primary key fields are taken from the record key, which may be a primitive or a Struct. The connector therefore requires every record to have a non-null key and a non-null key schema, and it fails with the error above when the topic contains records whose key is null.
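To illustrate with hypothetical topic contents and field names, the connector can process the first record below but fails on the second because its key (and therefore its key schema) is null:
key: {"id": 1}    value: {"id": 1, "name": "alice"}
key: null         value: {"id": 2, "name": "bob"}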
Resolution
This issue can be resolved by using SMTs to filter out those records.
- The TombstoneHandler SMT can be used, which ignores these null/tombstone messages. For example:
"transforms": "tombstoneHandlerExample",
"transforms.tombstoneHandlerExample.type": "io.confluent.connect.transforms.TombstoneHandler"
Alternatively, the Filter SMT with the RecordIsTombstone predicate filters out these records and likewise allows the connector to continue running:
"transforms": "Filter",
"transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.Filter.predicate": "isNull",
"predicates": "isNull",
"predicates.isNull.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
Note that because these SMTs drop the affected records, any tombstones they remove will not be processed as deletes even though 'delete.enabled=true'.
- Another option is to use an SMT such as ValueToKey to add a non-null key to the records.
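For example, a minimal sketch of this approach, assuming the record value contains a field named "id" (a hypothetical field name) that should become the key:
"transforms": "createKey",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "id"
ValueToKey replaces the record key with a Struct built from the listed value fields, so "pk.fields" should then reference those same field names.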