'How to find any factory for identifier 'elasticsearch-7' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath
I am trying to create a pyflink application with table API and elasticsearch as sink.
from pyflink.table import TableEnvironment, EnvironmentSettings
def log_processing():
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///path_to/flink-sql-connector-kafka_2.12-1.13.1.jar;file:///path_to/flink-sql-connector-elasticsearch7_2.11-1.13.1")
sink_ddl = """
CREATE TABLE myUserTable (
user_id STRING,
user_name STRING,
uv BIGINT,
pv BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'users'
)
"""
t_env.execute_sql(sink_ddl)
print(sink_ddl)
sink_table = t_env.sql_query("SELECT * FROM myUserTable")
if __name__ == '__main__':
log_processing()
When I am trying to run the above code, showing the below error:
`Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'elasticsearch-7' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
- blackhole
- datagen
- filesystem
- kafka
print upsert-kafka
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:463)\
How to get rid of this problem.
Solution 1:[1]
Could you double check if the path file:///path_to/flink-sql-connector-elasticsearch7_2.11-1.13.1
really exists. It seems that it's missing the .jar
suffix.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Dian Fu |