How can we define nested JSON properties (including arrays) using the Flink SQL API?

We have the following problem with Flink SQL: we have configured the Kafka Twitter connector to write tweets to Kafka, and we want to read those tweets from Kafka into a table using Flink SQL.

How can we declare nested JSON properties, including arrays, in a Flink SQL table definition?

We tried the following, but it does not work (the returned values are empty):

CREATE TABLE kafka_tweets(
  payload ROW(`HashtagEntities` ARRAY[VARCHAR])
) WITH (
  'connector' = 'kafka',
  'topic' = 'twitter_status',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
)

In the Twitter response, HashtagEntities is an array of objects.
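A possible DDL, as a sketch only: since HashtagEntities holds objects rather than strings, it would need to be declared as an ARRAY of ROW, and each ROW field name has to match the JSON key exactly, because the json format matches field names case-sensitively. The `payload` wrapper is taken from the question; the field names inside it (`Text`, `Start`, `End`) are assumptions about what the Twitter connector emits, so check a real message on the topic and adjust them.

```sql
-- Sketch: HashtagEntities declared as an array of rows, not an array of strings.
-- Assumed message shape (hypothetical; verify against your topic):
--   {"schema": {...}, "payload": {"Text": "...", "HashtagEntities": [{"Text": "flink", "Start": 0, "End": 6}]}}
-- Undeclared keys such as "schema" are simply ignored by the json format.
CREATE TABLE kafka_tweets (
  payload ROW<
    `Text` STRING,
    `HashtagEntities` ARRAY<ROW<`Text` STRING, `Start` INT, `End` INT>>
  >
) WITH (
  'connector' = 'kafka',
  'topic' = 'twitter_status',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- One row per hashtag: UNNEST over an array of rows exposes each row field as a column.
SELECT t.payload.`Text` AS tweet_text, h.tag
FROM kafka_tweets AS t
CROSS JOIN UNNEST(t.payload.`HashtagEntities`) AS h (tag, tag_start, tag_end);
```

With CROSS JOIN UNNEST, a tweet whose hashtag array is empty or NULL produces no output rows, so tweets without hashtags drop out of this query.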



Solution 1:[1]

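CREATE TABLE `table` (
  `userid` BIGINT,
  `timestamp` BIGINT,  -- epoch seconds (assumed); required by the computed columns below
  `json_data` STRING,  -- raw JSON text; nested keys are read with JSON_VALUE
  `request_id` AS JSON_VALUE(`json_data`, '$.request_id'),
  `items` ARRAY<ROW<`itemid` BIGINT, `shopid` BIGINT>>,  -- array of JSON objects
  `event_time` AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`, 'yyyy-MM-dd HH:mm:ss')),
  `version` AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`, 'yyyy-MM-dd'), 'yyyy-MM-dd'),
  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '1' MINUTE
) WITH (
  -- example options reused from the question; adjust topic and servers as needed
  'connector' = 'kafka',
  'topic' = 'twitter_status',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
)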
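An illustrative query against the table above, as a sketch: the computed columns (`request_id`, `event_time`) are selected like ordinary columns, and the nested `items` array can be inspected with the built-in CARDINALITY function. The column names come from the answer's DDL; the filter is only an example.

```sql
-- request_id is extracted from the raw JSON string; items is the nested ARRAY<ROW<...>>.
SELECT
  `userid`,
  `request_id`,
  `event_time`,
  CARDINALITY(`items`) AS item_count
FROM `table`
WHERE `items` IS NOT NULL;
```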

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Peter Csala