Why can I not run a Kafka connector?

Background

First, a bit of background: I am trying to learn more about Kafka and Kafka Connect. To that end, I'm following along with an early-release book, 'Kafka Connect' by Mickael Maison and Kate Stanley.

Run Connectors

Very early on (Chapter 2, 'Components in a Connect data pipeline') they give an example under 'How do you run connectors'. Note that the authors are not using Confluent. At this early stage, we are advised to create a file named sink-config.json and then create a topic called topic-to-export with the following command:

bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --replication-factor 1 --partitions 1 --topic topic-to-export

We are then instructed to "use the Connect REST API to start the connector with the configuration you created"

$ curl -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/connectors/file-sink/config --data "@sink-config.json"

The Error

However, when I run this command it brings up the following error:

{"error_code":500,"message":"Cannot deserialize value of type `java.lang.String` from Object value (token `JsonToken.START_OBJECT`)\n at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 36] (through reference chain: java.util.LinkedHashMap[\"config\"])"}

Trying to fix the error

Keeping in mind that I'm still learning Kafka and Kafka Connect, I did a simple search, which led me to a post on Stack Overflow suggesting that this should perhaps be a POST rather than a PUT. However, changing the command to:

curl -d @sink-config.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors/file-sink/config

simply brings up another error:

{"error_code":405,"message":"HTTP 405 Method Not Allowed"}

I'm really not sure where to go from here, as this seems to be the way you are supposed to get a connector to run. For example, this intro to connectors by Baeldung also seems to describe it this way.

Does anyone have any ideas what is going on? I'm not sure where to start...



Solution 1:[1]

First, thanks for taking a look at the early access version of our book.

You found a mistake in this example!

To start a connector, the recommended way is to use the PUT /connectors/file-sink/config endpoint. However, the example JSON we provided is not correct.

The JSON file should be something like:

{
    "name": "file-sink",
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": 1,
    "topics": "topic-to-export",
    "file": "/tmp/sink.out",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter" 
}

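The mistake arose because there is another endpoint that can be used to start connectors, POST /connectors, and the JSON we provided is for that endpoint. That endpoint expects the settings nested under a "config" key, which is why the error message you received points at "config".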
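
To illustrate the difference between the two payload shapes, here is a minimal Python sketch that flattens a POST /connectors body into the flat map used by PUT /connectors/file-sink/config. The helper name to_put_config is hypothetical. The nested payload is an assumed reconstruction based on the error message, not the book's actual text. Values are written as strings here as a conservative choice.

```python
import json


def to_put_config(post_payload: dict) -> dict:
    """Flatten a POST /connectors body into the flat map used by PUT /connectors/{name}/config.

    Hypothetical helper for illustration; it is not part of Kafka Connect.
    """
    flat = dict(post_payload["config"])            # copy the nested settings
    flat.setdefault("name", post_payload["name"])  # keep the connector name alongside them
    return flat


# Shape accepted by POST /connectors: settings nested under "config".
# (Assumed reconstruction; the exact JSON from the book is not shown in this post.)
post_payload = {
    "name": "file-sink",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "topic-to-export",
        "file": "/tmp/sink.out",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    },
}

# Shape accepted by PUT /connectors/file-sink/config: one flat map, no "config" wrapper.
put_config = to_put_config(post_payload)
print(json.dumps(put_config, indent=4))
```

Sending the nested form to the PUT endpoint is what triggers the "Cannot deserialize value of type `java.lang.String` from Object value" error, because the value under "config" is an object rather than a string.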

We recommend you use PUT /connectors/file-sink/config as the same endpoint can also be used to reconfigure connectors. In addition, the same JSON file can also be used with the PUT /connector-plugins/{connector-type}/config/validate endpoint.
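
As a rough sketch of how the same flat JSON can serve both endpoints, the Python below builds (but does not send) the two PUT requests. The base URL comes from the question. The helper build_put is hypothetical, and using the fully qualified class name as {connector-type} is an assumption; check which plugin names or aliases your Connect version accepts.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # worker address used in the question

config = {
    "name": "file-sink",
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": "1",
    "topics": "topic-to-export",
    "file": "/tmp/sink.out",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
}


def build_put(path: str, body: dict) -> urllib.request.Request:
    """Build a JSON PUT request for the Connect REST API (hypothetical helper)."""
    return urllib.request.Request(
        CONNECT_URL + path,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Create or reconfigure the connector.
start_request = build_put(f"/connectors/{config['name']}/config", config)

# Validate the same configuration without starting anything.
# Assumption: the fully qualified class name is accepted as {connector-type}.
validate_request = build_put(
    f"/connector-plugins/{config['connector.class']}/config/validate", config
)

print(start_request.get_method(), start_request.full_url)
print(validate_request.get_method(), validate_request.full_url)
```

With a Connect worker running, either request can be sent with urllib.request.urlopen(start_request). Validating first is a cheap way to catch configuration mistakes before the connector is created.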

Thanks again for spotting and reporting the mistake. We'll fix the example in the coming weeks. We'll also reply to your emails about the other questions shortly.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Mickael Maison