Apache Beam io.ReadFromPubSub(topic=input_topic) doesn't work with SparkRunner
I tried to run the simple Beam pipeline below on a Spark cluster (GCP Dataproc):
```python
import argparse

from apache_beam import (
    CombinePerKey,
    DoFn,
    FlatMap,
    GroupByKey,
    ParDo,
    Pipeline,
    PTransform,
    WindowInto,
    WithKeys,
    io,
)
from apache_beam.options.pipeline_options import PipelineOptions


class WriteToGCS(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""
        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        shard_id, batch = key_value
        filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])
        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body, publish_time in batch:
                print(">>>>>>>>>>", message_body, publish_time, "===========")
                f.write(f"{message_body},{publish_time}\n".encode("utf-8"))


def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(pipeline_args, streaming=True, save_main_session=True)
    with Pipeline(options=pipeline_options) as p:
        # Read from Pub/Sub into a PCollection.
        lines = p | io.ReadStringsFromPubSub(topic=input_topic)
        # lines | "Write to GCS" >> ParDo(WriteToGCS(output_path))


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help='The Cloud Pub/Sub topic to read from, e.g. "projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="Path of the output GCS file including the prefix.",
    )
    parser.add_argument(
        "--num_shards",
        type=int,
        default=5,
        help="Number of shards to use when writing windowed elements to GCS.",
    )
    known_args, pipeline_args = parser.parse_known_args()
    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        known_args.num_shards,
        pipeline_args,
    )
```
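As an aside, the windowed filename that `WriteToGCS` builds is just the output prefix, the window bounds in `HH:MM` format, and the shard id joined with dashes. A stdlib-only sketch of that logic (`make_filename` is a hypothetical helper for illustration, not part of the pipeline):

```python
from datetime import datetime, timezone

def make_filename(output_path, window_start, window_end, shard_id):
    # Mirrors the "-".join(...) in WriteToGCS.process: prefix-HH:MM-HH:MM-shard.
    ts_format = "%H:%M"
    start = window_start.strftime(ts_format)
    end = window_end.strftime(ts_format)
    return "-".join([output_path, start, end, str(shard_id)])

start = datetime(2022, 5, 7, 11, 55, tzinfo=timezone.utc)
end = datetime(2022, 5, 7, 11, 56, tzinfo=timezone.utc)
print(make_filename("gs://my-bucket/out", start, end, 3))
# → gs://my-bucket/out-11:55-11:56-3
```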
On my local machine with the DirectRunner it works fine: I can see the messages being consumed from Pub/Sub and written to GCS without any issue.
But when I followed this instruction: Running on Dataproc cluster (YARN backed), and submitted the job to the remote Spark cluster, I kept getting the error below:
```
[2022-05-07 11:56:45.801]Container exited with a non-zero exit code 13. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :
Last 4096 bytes of stderr :
orImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:732)
Caused by: java.lang.IllegalArgumentException: PCollectionNodes [PCollectionNode{id=ref_PCollection_PCollection_1, PCollection=unique_name: "42_ReadStringsFromPubSub/ReadFromPubSub/Read.None"
coder_id: "ref_Coder_BytesCoder_1"
is_bounded: UNBOUNDED
windowing_strategy_id: "ref_Windowing_Windowing_1"
}] were consumed but never produced
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)
	at org.apache.beam.runners.core.construction.graph.QueryablePipeline.buildNetwork(QueryablePipeline.java:234)
	at org.apache.beam.runners.core.construction.graph.QueryablePipeline.<init>(QueryablePipeline.java:127)
	at org.apache.beam.runners.core.construction.graph.QueryablePipeline.forPrimitivesIn(QueryablePipeline.java:90)
	at org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.<init>(GreedyPipelineFuser.java:70)
	at org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.fuse(GreedyPipelineFuser.java:93)
	at org.apache.beam.runners.spark.SparkPipelineRunner.run(SparkPipelineRunner.java:114)
	at org.apache.beam.runners.spark.SparkPipelineRunner.main(SparkPipelineRunner.java:263)
	... 5 more
```
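For completeness, these are roughly the extra pipeline options I pass when submitting to the Spark cluster per that instruction. The job endpoint and environment type reflect my own setup and are only assumptions here, not universal values:

```python
# Options passed alongside the script's own flags when targeting Spark.
# The job endpoint below is assumed from my setup; adjust to wherever
# your Beam Spark job server actually runs.
spark_pipeline_args = [
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",  # Beam Spark job server endpoint (assumed)
    "--environment_type=DOCKER",
]
print(spark_pipeline_args)
```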
P.S.:
- I followed the same instruction (Running on Dataproc cluster (YARN backed)) to run the apache_beam.examples.wordcount example, and it works fine. My dependency is `apache-beam = {extras = ["gcp"], version = "~2.38.0"}`.
- It seems someone hit exactly the same error with the FlinkRunner as well: Error while running beam streaming pipeline (Python) with pub/sub io in embedded Flinkrunner (apache_beam [GCP])
What could be the issue, or what am I missing?
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow