How to find any factory for identifier 'elasticsearch-7' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath

I am trying to create a PyFlink application that uses the Table API with Elasticsearch as the sink.

    from pyflink.table import TableEnvironment, EnvironmentSettings

    def log_processing():
        env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
        t_env = TableEnvironment.create(env_settings)
        # Connector jars are passed as a ';'-separated list of file URLs
        t_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///path_to/flink-sql-connector-kafka_2.12-1.13.1.jar;file:///path_to/flink-sql-connector-elasticsearch7_2.11-1.13.1")

        sink_ddl = """
            CREATE TABLE myUserTable (
              user_id STRING,
              user_name STRING,
              uv BIGINT,
              pv BIGINT,
              PRIMARY KEY (user_id) NOT ENFORCED
            ) WITH (
              'connector' = 'elasticsearch-7',
              'hosts' = 'http://localhost:9200',
              'index' = 'users'
            )
        """
        t_env.execute_sql(sink_ddl)
        print(sink_ddl)
        sink_table = t_env.sql_query("SELECT * FROM myUserTable")


    if __name__ == '__main__':
        log_processing()

When I run the above code, I get the following error:

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'elasticsearch-7' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

  • blackhole
  • datagen
  • filesystem
  • kafka

  • print
  • upsert-kafka

at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:463)

How can I fix this problem?



Solution 1:[1]

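Could you double-check that the path file:///path_to/flink-sql-connector-elasticsearch7_2.11-1.13.1 really exists? It appears to be missing the .jar suffix. That would also explain why kafka shows up in the list of available factory identifiers while elasticsearch-7 does not: only the Kafka jar path is complete.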
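
A quick way to catch this kind of mistake is to validate the value before passing it to `pipeline.classpaths`. The following is a minimal sketch, not part of PyFlink: `check_classpath_entries` is a hypothetical helper name, and it assumes the entries are `file://` URLs separated by `;`, as in the question.

```python
from pathlib import Path
from urllib.parse import urlparse
from urllib.request import url2pathname


def check_classpath_entries(classpaths: str) -> list:
    """Return a list of problems found in a ';'-separated list of file URLs.

    Hypothetical helper for illustration; it only inspects the string and
    the local filesystem and does not call into Flink.
    """
    problems = []
    # Split on ';' and skip empty entries (e.g. from a trailing separator)
    for entry in filter(None, (e.strip() for e in classpaths.split(";"))):
        parsed = urlparse(entry)
        if parsed.scheme != "file":
            problems.append(f"{entry}: not a file:// URL")
            continue
        # Convert the URL path back into a local filesystem path
        path = Path(url2pathname(parsed.path))
        if path.suffix != ".jar":
            problems.append(f"{entry}: missing .jar suffix")
        elif not path.is_file():
            problems.append(f"{entry}: file does not exist")
    return problems
```

Calling it on the string from the question flags the Elasticsearch entry as missing its `.jar` suffix. With the placeholder `path_to` directory it also reports the Kafka jar as nonexistent, so substitute the real paths before relying on the result. An empty return list means every entry points at an existing `.jar` file.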

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Dian Fu