Apache Beam io.ReadFromPubSub(topic=input_topic) doesn't work with SparkRunner

I tried to run the simple Beam pipeline below on a Spark cluster (GCP Dataproc):

import argparse

from apache_beam import (
    CombinePerKey,
    DoFn,
    FlatMap,
    GroupByKey,
    ParDo,
    Pipeline,
    PTransform,
    WindowInto,
    WithKeys,
    io,
)
from apache_beam.options.pipeline_options import PipelineOptions

class WriteToGCS(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        shard_id, batch = key_value
        filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body, publish_time in batch:
                print(">>>>>>>>>>", message_body, publish_time, "===========")
                f.write(f"{message_body},{publish_time}\n".encode("utf-8"))


def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
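    # Note: window_size and num_shards are accepted but unused in this
    # trimmed-down repro; the windowing step and the GCS write (commented
    # out below) are not wired up.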
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(pipeline_args, streaming=True, save_main_session=True)

    with Pipeline(options=pipeline_options) as p:
        # Read from PubSub into a PCollection.
        lines = p | io.ReadStringsFromPubSub(topic=input_topic)
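        # Note: ReadStringsFromPubSub is a deprecated wrapper around
        # io.ReadFromPubSub (the stack trace below shows the expanded
        # "ReadStringsFromPubSub/ReadFromPubSub" name); io.ReadFromPubSub
        # fails the same way.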

        # lines | "Write to GCS" >> ParDo(WriteToGCS(output_path))


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from." '"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="Path of the output GCS file including the prefix.",
    )
    parser.add_argument(
        "--num_shards",
        type=int,
        default=5,
        help="Number of shards to use when writing windowed elements to GCS.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        known_args.num_shards,
        pipeline_args
    )

Locally, with the DirectRunner, it works fine: I can see the messages being consumed from Pub/Sub and written to GCS without any issue.
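
For reference, the local run is just the script invoked directly (the script name, topic, and bucket below are placeholders):

python pubsub_to_gcs.py \
    --runner=DirectRunner \
    --input_topic=projects/<PROJECT_ID>/topics/<TOPIC_ID> \
    --output_path=gs://<BUCKET>/samples/output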

But when I followed these instructions, Running on Dataproc cluster (YARN backed), and submitted the job to the remote Spark cluster (the submission commands are sketched after the error output), I kept getting the error below:

[2022-05-07 11:56:45.801]Container exited with a non-zero exit code 13. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :
Last 4096 bytes of stderr :
orImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:732)
Caused by: java.lang.IllegalArgumentException: PCollectionNodes [PCollectionNode{id=ref_PCollection_PCollection_1, PCollection=unique_name: "42_ReadStringsFromPubSub/ReadFromPubSub/Read.None"
coder_id: "ref_Coder_BytesCoder_1"
is_bounded: UNBOUNDED
windowing_strategy_id: "ref_Windowing_Windowing_1"
}] were consumed but never produced
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)
  at org.apache.beam.runners.core.construction.graph.QueryablePipeline.buildNetwork(QueryablePipeline.java:234)
  at org.apache.beam.runners.core.construction.graph.QueryablePipeline.<init>(QueryablePipeline.java:127)
  at org.apache.beam.runners.core.construction.graph.QueryablePipeline.forPrimitivesIn(QueryablePipeline.java:90)
  at org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.<init>(GreedyPipelineFuser.java:70)
  at org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.fuse(GreedyPipelineFuser.java:93)
  at org.apache.beam.runners.spark.SparkPipelineRunner.run(SparkPipelineRunner.java:114)
  at org.apache.beam.runners.spark.SparkPipelineRunner.main(SparkPipelineRunner.java:263)
  ... 5 more
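
For reference, I built and submitted the job roughly as those instructions describe: first compile the pipeline into a self-contained portable jar, then hand that jar to Dataproc (the script name, cluster name, region, and paths are placeholders; the --class value matches the SparkPipelineRunner.main entry in the trace above):

python pubsub_to_gcs.py \
    --runner=SparkRunner \
    --input_topic=projects/<PROJECT_ID>/topics/<TOPIC_ID> \
    --output_path=gs://<BUCKET>/samples/output \
    --environment_type=DOCKER \
    --output_executable_path=job.jar

gcloud dataproc jobs submit spark \
    --cluster=<CLUSTER_NAME> \
    --region=<REGION> \
    --class=org.apache.beam.runners.spark.SparkPipelineRunner \
    --jars=job.jar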

P.S.:

  1. I followed the same instructions, Running on Dataproc cluster (YARN backed), to run the apache_beam.examples.wordcount example, and it works fine.
  2. apache-beam = {extras = ["gcp"], version = "~2.38.0"}
  3. It seems someone has faced exactly the same error with the FlinkRunner as well: Error while running beam streaming pipeline (Python) with pub/sub io in embedded Flinkrunner (apache_beam [GCP])

What could be the issue, or what am I missing?


