druid-kinesis-indexing-service giving unparseable characters

I am switching our ingestion from Tranquility to druid-kinesis-indexing-service. However, when I connect to the data, it shows lines of valid JSON sandwiched between unparseable characters, e.g.:

�0{"message": {"ex_json_key":1}�0, �0{"message": {"ex_json_key":2}�0,

As a result, the parser cannot parse these lines. I have tried changing many of the input settings in the supervisor spec, but none of them make a difference. The same Kinesis stream works without any problem in Tranquility. Does anyone know what the issue is and/or how to fix it?

Thanks!

An abbreviated version of our supervisor spec is here:

> {
>   "type": "kinesis",
>   "spec": {
>     "dataSchema": {
>       "dataSource": "new_source_kinesis",
>       "metricsSpec": [],
>       "granularitySpec": {
>         "segmentGranularity": "hour",
>         "queryGranularity": "minute",
>         "rollup": true,
>         "type": "uniform"
>       },
>       "dimensionsSpec": {
>         "dimensions": [
>           "coln"
>         ]
>       },
>       "timestampSpec": {
>         "column": "timecol",
>         "format": "auto"
>       }
>     },
>     "ioConfig": {
>       "stream": "stream_name",
>       "inputFormat": {
>         "type": "json",
>         "flattenSpec": {
>           "useFieldDiscovery": true,
>           "fields": [
>             {
>               "type": "path",
>               "name": "coln",
>               "expr": "$.message.n"
>             }
>           ]
>         }
>       },
>       "endpoint": "kinesis.us-east-1.amazonaws.com",
>       "taskCount": 2
>     },
>     "tuningConfig": {
>       "type": "kinesis",
>       "reportParseExceptions": true,
>       "logParseExceptions": true,
>       "intermediatePersistPeriod": "PT10M",
>       "maxRowsInMemory": 75000
>     }
>   }
> }


Solution 1:[1]

We were able to solve this by following the deaggregation section of the documentation: https://druid.apache.org/docs/latest/development/extensions-core/kinesis-ingestion.html#deaggregation

Our steps were:

  1. Set "deaggregate": true in the ioConfig portion of the supervisor spec.
  2. Add amazon-kinesis-client 1.9.2 to the druid-kinesis-indexing-service extension folder on the middle managers/coordinator:

sudo wget https://repo1.maven.org/maven2/com/amazonaws/amazon-kinesis-client/1.9.2/amazon-kinesis-client-1.9.2.jar -P /druid-0.18.1/extensions/druid-kinesis-indexing-service/

  3. Remove the existing amazon-kinesis-client 1.13.3 from druid-0.18.1/lib:

sudo rm amazon-kinesis-client-1.13.3.jar

(Without this step we got the error Caused by: java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: com/amazonaws/services/kinesis/model/Record)

  4. Restart the middle managers/coordinator.
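
For reference, here is a rough sketch of how the ioConfig from the question might look with the deaggregate setting from the first step added. The other values are copied from the question's spec, and the flattenSpec inside inputFormat is left out only to keep the fragment short. Treat it as an illustration rather than a complete spec:

```json
{
  "ioConfig": {
    "stream": "stream_name",
    "inputFormat": {
      "type": "json"
    },
    "endpoint": "kinesis.us-east-1.amazonaws.com",
    "taskCount": 2,
    "deaggregate": true
  }
}
```

The setting on its own was not enough in our case: it only worked once the jar changes in the other steps were in place.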

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 walker_4