Flink JDBC sink into multiple schemas
I am using Flink's JdbcSink to push data into Postgres tables. The data has to be stored in different schemas of the same database, over a single connection.
DataStream<Book> stream = env.fromSource(...);
Each Book record in the stream has details about the schema it has to be stored in.
I tried to parameterize the database schema name in the PreparedStatement, but, as expected, that is not allowed: JDBC parameters can only bind values, not identifiers such as schema or table names.
stream.addSink(JdbcSink.sink(
        "insert into ?.books (id, title, authors, year) values (?, ?, ?, ?)",
        (statement, book) -> {
            statement.setString(1, book.schema);
            statement.setLong(2, book.id);
            statement.setString(3, book.title);
            statement.setString(4, book.authors);
            statement.setInt(5, book.year);
        },
        JdbcExecutionOptions.builder()
                .withBatchSize(1000)
                .withBatchIntervalMs(200)
                .withMaxRetries(5)
                .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
                .withDriverName("org.postgresql.Driver")
                .withUsername("someUser")
                .withPassword("somePassword")
                .build()
));
Is there a workaround for this? Or do I have to explicitly create a separate sink for each schema?
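One possible direction (a sketch, not something confirmed by Flink's API docs): since JDBC can only bind values, the schema name has to be baked into the SQL string itself, e.g. one sink per schema, with records routed to the matching sink by a filter or side output. The helper below builds the per-schema INSERT statement, validating the schema name against a whitelist so the string concatenation cannot become an injection vector. The schema names, class, and method here are hypothetical illustrations, not part of the Flink API.

```java
import java.util.Set;
import java.util.regex.Pattern;

class SchemaSql {
    // Hypothetical whitelist of schemas this job is allowed to write to.
    private static final Set<String> KNOWN_SCHEMAS = Set.of("library", "archive");

    // Conservative identifier pattern: lowercase letters, digits, underscores.
    private static final Pattern IDENTIFIER = Pattern.compile("[a-z_][a-z0-9_]*");

    /**
     * Builds the INSERT statement for one schema. Because JDBC cannot bind
     * identifiers, the (validated) schema name is concatenated into the SQL;
     * value columns still use regular ? placeholders.
     */
    static String buildInsertSql(String schema) {
        if (!KNOWN_SCHEMAS.contains(schema) || !IDENTIFIER.matcher(schema).matches()) {
            throw new IllegalArgumentException("unknown or invalid schema: " + schema);
        }
        return "insert into " + schema
                + ".books (id, title, authors, year) values (?, ?, ?, ?)";
    }
}
```

With something like this, each known schema would get its own `JdbcSink.sink(SchemaSql.buildInsertSql(schema), ...)`, and the statement builder would then start its value parameters at index 1 (id) rather than 2, since the schema is no longer a placeholder.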
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
