How can we define nested JSON properties (including arrays) using Flink SQL API?
We have the following problem with Flink SQL: we have configured the Kafka Twitter connector to write tweets to a Kafka topic, and we want to read those tweets from Kafka into a table using Flink SQL.
How can we define nested JSON properties (including arrays) using the Flink SQL API?
We have tried the following, but it does not work (the values returned are empty):
CREATE TABLE kafka_tweets(
payload ROW(`HashtagEntities` ARRAY[VARCHAR])
) WITH (
'connector' = 'kafka',
'topic' = 'twitter_status',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
)
In the Twitter response, HashtagEntities is an array of objects.
Solution 1:[1]
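CREATE TABLE `table` (
`userid` BIGINT,
`json_data` VARCHAR(2147483647),
-- Assumption: epoch seconds (BIGINT). The computed columns below reference
-- `timestamp`, so it has to be declared as a physical column.
`timestamp` BIGINT,
-- Scalar extracted from the raw JSON string
`request_id` AS JSON_VALUE(`json_data`, '$.request_id'),
-- Nested array of objects: ARRAY of ROW, with one ROW field per JSON key
`items` ARRAY<ROW<`itemid` BIGINT, `shopid` BIGINT>>,
`event_time` AS `TO_TIMESTAMP`(`FROM_UNIXTIME`(`timestamp`, 'yyyy-MM-dd HH:mm:ss')),
`version` AS `TO_TIMESTAMP`(`FROM_UNIXTIME`(`timestamp`, 'yyyy-MM-dd')),
WATERMARK FOR `event_time` AS `event_time` - INTERVAL '1' MINUTE
)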
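The key part of this definition is the `items` column: an array of JSON objects is declared as ARRAY<ROW<...>>, with one ROW field per key. Applied to the table from the question, a minimal sketch might look like the following. The `Text` field inside each hashtag object is an assumption, not something taken from the question; replace it with the keys that actually appear in your messages, and note that the names must match the JSON keys exactly, including case.

```sql
-- Sketch only: `Text` is an assumed field name inside each hashtag object.
-- Adjust the ROW fields to the keys present in the actual tweet JSON.
CREATE TABLE kafka_tweets (
  `payload` ROW<
    `HashtagEntities` ARRAY<ROW<`Text` STRING>>
  >
) WITH (
  'connector' = 'kafka',
  'topic' = 'twitter_status',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Flatten the nested array: one output row per hashtag in each tweet.
SELECT h.`Text`
FROM kafka_tweets
CROSS JOIN UNNEST(kafka_tweets.`payload`.`HashtagEntities`) AS h (`Text`);
```

The original attempt declared the array elements as VARCHAR, which does not match an array of objects and is the likely reason the values came back empty. Keys that are not listed in the ROW type are simply not mapped to any column.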
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Peter Csala |
