Get nested fields from Kafka message using Apache Flink SQL
I'm trying to create a source table in Apache Flink 1.11 that gives me access to nested properties in a JSON message. I can read values from top-level properties, but I'm unsure how to access nested objects.
The documentation suggests that it should be a MAP type, but when I declare it that way, I get the following error:
java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP
Here is my SQL:
        CREATE TABLE input(
            id VARCHAR,
            title VARCHAR,
            properties MAP
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        )
And my JSON looks something like this:
{
  "id": "message-1",
  "title": "Some Title",
  "properties": {
    "foo": "bar"
  }
}
Solution 1:[1]
You can use ROW to extract nested fields in your JSON messages. Your DDL statement would look something like:
CREATE TABLE input(
             id VARCHAR,
             title VARCHAR,
             properties ROW(`foo` VARCHAR)
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        );
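To make the ROW mapping concrete, here is a plain-Python model of what the JSON format does for a ROW column. It is an illustration, not Flink code, and extract_row is a hypothetical helper. Declared fields are looked up by name, undeclared JSON keys are ignored, and a missing field becomes NULL (None here), assuming the format's default 'json.fail-on-missing-field' = 'false'. In Flink SQL the field is then read as properties.foo.

```python
import json
from typing import Any, List, Optional, Tuple


def extract_row(obj: dict, field_names: List[str]) -> Tuple[Optional[Any], ...]:
    """Hypothetical model of a ROW column built from a nested JSON object.

    Picks the declared fields by name, in declaration order. Fields missing
    from the JSON become None (standing in for SQL NULL); JSON keys that are
    not declared in the ROW are simply ignored.
    """
    return tuple(obj.get(name) for name in field_names)


message = json.loads(
    '{"id": "message-1", "title": "Some Title",'
    ' "properties": {"foo": "bar", "extra": 1}}'
)

# properties ROW(`foo` VARCHAR) declares a single field named foo,
# so the undeclared "extra" key is dropped.
properties = extract_row(message["properties"], ["foo"])
print(properties)  # ('bar',)
```

The trade-off of ROW is that every nested field you want to read must be declared up front, each with its own type.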
Solution 2:[2]
You can also declare the column as a map with explicit key and value types:
CREATE TABLE input(
            id VARCHAR,
            title VARCHAR,
            properties MAP<STRING, STRING>
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        )
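The only difference from the question's DDL is MAP<STRING, STRING> instead of a bare MAP. Flink's MAP type must be given both a key type and a value type; without them, MAP is parsed as an unknown type identifier, which produces the error above.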
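A MAP column takes the opposite approach to ROW: the keys do not have to be declared, but all values share one type (here STRING), and individual entries are read in Flink SQL with properties['foo'], which yields NULL for an absent key. The plain-Python model below uses hypothetical helpers (to_string_map, map_get) and is not Flink code; it assumes every nested value is already a string, as in the question's JSON.

```python
import json
from typing import Dict, Optional


def to_string_map(obj: dict) -> Dict[str, str]:
    """Hypothetical model of a MAP<STRING, STRING> column.

    Every key of the nested JSON object becomes a map entry. Values are
    assumed to already be strings; Flink's conversion of other JSON value
    types is not modeled here.
    """
    return dict(obj)


def map_get(m: Dict[str, str], key: str) -> Optional[str]:
    """Model of properties['key']: None (standing in for NULL) if absent."""
    return m.get(key)


message = json.loads(
    '{"id": "message-1", "title": "Some Title",'
    ' "properties": {"foo": "bar", "baz": "qux"}}'
)
properties = to_string_map(message["properties"])

print(map_get(properties, "foo"))      # bar
print(map_get(properties, "missing"))  # None
```

A MAP suits objects whose keys vary between messages; a ROW suits a fixed schema, especially one with mixed value types or further nesting.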
Solution 3:[3]
[2022 Update]
Apache Flink 1.13 has no built-in JSON functions; they were introduced in version 1.14 (see the JSON functions in the Flink 1.14 documentation).
If you are using a version earlier than 1.14, use the solution below.
How can I create a table with nested JSON input?
JSON input example:
{
    "id": "message-1",
    "title": "Some Title",
    "properties": {
        "foo": "bar",
        "nested_foo": {
            "prop1": "value1",
            "prop2": "value2"
        }
    }
}
CREATE statement:
CREATE TABLE input(
                id VARCHAR,
                title VARCHAR,
                properties ROW(`foo` VARCHAR, `nested_foo` ROW(`prop1` VARCHAR, `prop2` VARCHAR))
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        );
How can I select nested columns?
SELECT properties.foo, properties.nested_foo.prop1 FROM input;
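Each dot in properties.nested_foo.prop1 steps one level into a nested ROW. The hypothetical get_path helper below is a plain-Python model of that lookup, not Flink code: it walks one JSON object per dot-separated name and returns None, standing in for NULL, when a level is missing.

```python
import json
from typing import Any, Optional


def get_path(obj: Any, path: str) -> Optional[Any]:
    """Hypothetical model of nested field access such as
    properties.nested_foo.prop1.

    Walks one object level per dot-separated name. If a level is missing or
    is not an object, the result is None (standing in for SQL NULL).
    """
    current = obj
    for name in path.split("."):
        if not isinstance(current, dict) or name not in current:
            return None
        current = current[name]
    return current


message = json.loads("""
{
  "id": "message-1",
  "title": "Some Title",
  "properties": {
    "foo": "bar",
    "nested_foo": {"prop1": "value1", "prop2": "value2"}
  }
}
""")

print(get_path(message, "properties.foo"))               # bar
print(get_path(message, "properties.nested_foo.prop1"))  # value1
```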
Note that if you select the whole column with
SELECT properties FROM input
the results are printed in row format, showing field values rather than field names. For the example message, the properties column contains:
+I[bar, +I[value1, value2]]
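The printed form is a change-kind prefix (+I marks an inserted row) followed by the field values in brackets, with a nested row printed the same way. The hypothetical row_to_string function below is a small Python model of that formatting, not Flink's implementation, applied to the example message's values.

```python
def row_to_string(values: tuple, kind: str = "+I") -> str:
    """Hypothetical model of a printed row: kind prefix, values in brackets.

    Nested tuples are treated as nested rows and get their own prefix.
    """
    parts = [
        row_to_string(v, kind) if isinstance(v, tuple) else str(v)
        for v in values
    ]
    return f"{kind}[{', '.join(parts)}]"


# properties.foo = "bar", properties.nested_foo = ("value1", "value2")
print(row_to_string(("bar", ("value1", "value2"))))  # +I[bar, +I[value1, value2]]
```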
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source | 
|---|---|
| Solution 1 | morsapaes | 
| Solution 2 | 0xbe1 | 
| Solution 3 | |
