Message-batching Apache Beam pipeline triggers immediately instead of after the fixed window

I have an Apache Beam pipeline that I would like to read from a Google Pub/Sub topic, apply deduplication, and emit the messages to another Pub/Sub topic at the end of each 15-minute fixed window. I managed to get the deduplication working; however, the messages seem to be sent to the sink topic immediately instead of waiting for the end of the 15 minutes.

Even after applying Window.triggering(AfterWatermark.pastEndOfWindow()), it doesn't seem to work (i.e. messages are still emitted immediately). (Ref: https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/transforms/windowing/Window.html)

What is wrong with my code? Full code below:

Also, is it correct to assume that Deduplicate takes the fixed window as its bound, or do I need to set the time domain for deduplication separately? (Ref: https://beam.apache.org/releases/javadoc/2.21.0/org/apache/beam/sdk/transforms/Deduplicate.html seems to say that it defaults to the time domain, which would be the fixed window defined.)

pipeline
  .apply("Read from source topic", PubsubIO.<String>readStrings().fromTopic(SOURCE_TOPIC))
  .apply("De-duplication", Deduplicate.<String>values())
  .apply(windowDurationMins + " min Window",
      Window.<String>into(FixedWindows.of(Duration.standardMinutes(windowDurationMins)))
          .triggering(AfterWatermark.pastEndOfWindow())
          .withAllowedLateness(Duration.ZERO)
          .discardingFiredPanes()
  )
  .apply("Format to JSON", ParDo.of(new DataToJson()))
  .apply("Emit to sink topic",
      PubsubIO.writeStrings().to(SINK_TOPIC)
  );
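Regarding the Deduplicate question above: according to the Deduplicate javadoc, Deduplicate.values() by default deduplicates values for up to 10 minutes in the processing-time domain, and that bound can be changed with withDuration(...) and withTimeDomain(...). In the pipeline as posted, Deduplicate also runs before Window.into, i.e. while elements are still in the global window, so the 15-minute fixed window cannot act as its bound there. The plain-Java class below is only a hypothetical model of time-bounded deduplication (DedupModel and its methods are illustrative names, not Beam API): a value is dropped if the same value was first seen less than the configured duration earlier, and it is forgotten after that.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model (NOT Beam API) of time-bounded deduplication:
// a value is dropped if an identical value was first seen less than
// durationMillis earlier; once that bound has passed it is emitted again.
class DedupModel {
    private final long durationMillis;
    // value -> time (ms) at which it was last remembered
    private final Map<String, Long> firstSeen = new HashMap<>();

    DedupModel(long durationMillis) {
        this.durationMillis = durationMillis;
    }

    /** Returns true if the value should be emitted at time nowMillis. */
    boolean accept(String value, long nowMillis) {
        Long seen = firstSeen.get(value);
        if (seen != null && nowMillis - seen < durationMillis) {
            return false; // duplicate within the bound: drop it
        }
        firstSeen.put(value, nowMillis); // new or expired: remember and emit
        return true;
    }

    /** Runs the model over parallel arrays of values and arrival times. */
    static List<String> filter(DedupModel model, String[] values, long[] times) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            if (model.accept(values[i], times[i])) {
                out.add(values[i]);
            }
        }
        return out;
    }
}
```

With a 10-minute bound (600,000 ms), a value repeated 5 minutes later is dropped, while the same value arriving at least 10 minutes after it was remembered is emitted again.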

[Update] I tried the following, but nothing seemed to work:

  • Removed deduplication
  • Changed to .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
  • Read from the topic with a timestamp attribute: PubsubIO.<String>readStrings().fromTopic(SOURCE_TOPIC).withTimestampAttribute("publishTime")

Windowing seems to require a timestamp associated with each data element. However, .withTimestampAttribute("publishTime") on PubsubIO didn't seem to work. Is there something else I could try to attach a timestamp to my data for windowing?

[Update 2] I tried manually attaching a timestamp, following this ref (https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements), as shown below, but it STILL doesn't work:

.apply("Add timestamp", ParDo.of(new ApplyTimestamp()))

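import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ApplyTimestamp extends DoFn<String, String> {
    private static final Logger LOG = LoggerFactory.getLogger(ApplyTimestamp.class);

    @ProcessElement
    public void addTimestamp(ProcessContext context) {
        try {
            String data = context.element();
            // Stamps each element with the worker's current wall-clock time
            Instant timestamp = new Instant();
            context.outputWithTimestamp(data, timestamp);
        } catch (Exception e) {
            LOG.error("Error timestamping", e);
        }
    }
}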
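Element timestamps matter because they choose the window: for FixedWindows without an offset, an element with timestamp t lands in the window starting at t minus (t mod size). Two points may explain why timestamping alone changed nothing: new Instant() in ApplyTimestamp stamps each element with the worker's current wall-clock time, and, per the PubsubIO javadoc, elements read without withTimestampAttribute already carry the Pub/Sub publish time as their timestamp (withTimestampAttribute("publishTime") instead looks for a message attribute named publishTime). Assigning a window does not by itself delay any output. The stdlib-only sketch below models just the window arithmetic; WindowModel is a hypothetical name, not Beam API.

```java
import java.time.Instant;

// Hypothetical model (NOT Beam API) of how an unoffset fixed window is
// chosen from an element's event timestamp.
class WindowModel {
    /** Start (inclusive) of the fixed window of sizeMillis containing tsMillis. */
    static long windowStart(long tsMillis, long sizeMillis) {
        // floorMod keeps the result correct for timestamps before the epoch
        return tsMillis - Math.floorMod(tsMillis, sizeMillis);
    }

    /** End (exclusive) of that window. */
    static long windowEnd(long tsMillis, long sizeMillis) {
        return windowStart(tsMillis, sizeMillis) + sizeMillis;
    }

    /** Human-readable [start, end) interval for a timestamp. */
    static String describe(Instant ts, long sizeMillis) {
        long start = windowStart(ts.toEpochMilli(), sizeMillis);
        return "[" + Instant.ofEpochMilli(start) + ", "
                + Instant.ofEpochMilli(start + sizeMillis) + ")";
    }
}
```

For example, with 15-minute windows (900,000 ms) an element stamped 10:07:30 UTC belongs to the window [10:00:00, 10:15:00).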

At this point I feel like I'm about to go crazy lol...



Solution 1:[1]

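A GroupByKey (GBK) is needed between the windowing, applied immediately after reading from the source, and the deduplication logic. Window.into only assigns elements to windows; the windows take effect at the next GroupByKey, including one inside a composite transform. A GBK groups elements by the combination of key and window, and it is there that the trigger decides when each group is emitted. Without a GBK after the windowing, elements simply flow through to the sink as they arrive, which is why the messages appear immediately. Also note that the default trigger already fires with AfterWatermark and zero allowed lateness, so the explicit trigger settings in the question do not change this behavior.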
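A toy model of the behavior described above: buffering happens only at a GroupByKey, which groups by (key, window), and under an AfterWatermark trigger a group is emitted once the watermark passes the end of its window. GbkModel below is hypothetical and heavily simplified (one watermark, no late data, a single on-time firing per window); it is not Beam API. In the real pipeline, one option to try (an untested suggestion, not part of the original answer) is to apply Window.into(...) right after PubsubIO and then Distinct.<String>create(), which deduplicates within each window through an internal Combine.perKey and therefore a GroupByKey.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model (NOT Beam API): elements are buffered per
// (window, key) and a group is emitted only once the watermark reaches the
// end of its window, mimicking a GroupByKey under an AfterWatermark trigger.
class GbkModel {
    private final long sizeMillis;
    // window start -> key -> buffered values (TreeMaps keep output ordered)
    private final TreeMap<Long, TreeMap<String, List<String>>> buffer = new TreeMap<>();

    GbkModel(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    /** Buffers an element in its fixed window; nothing is emitted here. */
    void add(String key, String value, long tsMillis) {
        long start = tsMillis - Math.floorMod(tsMillis, sizeMillis);
        buffer.computeIfAbsent(start, w -> new TreeMap<>())
              .computeIfAbsent(key, k -> new ArrayList<>())
              .add(value);
    }

    /** Advances the watermark and returns every group whose window has ended. */
    List<String> advanceWatermark(long watermarkMillis) {
        List<String> fired = new ArrayList<>();
        // A window [start, start + size) is complete once the watermark reaches its end
        while (!buffer.isEmpty() && buffer.firstKey() + sizeMillis <= watermarkMillis) {
            Map.Entry<Long, TreeMap<String, List<String>>> window = buffer.pollFirstEntry();
            for (Map.Entry<String, List<String>> group : window.getValue().entrySet()) {
                fired.add(window.getKey() + ":" + group.getKey() + "=" + group.getValue());
            }
        }
        return fired;
    }
}
```

An element stamped 1 minute into the first 15-minute window stays buffered until advanceWatermark reaches the 15-minute mark, which is the "wait until the end of the window" behavior the question expected.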

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1