Filter condition not working in Kafka HTTP sink connector
I'm using the Confluent Kafka HTTP sink connector and want to filter events: if item.productID is 12, allow the event; otherwise ignore it. But my filter condition is not working:
"transforms.filterExample1.filter.condition": "$.[*][?(@.item.productID == '12')]",
What am I doing wrong here?
Could you please help me fix this issue?
{
"connector.class": "io.confluent.connect.http.HttpSinkConnector",
"confluent.topic.bootstrap.servers": "localhost:9092",
"topics": "http-messages",
"tasks.max": "1",
"http.api.url": "http://localhost:8080/api/messages",
"reporter.bootstrap.servers": "localhost:9092",
"transforms.filter.type": "org.apache.kafka.connect.transforms.Filter",
"transforms": "filterExample1",
"transforms.filterExample1.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.filterExample1.filter.condition": "$.[*][?(@.item.productID == '12')]",
"transforms.filterExample1.filter.type": "include",
"transforms.filterExample1.missing.or.null.behavior": "fail",
"reporter.error.topic.name": "error-responses",
"reporter.result.topic.name": "success-responses",
"reporter.error.topic.replication.factor": "1",
"confluent.topic.replication.factor": "1",
"value.converter.schemas.enable": "false",
"name": "HttpSink",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"reporter.result.topic.replication.factor": "1"
}
My Event
[
{
"name":"apple",
"salary":"3243",
"item":{
"productID":"12"
}
}
]
Solution 1:[1]
Since your data is a JSON array, this transform won't work.
I tested with local data using the latest version of that transform and saw this log from the Connect server:
Caused by: org.apache.kafka.connect.errors.DataException: Only Map objects supported in absence of schema for [filtering record without schema], found: java.util.ArrayList
at io.confluent.connect.transforms.util.Requirements.requireMap(Requirements.java:30) ~[?:?]
at io.confluent.connect.transforms.Filter.shouldDrop(Filter.java:218) ~[?:?]
at io.confluent.connect.transforms.Filter.apply(Filter.java:161) ~[?:?]
"Map objects" here means JSON objects, whereas your record value is a JSON array (java.util.ArrayList).
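To make the distinction concrete, here is a small Python sketch (not the connector's code) that mimics the kind of type check the stack trace implies: a schemaless value is accepted only if it deserializes to a map. The function name `require_map` is hypothetical, chosen to echo `Requirements.requireMap`; the sample event is the one from the question.

```python
import json


def require_map(value):
    """Hypothetical stand-in for the check seen in the stack trace:
    accept only JSON objects (dicts), reject arrays and scalars."""
    if not isinstance(value, dict):
        raise TypeError(
            f"Only Map objects supported, found: {type(value).__name__}"
        )
    return value


# The event from the question: a JSON array wrapping a single object.
event = json.loads(
    '[{"name":"apple","salary":"3243","item":{"productID":"12"}}]'
)

try:
    require_map(event)
    array_accepted = True
except TypeError:
    # The top-level value is a list, so the check rejects it.
    array_accepted = False

# Each element of the array is a JSON object, so it passes on its own.
elements_accepted = all(isinstance(require_map(e), dict) for e in event)
```

If the producer can emit each array element as its own message, the value reaching the transform would be a JSON object. Whether the filter condition then needs adjusting for the un-nested shape should be checked against the transform's documentation.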
Also, you have a setting, transforms.filter.type, that isn't doing anything, because the alias "filter" is not listed in "transforms".
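One way to spot this kind of leftover setting is to compare the aliases used in `transforms.<alias>.*` keys with the aliases declared in the `transforms` property. The sketch below is a standalone helper, not part of Kafka Connect; the function name `unused_transform_aliases` is hypothetical, and the config is a trimmed copy of the one in the question.

```python
def unused_transform_aliases(config):
    """Return aliases that appear in 'transforms.<alias>.*' keys
    but are not declared in the 'transforms' property."""
    declared = {
        alias.strip()
        for alias in config.get("transforms", "").split(",")
        if alias.strip()
    }
    used = set()
    for key in config:
        parts = key.split(".")
        # Keys of interest look like transforms.<alias>.<property...>
        if parts[0] == "transforms" and len(parts) >= 3:
            used.add(parts[1])
    return sorted(used - declared)


# Trimmed copy of the connector config from the question.
config = {
    "transforms": "filterExample1",
    "transforms.filter.type": "org.apache.kafka.connect.transforms.Filter",
    "transforms.filterExample1.type": "io.confluent.connect.transforms.Filter$Value",
    "transforms.filterExample1.filter.type": "include",
}

orphans = unused_transform_aliases(config)
print(orphans)
```

For this config the helper reports the alias `filter`, which matches the stray `transforms.filter.type` line.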
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 |
