Apache Beam Slowly Changing Lookup Cache from JdbcIO
I have an Apache Beam pipeline that processes unbounded data and writes the results to MySQL. Along the way, it needs to look up the username for each user ID, so I pass a map of user ID to username into the pipeline as a side input.
Since we keep adding users, the side input needs to be refreshed periodically. I've gone through the side input patterns "Slowly updating global window side inputs" and "Slowly updating side input using windowing".
I lean towards the first, since new users are not added very often.
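Either pattern comes down to the same mechanic: on each tick, re-read the whole users table and replace the previous map wholesale, so lookups always see one complete snapshot. The plain-Java sketch below shows only that refresh-and-swap logic, with no Beam involved. RefreshingLookup, its constructor parameters and the injectable clock are hypothetical names chosen for illustration; this is not how Beam implements side inputs internally.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical helper, not a Beam class: keeps an immutable snapshot of the
// user-id -> username map and replaces the whole snapshot once it is older
// than the refresh interval.
final class RefreshingLookup {
    private final Supplier<Map<String, String>> loader; // e.g. runs the users query
    private final long refreshMillis;
    private final LongSupplier clockMillis;             // injectable so tests can fake time
    private Map<String, String> snapshot;               // null until the first load
    private long loadedAtMillis;

    RefreshingLookup(Supplier<Map<String, String>> loader, long refreshMillis,
                     LongSupplier clockMillis) {
        this.loader = loader;
        this.refreshMillis = refreshMillis;
        this.clockMillis = clockMillis;
    }

    // Returns the username for userId, or null if it is not in the current snapshot.
    synchronized String lookup(String userId) {
        long now = clockMillis.getAsLong();
        if (snapshot == null || now - loadedAtMillis >= refreshMillis) {
            // Copy and swap the whole map at once, so callers never see a partial update.
            snapshot = Collections.unmodifiableMap(new HashMap<>(loader.get()));
            loadedAtMillis = now;
        }
        return snapshot.get(userId);
    }
}
```

With a 30-second interval this mirrors the GenerateSequence.withRate(1, Duration.standardSeconds(30)) ticker in the pipeline: a user inserted into the table only becomes visible at the next reload, which is acceptable when new users are rare.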
Reading users from the database using JdbcIO.
final PCollection<KV<String, String>> userCollection =
    pipeline.apply("read-users-info", jdbcMgr.readUserInfo(userDsFn));
Reading data from MySQL
public PTransform<PBegin, PCollection<KV<String, String>>> readUserInfo(
    SerializableFunction<Void, DataSource> dataSourceProviderFn) {
  LOG.info("reading users");
  return JdbcIO.<KV<String, String>>read()
      .withDataSourceProviderFn(dataSourceProviderFn)
      .withQuery("select id, concat(first_name, ' ', last_name) from users")
      .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
      .withRowMapper(
          (JdbcIO.RowMapper<KV<String, String>>) rs -> KV.of(rs.getString(1), rs.getString(2)));
}
}
Updating the side input using the global window.
final PCollectionView<Map<String, String>> userMap =
    pipeline
        .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(30)))
        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(30))))
        .apply(Sum.longsGlobally().withoutDefaults())
        .apply(
            ParDo.of(
                new DoFn<Long, Map<String, String>>() {
                  @ProcessElement
                  public void process(
                      @Element Long input,
                      @Timestamp Instant timestamp,
                      OutputReceiver<PCollection<KV<String, String>>> o) {
                    o.output(userCollection);
                  }
                }))
        .apply(
            Window.<Map<String, String>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                .discardingFiredPanes())
        .apply(View.asSingleton());
I'm sure the problem is the o.output(userCollection) call. Can you please help me out here?
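The o.output(userCollection) call cannot work as written: userCollection is a PCollection, a handle on a dataset in the pipeline graph rather than a value a DoFn can emit, and the OutputReceiver<PCollection<KV<String, String>>> parameter also disagrees with the DoFn<Long, Map<String, String>> declaration. One way to fix it, assuming the refresh DoFn is allowed to open its own JDBC connection, is to stop using the JdbcIO read for the side input and run the query inside the DoFn on every tick: declare the parameter as OutputReceiver<Map<String, String>> o and call something like o.output(UserNameLoader.load(userDsFn.apply(null))), declaring or wrapping the checked SQLException as needed. UserNameLoader is a hypothetical helper, not part of Beam or JdbcIO; the sketch uses only java.sql and javax.sql and reuses the query and column mapping from readUserInfo().

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;

// Hypothetical helper: runs the same query as readUserInfo() with plain JDBC,
// so a DoFn can call it on every tick and emit the resulting Map.
final class UserNameLoader {
    static final String QUERY =
        "select id, concat(first_name, ' ', last_name) from users";

    static Map<String, String> load(DataSource dataSource) throws SQLException {
        Map<String, String> users = new HashMap<>();
        // try-with-resources closes the result set, statement and connection in reverse order.
        try (Connection conn = dataSource.getConnection();
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(QUERY)) {
            while (rs.next()) {
                // Same mapping as the RowMapper: column 1 = id, column 2 = full name.
                users.put(rs.getString(1), rs.getString(2));
            }
        }
        return users;
    }
}
```

Because load() opens and closes a connection on each call, it runs once per GenerateSequence tick (every 30 seconds in the pipeline above), not once per processed element.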
Thanks, Suresh
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
