Description
The following error might be observed on the Confluent Cloud UI for MySQL Sink Connectors:
Connector failed because it encountered some bad records. It expects record values with a non-null Struct and a non-null Struct schema, but some records were found that either had a null schema, a null value, or a value schema that was not of type STRUCT. Please ensure that the connector is configured with topics that contain records.
Applies To
Confluent Cloud
MySQL Sink Connector
Cause
1. This happens because of the presence of the following configuration properties:
"pk.mode": "record_key",
"pk.fields": "xxx"
These properties tell the connector to take the primary-key fields from the record key, which may be a primitive or a Struct.
However, some of the record keys being consumed have a null value, which causes this error.
2. The error message itself states that the connector requires records with a non-null Struct value and a non-null Struct schema, but the topic contains some records with a null value or a null schema (for example, tombstone records).
Resolution
- You can use the default value of pk.mode (none) to overcome this issue, but be aware of the limitations of this setting, which are documented here: https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/overview.html#key-handling
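As a sketch, the relevant part of the connector configuration would change to the default key handling; pk.fields is not needed in this mode:

```json
"pk.mode": "none"
```

Note that with no primary key defined from the record, the destination table has no connector-managed key columns, so upsert and delete behavior based on keys is not available.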
- If the topic contains tombstone records (records whose value is NULL), you can either set the "delete.enabled" configuration to true, which treats null record values as deletes, or use the TombstoneHandler SMT or the Apache Kafka Filter SMT to filter out the tombstone records.
For example:
"transforms": "tombstoneHandlerExample",
"transforms.tombstoneHandlerExample.type": "io.confluent.connect.transforms.TombstoneHandler"
(or)
"transforms": "Filter",
"transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.Filter.predicate": "isNull",
"predicates": "isNull",
"predicates.isNull.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
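Alternatively, if deletes should be propagated to MySQL rather than filtered out, a sketch of the delete-based approach (reusing the pk settings from the Cause section; the field name "xxx" is a placeholder):

```json
"pk.mode": "record_key",
"pk.fields": "xxx",
"delete.enabled": "true"
```

Note that "delete.enabled": "true" requires "pk.mode": "record_key", since the connector issues DELETE statements keyed on the fields taken from the record key when it encounters a tombstone.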