Why does my Beam program fail on the Direct Runner but succeed as a Dataflow job?

var stream = pipeline // Extract - Transform - Load, Lather Rinse Repeat
    .apply("Extract from Source",   PubsubIO.readStrings().fromSubscription(inputSubscription).withTimestampAttribute("date"))
    .apply("Transform to TableRow", ParDo.of(new TransformToTableRow()))
    .apply("Define Window",         Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1)))
                                        .withAllowedLateness(Duration.standardMinutes(10))
                                        .accumulatingFiredPanes())
    .apply("Transform to JSON",     MapElements.into(TypeDescriptors.strings()).via(GenericJson::toString));

stream.apply("Load to PubSub",      PubsubIO.writeStrings().to(sinkTopic));

stream.apply("Load to GCS",     TextIO.write().to(sinkUri).withWindowedWrites().withNumShards(1));

(1) When I run this code on Google Dataflow via a Flex Template, it works fine. But when I run it locally via the Direct Runner, I get

Exception in thread "main" java.lang.IllegalArgumentException: PubSub message is missing a value for timestamp attribute date

I can verify the source data looks like this:

INFO: pubsub message = {"date":"2022-03-28T19:24:52.836Z","sequence":251,"whoami":"eric.kolotyluk"}

All the Pub/Sub messages have the same shape; they are generated dynamically by the same data producer from the same data.
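One detail worth spelling out when comparing the log line above with the error: `withTimestampAttribute("date")` tells `PubsubIO` to read a Pub/Sub message *attribute* named `date`, which is a key/value map carried alongside the message body, not a field inside the JSON payload. A minimal standard-library sketch of that distinction (the `payload` and `attributes` names here are illustrative, not the Beam API):

```java
import java.util.HashMap;
import java.util.Map;

public class AttributeVsPayload {
    public static void main(String[] args) {
        // The JSON body seen in the log -- it contains a "date" field in the *payload*.
        String payload =
            "{\"date\":\"2022-03-28T19:24:52.836Z\",\"sequence\":251,\"whoami\":\"eric.kolotyluk\"}";

        // Pub/Sub message attributes are a separate key/value map, distinct from the payload.
        Map<String, String> attributes = new HashMap<>();

        // withTimestampAttribute("date") looks here, not in the payload:
        System.out.println("attribute present? " + attributes.containsKey("date")); // prints false

        // The publisher must set the attribute explicitly for the timestamp lookup to succeed.
        attributes.put("date", "2022-03-28T19:24:52.836Z");
        System.out.println("attribute present? " + attributes.containsKey("date")); // prints true
    }
}
```

So a payload whose JSON contains `"date"` can still trigger "missing a value for timestamp attribute date" if the publisher never sets the attribute itself.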

(2) Also, under the Direct Runner there is no output to Google Cloud Storage, while that works fine under Dataflow. On the other hand, the Pub/Sub output works as expected under the Direct Runner. 🤔

What is different between the Direct Runner and a Dataflow job in GCP that would cause these discrepancies?

From my build.gradle file

runtimeOnly 'org.apache.beam:beam-runners-direct-java:2.37.0'
runtimeOnly 'org.apache.beam:beam-runners-google-cloud-dataflow-java:2.37.0'
runtimeOnly 'org.apache.beam:beam-runners-portability-java:2.37.0'


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
