How to store/aggregate correlated CDC events with Flink?

I have a Kafka topic where multiple CDC events arrive from a database. Suppose the following three tables implement a student-course n:n association:

STUDENT
COURSE
STUDENT_COURSE

I can have the following "business" events:

  1. A new student enrolls in a course: in this case I would receive 3 events on my Kafka topic. These events can arrive in any order, but I'd like to emit a single "business" event like this one: {"type": "enroll", "student": {"name": "Jhon", "age": ...}, "course": {"name": "physics", "teacher": "Foo", ...}}
  2. A student changes their course: in this case I would only receive 1 event on my Kafka topic (on STUDENT_COURSE), and I'd emit a "business" event like this one: {"type": "change", "student": {"name": "Jhon", "age": ...}, "newcourse": {"name": "maths", "teacher": "Foo", ...}}
  3. Updates to STUDENT information (e.g. email, phone, ...) or COURSE information (time, teacher, ...): 1 event on either table.

My issue is that I don't know how to store and correlate these CDC events to build a single business event. In practice I'd need to do something like this:

  1. Receive the event and store it in an "uncertain" state, then wait for a reasonable time, say 10 seconds
  2. If an event on another table is received in the meantime, I'm in case 1
  3. Otherwise I'm in case 2 or 3

Is there a way to obtain this behavior in Flink?
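The buffering strategy described above can be sketched as a plain-Python simulation. This is not Flink API code; in Flink, logic like this would usually be expressed with state and timers (for example in a `KeyedProcessFunction`), and the sketch only models the decision rules. Everything in it is an assumption for illustration: the names `CdcEvent`, `Correlator`, `on_event`, `advance` and `WAIT_SECONDS` are hypothetical, rows are assumed to carry an `id` column (STUDENT, COURSE) or `student_id`/`course_id` columns (STUDENT_COURSE), and timestamps are in seconds. A STUDENT_COURSE row becomes "enroll" if a STUDENT row for the same student is still pending, otherwise "change"; STUDENT or COURSE rows that no pending link claims become "update".

```python
from dataclasses import dataclass

WAIT_SECONDS = 10.0  # the "reasonable time" from the question (assumption)


@dataclass(eq=False)  # eq=False: events are compared by identity, not by value
class CdcEvent:
    table: str  # "STUDENT", "COURSE" or "STUDENT_COURSE"
    ts: float   # event time in seconds
    row: dict   # hypothetical columns: id / student_id / course_id / ...


class Correlator:
    """Keeps CDC events in an "uncertain" state until their wait time elapses."""

    def __init__(self, wait=WAIT_SECONDS):
        self.wait = wait
        self.pending = []   # events not yet turned into business events
        self.students = {}  # latest STUDENT row per id
        self.courses = {}   # latest COURSE row per id

    def on_event(self, ev):
        if ev.table == "STUDENT":
            self.students[ev.row["id"]] = ev.row
        elif ev.table == "COURSE":
            self.courses[ev.row["id"]] = ev.row
        self.pending.append(ev)

    def _find_pending(self, table, row_id):
        # First still-pending STUDENT/COURSE event with this primary key, or None
        return next((p for p in self.pending
                     if p.table == table and p.row["id"] == row_id), None)

    def _claimed_by_link(self, ev):
        # True if a pending STUDENT_COURSE row references this STUDENT/COURSE row
        key = "student_id" if ev.table == "STUDENT" else "course_id"
        return any(p.table == "STUDENT_COURSE" and p.row[key] == ev.row["id"]
                   for p in self.pending)

    def advance(self, now):
        """Fire every timer with ts + wait <= now and return the business events."""
        out = []
        for ev in sorted(self.pending, key=lambda e: e.ts):
            if ev not in self.pending or ev.ts + self.wait > now:
                continue  # already consumed by a link, or still uncertain
            if ev.table == "STUDENT_COURSE":
                sid, cid = ev.row["student_id"], ev.row["course_id"]
                student = self._find_pending("STUDENT", sid)
                course = self._find_pending("COURSE", cid)
                for p in (ev, student, course):
                    if p is not None:
                        self.pending.remove(p)
                if student is not None:  # a new STUDENT row arrived too: case 1
                    out.append({"type": "enroll", "student": self.students.get(sid),
                                "course": self.courses.get(cid)})
                else:  # only the association row changed: case 2
                    out.append({"type": "change", "student": self.students.get(sid),
                                "newcourse": self.courses.get(cid)})
            elif not self._claimed_by_link(ev):  # case 3: plain update
                self.pending.remove(ev)
                out.append({"type": "update", "table": ev.table, "row": ev.row})
        return out


c = Correlator()
c.on_event(CdcEvent("STUDENT_COURSE", 1.0, {"student_id": 7, "course_id": 3}))
c.on_event(CdcEvent("STUDENT", 2.0, {"id": 7, "name": "Jhon"}))
c.on_event(CdcEvent("COURSE", 0.5, {"id": 3, "name": "physics", "teacher": "Foo"}))
print(c.advance(5.0))   # [] : all three events are still "uncertain"
print(c.advance(12.0))  # a single "enroll" event combining the three rows
```

A weakness of this heuristic is worth keeping in mind: a genuine STUDENT update that lands within the wait window of a course change is misread as an enrollment, and every business event is delayed by the full wait time.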



Solution 1:[1]

It looks like you could start with a streaming SQL join on the 3 dynamic tables derived from these CDC streams. That join will produce an update stream along the lines of what you're looking for.
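To illustrate why a join fits, here is a toy in-memory model of an updating inner join STUDENT_COURSE JOIN STUDENT JOIN COURSE, again in plain Python rather than Flink. Because it keeps the latest row of each table as state, the three CDC events of an enrollment can arrive in any order: output appears once all sides are present, and a later change to any table re-emits the affected joined rows. The class name `EnrollmentJoin` and the `id`, `student_id` and `course_id` columns (including an `id` primary key on STUDENT_COURSE) are hypothetical. The sketch handles inserts and updates only; Flink's actual join also processes deletes and emits retraction messages, which this model omits.

```python
class EnrollmentJoin:
    """Toy incremental inner join STUDENT_COURSE x STUDENT x COURSE (upserts only)."""

    def __init__(self):
        self.students = {}  # latest STUDENT row per id
        self.courses = {}   # latest COURSE row per id
        self.links = {}     # latest STUDENT_COURSE row per id

    def apply(self, table, row):
        """Apply one CDC upsert and return the joined rows it (re)produces."""
        if table == "STUDENT":
            self.students[row["id"]] = row
            affected = [lk for lk in self.links.values() if lk["student_id"] == row["id"]]
        elif table == "COURSE":
            self.courses[row["id"]] = row
            affected = [lk for lk in self.links.values() if lk["course_id"] == row["id"]]
        elif table == "STUDENT_COURSE":
            self.links[row["id"]] = row
            affected = [row]
        else:
            raise ValueError(f"unknown table: {table}")
        out = []
        for link in affected:
            student = self.students.get(link["student_id"])
            course = self.courses.get(link["course_id"])
            if student is not None and course is not None:  # inner join: both sides needed
                out.append({"student": student, "course": course})
        return out


j = EnrollmentJoin()
j.apply("STUDENT_COURSE", {"id": 1, "student_id": 7, "course_id": 3})  # -> [] (nothing to join yet)
j.apply("STUDENT", {"id": 7, "name": "Jhon"})                           # -> [] (course still missing)
print(j.apply("COURSE", {"id": 3, "name": "physics", "teacher": "Foo"}))  # one joined row
```

Labelling each output row as "enroll" or "change" would still be a separate step on top of this update stream.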

Some of the examples in https://github.com/ververica/flink-sql-cookbook should provide inspiration for getting started. https://github.com/ververica/flink-sql-CDC and https://github.com/ververica/flink-cdc-connectors are also good resources.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 David Anderson