I'm getting this error when running the PyFlink code below.
This code calculates the average of each ch[x] field from a Kafka source using Apache Flink (PyFlink). I think I have imported all of the necessary libraries.
And I'm getting this error when running the code:
from numpy import average
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf
from pyflink.table import *
def create_input():
return """
CREATE TABLE input(
`gw_id` VARCHAR,
`ch1` BIGINT,
`ch2` BIGINT,
`ch3` BIGINT,
`ch4` BIGINT,
`ch5` BIGINT,
`ch6` BIGINT,
`t` TIMESTAMP_LTZ(3),
) WITH (
'connector' = 'kafka',
'topic' = 'energymeter.raw',
'properties.bootstrap.servers' = '192.168.0.34:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
)
"""
@udf(result_type=DataTypes.BIGINT())
def average_power(x):
return x*12*2
@udf(result_type=DataTypes.BIGINT())
def energy_consumption(x):
return x/500
def create_output():
return """
CREATE TABLE output (
`gw_id` VARCHAR,
`ch1` BIGINT,
`ch2` BIGINT,
`ch3` BIGINT,
`ch4` BIGINT,
`ch5` BIGINT,
`ch6` BIGINT,
`ch1_mod` BIGINT,
`ch2_mod` BIGINT,
`ch3_mod` BIGINT,
`ch4_mod` BIGINT,
`ch5_mod` BIGINT,
`ch6_mod` BIGINT,
`t` TIMESTAMP_LTZ(3)
) WITH (
'connector' = 'kafka'
'topic' = 'energymeter.processed',
'properties.bootstrap.servers' = '192.168.0.34:9092',
'format' = 'json'
)
"""
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql(create_input())
table_env.execute_sql(average_power())
table_env.execute_sql(energy_consumption())
table_env.execute_sql(create_output())
table_env.execute_sql("INSERT INTO output SELECT gw_id, t, ch1, average_power(ch1), ch2, average_power(ch2), ch3, average_power(ch3), ch4, average_power(ch4), ch5, average_power(ch5), ch6, average_power(ch6) FROM input").wait()
The error is shown below. I have added the SQL Kafka connector jar (flink-sql-connector-kafka_2.11-1.14.4.jar), but nothing seems to work.
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered ")" at line 11, column 9.
Was expecting one of:
"CONSTRAINT" ...
"PRIMARY" ...
"UNIQUE" ...
"WATERMARK" ...
<BRACKET_QUOTED_IDENTIFIER> ...
<QUOTED_IDENTIFIER> ...
<BACK_QUOTED_IDENTIFIER> ...
<HYPHENATED_IDENTIFIER> ...
<IDENTIFIER> ...
<UNICODE_QUOTED_IDENTIFIER> ...
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:472)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:235)
at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
at org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:195)
at org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:77)
... 13 more
Solution 1:[1]
There are many problems with your program, e.g.:

- Missing comma after `'connector' = 'kafka'`; extra trailing commas after `` `t` TIMESTAMP_LTZ(3) `` and `'format' = 'json'`
- You should use `create_temporary_function` to register Python UDFs instead of `execute_sql`
- The field order in the `SELECT` clause is not consistent with the sink table `output` definition

I have made some modifications to it as follows:
from numpy import average
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf
from pyflink.table import *
def create_input():
    """Return the DDL for the Kafka-backed source table ``input``.

    The table reads JSON records from the ``energymeter.raw`` topic
    (six BIGINT channel counters ``ch1``..``ch6`` plus a millisecond
    local-zone timestamp ``t``), starting from the earliest offset.
    """
    return """
    CREATE TABLE input(
        `gw_id` VARCHAR,
        `ch1` BIGINT,
        `ch2` BIGINT,
        `ch3` BIGINT,
        `ch4` BIGINT,
        `ch5` BIGINT,
        `ch6` BIGINT,
        `t` TIMESTAMP_LTZ(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'energymeter.raw',
        'properties.bootstrap.servers' = '192.168.0.34:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
    """
@udf(result_type=DataTypes.BIGINT())
def average_power(x):
    """Scale a raw channel reading by a fixed factor of 24 (12 * 2)."""
    # NOTE(review): the meaning of the 12 and 2 factors is not documented
    # here — presumably sampling/interval constants; confirm with the
    # data producer before changing.
    return x*12*2
@udf(result_type=DataTypes.BIGINT())
def energy_consumption(x):
    """Convert a raw channel reading to energy by dividing by 500.

    Uses floor division so the Python result is an ``int``, matching the
    declared ``DataTypes.BIGINT()`` result type: the original ``x / 500``
    is true division and produces a ``float``, which does not match the
    declared BIGINT type.  If fractional precision is actually wanted,
    change the result type to ``DataTypes.DOUBLE()`` and restore ``/``.
    """
    # NOTE(review): the 500 divisor is undocumented — presumably a meter
    # pulses-per-unit constant; confirm with the data producer.
    return x // 500
def create_output():
    """Return the DDL for the Kafka-backed sink table ``output``.

    Mirrors the source schema (``gw_id``, ``ch1``..``ch6``, ``t``) and adds
    the six UDF-derived ``chN_mod`` columns; rows are written as JSON to
    the ``energymeter.processed`` topic.
    """
    return """
    CREATE TABLE output (
        `gw_id` VARCHAR,
        `ch1` BIGINT,
        `ch2` BIGINT,
        `ch3` BIGINT,
        `ch4` BIGINT,
        `ch5` BIGINT,
        `ch6` BIGINT,
        `ch1_mod` BIGINT,
        `ch2_mod` BIGINT,
        `ch3_mod` BIGINT,
        `ch4_mod` BIGINT,
        `ch5_mod` BIGINT,
        `ch6_mod` BIGINT,
        `t` TIMESTAMP_LTZ(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'energymeter.processed',
        'properties.bootstrap.servers' = '192.168.0.34:9092',
        'format' = 'json'
    )
    """
# --- Pipeline wiring -------------------------------------------------------
# Build a streaming TableEnvironment, register the source/sink tables and
# the Python UDFs, then submit the INSERT job and block until it finishes.
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Source table backed by the 'energymeter.raw' Kafka topic.
table_env.execute_sql(create_input())

# Python UDFs must be registered with create_temporary_function, not
# execute_sql — they are Python callables, not SQL statements.
table_env.create_temporary_function("average_power", average_power)
table_env.create_temporary_function("energy_consumption", energy_consumption)

# Sink table backed by the 'energymeter.processed' Kafka topic.
table_env.execute_sql(create_output())

# SELECT column order matches the sink definition exactly:
# gw_id, ch1..ch6, ch1_mod..ch6_mod, t.  .wait() blocks the driver until
# the (unbounded) streaming job terminates.
table_env.execute_sql("INSERT INTO output SELECT gw_id, ch1, ch2, ch3, ch4, ch5, ch6, average_power(ch1), average_power(ch2), average_power(ch3), average_power(ch4), average_power(ch5), average_power(ch6), t FROM input").wait()
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Dian Fu |
