Apache Flink connect to PostgreSQL

I'm trying to connect to a PostgreSQL database with PyFlink on Windows, using the following code:

from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

table_env.execute_sql("""
    CREATE TABLE test_nifi (
        codecountry VARCHAR(50), 
        name VARCHAR(50),
        PRIMARY KEY (codecountry) NOT ENFORCED
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://localhost:5432/TestDS',
        'table-name' = 'public.test_nifi',
        'username' = 'postgres',
        'password' = 'postgres'
    )
""")

result = table_env.from_path("test_nifi").select("codecountry, name")
print(result.to_pandas())

and I'm getting the following error:

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Any idea why this is happening?



Solution 1:[1]

Add the following line before calling `execute_sql`, so the JDBC connector and the PostgreSQL driver JARs are on the classpath:

table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///C:/Users/Admin/Desktop/Flink/flink-connector-jdbc_2.12-1.14.3.jar;file:///C:/Users/Admin/Desktop/Flink/postgresql-42.3.1.jar")
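On Windows, `pipeline.jars` expects a semicolon-separated list of `file:///` URLs, which is easy to get wrong by hand. A small helper (hypothetical, not part of Flink) that builds the string with `pathlib` might look like:

```python
from pathlib import Path

def jar_urls(*jar_paths):
    # Convert each absolute local path to a file:// URL and join them with
    # ';', the separator Flink expects for the 'pipeline.jars' option.
    # On Windows, "C:/x/y.jar" becomes "file:///C:/x/y.jar".
    return ";".join(Path(p).as_uri() for p in jar_paths)

# Usage sketch (paths are the ones from the answer above):
# table_env.get_config().get_configuration().set_string(
#     "pipeline.jars",
#     jar_urls("C:/Users/Admin/Desktop/Flink/flink-connector-jdbc_2.12-1.14.3.jar",
#              "C:/Users/Admin/Desktop/Flink/postgresql-42.3.1.jar"))
```

Note that `Path.as_uri()` requires absolute paths, which also guards against accidentally passing a relative path that Flink would fail to resolve at runtime.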

Solution 2:[2]

Since Flink is a Java/Scala-based project, connector and format implementations are distributed as JAR files.

The PostgreSQL connector in PyFlink relies on the Java flink-connector-jdbc implementation, so you need to add that JAR (along with the PostgreSQL JDBC driver) to the stream_execution_environment:

stream_execution_environment.add_jars("file:///my/jar/path/connector1.jar", "file:///my/jar/path/connector2.jar")
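A related pitfall: the connector JAR's Scala suffix and Flink version (the `_2.12-1.14.3` part of the filename) should match the PyFlink version you have installed, since a mismatched connector can produce the same "Could not find any factory" error. A small hypothetical helper to pull those fields out of a JAR name for a sanity check:

```python
import re

def parse_connector_jar(filename):
    # Split a name like 'flink-connector-jdbc_2.12-1.14.3.jar' into the
    # Scala build suffix ('2.12') and the Flink version ('1.14.3').
    # Returns None if the name does not follow that pattern.
    m = re.fullmatch(r"flink-connector-jdbc_(\d+\.\d+)-([\d.]+)\.jar", filename)
    return (m.group(1), m.group(2)) if m else None

print(parse_connector_jar("flink-connector-jdbc_2.12-1.14.3.jar"))
# -> ('2.12', '1.14.3')
```

You can then compare the extracted Flink version against `pyflink.version` (or the installed `apache-flink` package version) before submitting the job.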

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Dương Thái
Solution 2: ChangLi