October 25, 2024
java

Confluent Connect not passing Headers into Converter#fromConnectData


tl;dr: How can I implement a Kafka Connect Converter that uses headers?

I have written a custom Kafka Connect Converter. As I understand it, toConnectData is the method used when deserializing messages.

The interface has two overloads of each conversion method. The second takes Headers and is documented as the one the Connect runtime calls, while the first exists for backwards compatibility.

The two overloads, shown here for fromConnectData (toConnectData has the equivalent pair):

    byte[] fromConnectData(String topic, Schema schema, Object value);
    byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value);

Interface ref:
https://github.com/apache/kafka/blob/1eb7644349cb07139d6a3c1ad1986979647cac99/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java#L52-L68

In practice, I am finding that the first overload (without headers) is the one being called, and my use case needs the headers to do its work.
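To illustrate how I understand the two overloads to relate, here is a minimal sketch. It uses hypothetical stand-in types (MiniConverter, HeaderAwareConverter, and a plain Map in place of Headers) rather than the real Kafka classes, so it only mirrors the pattern: the header-aware overload is a default method that falls back to the legacy one, and a caller that invokes the legacy overload directly never reaches a header-aware override.

```java
import java.util.Map;

// Hypothetical demo class; none of these names come from Kafka itself.
final class HeaderOverloadDemo {
    public static void main(String[] args) {
        MiniConverter converter = new HeaderAwareConverter();

        // A caller that passes headers reaches the header-aware override.
        System.out.println(
            converter.toConnectData("topic", Map.of("key-id", "k1"), new byte[0]));

        // A caller that uses the legacy overload bypasses the override entirely.
        try {
            converter.toConnectData("topic", new byte[0]);
        } catch (IllegalStateException e) {
            System.out.println("legacy call failed: " + e.getMessage());
        }
    }
}

// Stand-in for the shape of the Converter interface (assumption: simplified
// return type and a Map instead of Kafka's Headers).
interface MiniConverter {
    // Legacy overload: no headers available.
    String toConnectData(String topic, byte[] value);

    // Header-aware overload: by default it just delegates to the legacy one.
    default String toConnectData(String topic, Map<String, String> headers, byte[] value) {
        return toConnectData(topic, value);
    }
}

// Stand-in for a converter that cannot work without headers.
class HeaderAwareConverter implements MiniConverter {
    @Override
    public String toConnectData(String topic, byte[] value) {
        throw new IllegalStateException("headers not supplied");
    }

    @Override
    public String toConnectData(String topic, Map<String, String> headers, byte[] value) {
        return "decoded with key " + headers.get("key-id");
    }
}
```

Under this pattern, which overload runs depends entirely on the calling code, not on the converter.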

Example Converter implementation:

    package com.example;

    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.storage.Converter;

    public class ExampleConverter implements Converter {

        ...

        @Override
        public SchemaAndValue toConnectData(String topic, byte[] value) {
            throw new RuntimeException("headers not supplied, these are required in order to decrypt");
        }

        @Override
        public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
            return new SchemaAndValue(Schema.BYTES_SCHEMA, null);
        }

    }

I run this converter in Confluent’s Connect container image confluentinc/cp-enterprise-replicator:7.7.0.

I get the following error, which shows that the older (deprecated?) overload without headers is being called. The caller in the trace is ReplicatorSourceTask.convertKeyValue:

    java.lang.RuntimeException: headers not supplied, these are required in order to decrypt
        at com.example.ExampleConverter.toConnectData(ExampleConverter.java:50)
        at io.confluent.connect.replicator.ReplicatorSourceTask.convertKeyValue(ReplicatorSourceTask.java:637)
        at io.confluent.connect.replicator.ReplicatorSourceTask.poll(ReplicatorSourceTask.java:536)
        at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:488)
        at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:360)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
        at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:80)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)

Seeking advice: am I doing something wrong?


