'Filter condition not working in kafka http sink connector

I'm using confluent kafka http sink connector, I want to filter this condition - if item.productID=12 then allow this event or ignore. But my filter condition not working

"transforms.filterExample1.filter.condition": "$.[*][?(@.item.productID == '12')]",

What I'm doing wrong here?

Could you please help me to fix this issue?

{
    "connector.class": "io.confluent.connect.http.HttpSinkConnector",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "topics": "http-messages",
    "tasks.max": "1",
    "http.api.url": "http://localhost:8080/api/messages",
    "reporter.bootstrap.servers": "localhost:9092",
    "transforms.filter.type": "org.apache.kafka.connect.transforms.Filter",
    "transforms": "filterExample1",
    "transforms.filterExample1.type": "io.confluent.connect.transforms.Filter$Value",
    "transforms.filterExample1.filter.condition": "$.[*][?(@.item.productID == '12')]",
    "transforms.filterExample1.filter.type": "include",
    "transforms.filterExample1.missing.or.null.behavior": "fail",
    "reporter.error.topic.name": "error-responses",
    "reporter.result.topic.name": "success-responses",
    "reporter.error.topic.replication.factor": "1",
    "confluent.topic.replication.factor": "1",
    "value.converter.schemas.enable": "false",
    "name": "HttpSink",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "reporter.result.topic.replication.factor": "1"
}

My Event

[
   {
      "name":"apple",
      "salary":"3243",
      "item":{
         "productID":"12"
      }
   }
]


Solution 1:[1]

Since your data is in a JSON array, this transform wont work.

Tested with local data using the latest version of that transform, and saw this log from the Connect server

Caused by: org.apache.kafka.connect.errors.DataException: Only Map objects supported in absence of schema for [filtering record without schema], found: java.util.ArrayList
  at io.confluent.connect.transforms.util.Requirements.requireMap(Requirements.java:30) ~[?:?]
  at io.confluent.connect.transforms.Filter.shouldDrop(Filter.java:218) ~[?:?]
  at io.confluent.connect.transforms.Filter.apply(Filter.java:161) ~[?:?]

"Map Objects" implying JSON objects

Also, you have a setting transforms.filter.type that's not doing anything

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1