Streaming Flink job on Dataproc throws gRPC error from the worker

My streaming Flink job (reading from a Pub/Sub source) repeatedly throws this error on the worker:

Traceback (most recent call last):
  File "test.py", line 175, in <module>
    run(
  File "test.py", line 139, in run
    pub_sub_data = ( pipeline | "Read from Pub/Sub" >> pubsub.ReadFromPubSub(topic=input_topic))
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 1090, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 614, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/pipeline.py", line 662, in apply
    return self.apply(transform, pvalueish)
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/pipeline.py", line 708, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 185, in apply
    return m(transform, input, options)
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 215, in apply_PTransform
    return transform.expand(input)
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/io/external/gcp/pubsub.py", line 98, in expand
    pcoll = pbegin.apply(
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/pvalue.py", line 134, in apply
    return self.pipeline.apply(*arglist, **kwargs)
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/pipeline.py", line 708, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 185, in apply
    return m(transform, input, options)
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 215, in apply_PTransform
    return transform.expand(input)
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/transforms/external.py", line 473, in expand
    response = service.Expand(request)
  File "/opt/conda/default/lib/python3.8/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/opt/conda/default/lib/python3.8/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "failed to connect to all addresses"
    debug_error_string = "{"created":"@1651418111.458421765","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3128,"referenced_errors":[{"created":"@1651418111.458419596","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":163,"grpc_status":14}]}"
>

I am using the apache_beam.io.external.gcp.pubsub.ReadFromPubSub transform to read from the Pub/Sub topic.

  • Python 3.8
  • Apache Beam (GCP) 2.34.0
  • Flink 1.12
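
The traceback fails inside service.Expand(request): the Python SDK is calling out to a cross-language expansion service over gRPC, and StatusCode.UNAVAILABLE with "failed to connect to all addresses" means that TCP connection could not be established at all, i.e. nothing was listening at the endpoint the SDK tried. A minimal plain-Python sketch of that failure mode (plain sockets standing in for gRPC; the port handling here is illustrative, not Beam's actual endpoint logic):

```python
import socket

def endpoint_reachable(host: str, port: int, timeout: float = 1.0) -> bool:
    """True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # gRPC surfaces this same condition as StatusCode.UNAVAILABLE /
        # "failed to connect to all addresses".
        return False

# With a live listener the check passes; with none, it fails — which is
# the state the Expand() call found the expansion service in.
server = socket.socket()
server.bind(("127.0.0.1", 0))  # OS-assigned free port
server.listen(1)
host, port = server.getsockname()
print(endpoint_reachable(host, port))  # prints True while the listener is up
server.close()
```

So the question is really why no expansion service is reachable from the worker when the external transform expands.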

Code:

    from apache_beam import Pipeline
    from apache_beam.io.external.gcp import pubsub
    from apache_beam.options.pipeline_options import PipelineOptions

    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, checkpointing_interval=1000, save_main_session=True
    )

    with Pipeline(options=pipeline_options) as pipeline:
        pub_sub_data = pipeline | "Read from Pub/Sub" >> pubsub.ReadFromPubSub(topic=input_topic)

I also tried apache_beam.io.ReadFromPubSub to read from the Pub/Sub topic, and below is the error I get:

DEBUG:root:java.lang.IllegalArgumentException: PCollectionNodes [PCollectionNode{id=ref_PCollection_PCollection_1, PCollection=unique_name: "22Read from Pub/Sub/Read.None"
coder_id: "ref_Coder_BytesCoder_1"
is_bounded: UNBOUNDED
windowing_strategy_id: "ref_Windowing_Windowing_1"
}] were consumed but never produced
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)
    at org.apache.beam.runners.core.construction.graph.QueryablePipeline.buildNetwork(QueryablePipeline.java:234)
    at org.apache.beam.runners.core.construction.graph.QueryablePipeline.<init>(QueryablePipeline.java:127)
    at org.apache.beam.runners.core.construction.graph.QueryablePipeline.forPrimitivesIn(QueryablePipeline.java:90)
    at org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.<init>(GreedyPipelineFuser.java:70)
    at org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.fuse(GreedyPipelineFuser.java:93)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:112)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:85)
    at org.apache.beam.runners.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:86)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

ERROR:root:java.lang.IllegalArgumentException: PCollectionNodes [PCollectionNode{id=ref_PCollection_PCollection_1, PCollection=unique_name: "22Read from Pub/Sub/Read.None"
coder_id: "ref_Coder_BytesCoder_1"
is_bounded: UNBOUNDED
windowing_strategy_id: "ref_Windowing_Windowing_1"
}] were consumed but never produced
INFO:apache_beam.runners.portability.portable_runner:Job state changed to FAILED
Traceback (most recent call last):
  File "test.py", line 175, in <module>
    run(
  File "test.py", line 144, in run
    _ = main_error | "Transformation Errors to GCS" >> ParDo(WriteToGCS(output_path))
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/pipeline.py", line 597, in __exit__
    self.result.wait_until_finish()
  File "/home/ravi/.local/lib/python3.8/site-packages/apache_beam/runners/portability/portable_runner.py", line 600, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline BeamApp-ravi-0501153516-4f843e9f_2e7c1bb8-7ac7-4adc-a8f4-fa9f0f97b770 failed in state FAILED: java.lang.IllegalArgumentException: PCollectionNodes [PCollectionNode{id=ref_PCollection_PCollection_1, PCollection=unique_name: "22Read from Pub/Sub/Read.None"
coder_id: "ref_Coder_BytesCoder_1"
is_bounded: UNBOUNDED
windowing_strategy_id: "ref_Windowing_Windowing_1"
}] were consumed but never produced
DEBUG:root:Sending SIGINT to job_server
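
My understanding of that second failure: apache_beam.io.ReadFromPubSub is a native transform that (as far as I can tell) only the Dataflow runner translates, so when the portable Flink runner fuses the pipeline, the Read's output PCollection is still consumed downstream but no translated transform produces it, and the graph validation rejects the pipeline. The check itself amounts to a set difference, sketched here with toy node ids (not Beam's real graph API):

```python
def find_orphan_inputs(edges):
    """edges: list of (inputs, outputs) pairs, one per transform.
    Returns PCollection ids that are consumed but never produced."""
    produced = {out for _, outs in edges for out in outs}
    consumed = {inp for inps, _ in edges for inp in inps}
    return sorted(consumed - produced)

# Toy pipeline: the untranslatable native Read contributes no output,
# but a downstream transform still references its PCollection id.
edges = [
    ([], []),                          # native Read: nothing produced
    (["ref_PCollection_1"], ["pc2"]),  # downstream step still consumes it
]
print(find_orphan_inputs(edges))  # → ['ref_PCollection_1']
```

That matches the "PCollectionNodes [...] were consumed but never produced" wording in the log above.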



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow