Description
This article explains what causes the error message below, which makes the connector task fail, and how to work around it.
Failed
With the configured document ID strategy, all records are required to have keys, which must be either maps or structs.
For additional assistance, contact Confluent Support
Example Connector config:
{
  "name": "MongoDbAtlasSinkConnector",
  "config": {
    "connector.class": "MongoDbAtlasSink",
    "name": "MongoDbAtlasSinkConnector",
    "input.data.format": "AVRO",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<redacted>",
    "kafka.api.secret": "<redacted>",
    "topics": "Topicname",
    "connection.host": "abc.xxx.mongodb.net",
    "connection.user": "admin",
    "connection.password": "<redacted>",
    "database": "abc",
    "doc.id.strategy": "FullKeyStrategy",
    "doc.id.strategy.overwrite.existing": "true",
    "tasks.max": "1"
  }
}
Applies To
MongoDB Atlas sink connector
Cause
- The record has no key
- The record key is not a map or a struct (for example, it is a primitive such as a string)
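The two causes above amount to a simple check on each record key. The following is a minimal Python sketch of that check, not the connector's actual code. It assumes a record key is modeled as a plain Python value, with a dict standing in for a map or struct; the function name key_is_acceptable is hypothetical.

```python
from typing import Any


def key_is_acceptable(key: Any) -> bool:
    """Model of the requirement: the key must exist and be a map or struct.

    Assumption: a Python dict stands in for a Connect map or struct.
    """
    if key is None:
        # Cause 1: the record was sent without a key.
        return False
    # Cause 2: the key exists but is a primitive (string, integer, ...).
    return isinstance(key, dict)


if __name__ == "__main__":
    for sample in (None, "order-42", {"id": 42}):
        print(repr(sample), "->", key_is_acceptable(sample))
```

Under this model, only the last sample (a dict) passes; the missing key and the plain string would both trigger the error.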
Resolution
With the document ID strategy FullKeyStrategy, every record must have a key, and that key must be a map or a struct.
If records arrive without a key, the ValueToKey SMT can build one from one or more fields of the record value (here, a field named mykey):
"transforms": "ValueToKey",
"transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields": "mykey"
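To illustrate the effect of the ValueToKey configuration above, here is a rough Python model of the transformation. It is a sketch under assumptions, not the SMT's implementation: records are modeled as plain dicts, the function name value_to_key is hypothetical, and the sample field amount is invented for the example.

```python
from typing import Any, Dict, List, Tuple


def value_to_key(
    value: Dict[str, Any], fields: List[str]
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Build a new map-like key from the listed value fields.

    The value itself is returned unchanged. In this sketch, a field
    that is missing from the value raises KeyError.
    """
    new_key = {name: value[name] for name in fields}
    return new_key, value


if __name__ == "__main__":
    # Hypothetical record value; "mykey" matches the config above.
    record_value = {"mykey": "abc-1", "amount": 10}
    key, value = value_to_key(record_value, ["mykey"])
    print("key:  ", key)
    print("value:", value)
```

The resulting key is a map containing only the selected field, which satisfies the map-or-struct requirement of FullKeyStrategy.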
Alternatively, if a key already exists but is not a map or struct, the HoistField SMT can be used to wrap the key in a struct or map, as below:
"transforms": "HoistField",
"transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Key",
"transforms.HoistField.field": "_id"
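The effect of the HoistField configuration above can be modeled in a few lines of Python. This is an illustrative sketch only, assuming the key is a primitive and using a dict to stand in for the resulting struct or map; the function name hoist_key and the sample key are hypothetical.

```python
from typing import Any, Dict


def hoist_key(key: Any, field: str) -> Dict[str, Any]:
    """Wrap an existing key in a single-field map under the given name."""
    return {field: key}


if __name__ == "__main__":
    # A primitive string key, which FullKeyStrategy would reject as-is.
    original_key = "order-42"
    wrapped = hoist_key(original_key, "_id")
    print(wrapped)
```

After wrapping, the key is a single-field map whose field name is the one set in transforms.HoistField.field, so it passes the map-or-struct check.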