Apache Flink: connect to PostgreSQL
I'm trying to connect to a PostgreSQL database with PyFlink on Windows, using the following code:
```python
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Register the PostgreSQL table public.test_nifi through the JDBC connector
table_env.execute_sql("""
CREATE TABLE test_nifi (
  codecountry VARCHAR(50),
  name VARCHAR(50),
  PRIMARY KEY (codecountry) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/TestDS',
  'table-name' = 'public.test_nifi',
  'username' = 'postgres',
  'password' = 'postgres'
)
""")

# Column expressions instead of the deprecated string form select("codecountry, name")
result = table_env.from_path("test_nifi").select(col("codecountry"), col("name"))
print(result.to_pandas())
```
and I'm getting the following error:
```
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
```
Any idea why this is happening?
Solution 1:[1]
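Add the following line after creating table_env. It puts both the Flink JDBC connector jar and the PostgreSQL JDBC driver jar on the job's classpath through the pipeline.jars option (multiple jars are separated by ";"):
```python
# Both jars are needed: the Flink JDBC connector and the PostgreSQL JDBC driver.
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///C:/Users/Admin/Desktop/Flink/flink-connector-jdbc_2.12-1.14.3.jar;"
    "file:///C:/Users/Admin/Desktop/Flink/postgresql-42.3.1.jar",
)
```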
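Writing the file:/// URIs by hand on Windows is error-prone (backslashes, drive letters, spaces). As a small sketch, the hypothetical helpers to_pipeline_jars and add_pipeline_jars below (not part of PyFlink) turn local jar paths into the ";"-separated URI list and pass it to the same set_string call used in Solution 1. They assume the paths are absolute or relative to the current working directory.

```python
import pathlib
from typing import Iterable, Union


def to_pipeline_jars(paths: Iterable[Union[str, pathlib.PurePath]]) -> str:
    """Join local jar paths into the ';'-separated file:/// URI list for pipeline.jars.

    Hypothetical helper for illustration; not part of PyFlink.
    """
    uris = []
    for p in paths:
        if isinstance(p, str):
            p = pathlib.Path(p)
        if isinstance(p, pathlib.Path):
            # Concrete paths are made absolute against the current working directory.
            p = p.absolute()
        if not p.is_absolute():
            raise ValueError(f"jar path must be absolute: {p}")
        # as_uri() yields e.g. file:///C:/Users/... for a Windows drive path
        # and percent-encodes characters such as spaces.
        uris.append(p.as_uri())
    return ";".join(uris)


def add_pipeline_jars(table_env, paths: Iterable[Union[str, pathlib.PurePath]]) -> None:
    """Set pipeline.jars on a PyFlink TableEnvironment (same call as in Solution 1)."""
    table_env.get_config().get_configuration().set_string(
        "pipeline.jars", to_pipeline_jars(paths)
    )
```

On Windows, calling add_pipeline_jars(table_env, [r"C:\Users\Admin\Desktop\Flink\flink-connector-jdbc_2.12-1.14.3.jar", r"C:\Users\Admin\Desktop\Flink\postgresql-42.3.1.jar"]) should set the same value as the hand-written string in Solution 1.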
Solution 2:[2]
Since Flink is a Java/Scala-based project, implementations of both connectors and formats are shipped as jars.
PostgreSQL access in PyFlink relies on Java's flink-connector-jdbc implementation, so you need to add that jar (together with the PostgreSQL JDBC driver jar) to the StreamExecutionEnvironment:
```python
# stream_execution_environment is a pyflink.datastream.StreamExecutionEnvironment
stream_execution_environment.add_jars("file:///my/jar/path/connector1.jar", "file:///my/jar/path/connector2.jar")
```
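Combining this approach with the table definition from the question gives roughly the following. It is a sketch that assumes PyFlink 1.14 (to match the flink-connector-jdbc_2.12-1.14.3 jar from Solution 1) and reuses the jar paths from Solution 1; build_table_env and read_countries are hypothetical names. Because add_jars is a method of StreamExecutionEnvironment, the table environment is created with StreamTableEnvironment.create(env) instead of TableEnvironment.create(env_settings). The PyFlink imports sit inside the functions so the module can be loaded without PyFlink installed.

```python
# Jar locations copied from Solution 1; adjust them to where your jars live.
# The connector jar version should match your Flink version (1.14.3 here).
JDBC_JARS = (
    "file:///C:/Users/Admin/Desktop/Flink/flink-connector-jdbc_2.12-1.14.3.jar",
    "file:///C:/Users/Admin/Desktop/Flink/postgresql-42.3.1.jar",
)

# Same table definition as in the question.
TEST_NIFI_DDL = """
CREATE TABLE test_nifi (
  codecountry VARCHAR(50),
  name VARCHAR(50),
  PRIMARY KEY (codecountry) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/TestDS',
  'table-name' = 'public.test_nifi',
  'username' = 'postgres',
  'password' = 'postgres'
)
"""


def build_table_env(jars=JDBC_JARS):
    """Create a StreamTableEnvironment whose classpath includes the JDBC jars."""
    # Imported lazily so this module loads even where PyFlink is not installed.
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars(*jars)  # register the connector and driver jars
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(TEST_NIFI_DDL)
    return t_env


def read_countries(t_env):
    """Scan public.test_nifi and collect the rows into a pandas DataFrame."""
    from pyflink.table.expressions import col

    return t_env.from_path("test_nifi").select(col("codecountry"), col("name")).to_pandas()
```

With PyFlink installed and PostgreSQL running, print(read_countries(build_table_env())) should print the table contents.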
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Dương Thái |
| Solution 2 | ChangLi |
