Beam Java pipeline with Python transform in Google Cloud Dataflow Runner v2

I'm trying to create a multi-language pipeline on Google Cloud Dataflow Runner v2: the pipeline itself is written with the Apache Beam Java SDK and uses a transform written with the Python SDK. My expansion service in Python starts correctly, and when I run my Java pipeline it does reach my Python transform, but then it fails with this error:

Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project Job: An exception occured while executing the Java class. UNIMPLEMENTED: Method not found!
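
For context, UNIMPLEMENTED is a standard gRPC status: the call did reach the expansion service's address, but the server reported that it does not implement the RPC method that was invoked ("Method not found!" is gRPC's generic detail string for an unknown method). Here is a minimal probe I can run against the service, separate from the Java pipeline (a diagnostic sketch only; it assumes the service listens on localhost:9098 and that the client uses the same apache-beam version as the service):

import grpc

from apache_beam.portability.api import beam_expansion_api_pb2
from apache_beam.portability.api import beam_expansion_api_pb2_grpc

channel = grpc.insecure_channel('localhost:9098')
stub = beam_expansion_api_pb2_grpc.ExpansionServiceStub(channel)
try:
  # An empty request is not a valid expansion, but if the channel and the
  # Expand method are wired up, the failure comes back inside the response
  # (or as a different status) rather than as UNIMPLEMENTED.
  response = stub.Expand(beam_expansion_api_pb2.ExpansionRequest())
  print('Expand reachable; error field:', response.error)
except grpc.RpcError as rpc_error:
  print('RPC failed:', rpc_error.code(), rpc_error.details())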

This is my expansion service with my transform in Python:

import argparse
import logging
import signal
import sys

import grpc

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.portability.api import beam_expansion_api_pb2_grpc
from apache_beam.runners.portability import expansion_service
from apache_beam.transforms import ptransform
from apache_beam.utils import thread_pool_executor

_LOGGER = logging.getLogger(__name__)

URN = "beam:transforms:xlang:pythontransform3"

@ptransform.PTransform.register_urn(URN, None)
class PythonTransform(ptransform.PTransform):
    def expand(self, pcoll):
        _LOGGER.info('Python transform reached')
        # WriteToText is a PTransform, so it has to be applied to the
        # PCollection here; calling it inside a DoFn's process() would only
        # construct the transform object and silently discard it.
        return (pcoll
                | "Python transform" >> beam.io.WriteToText(
                    "gs://path/to/wordcount.txt"))

    def to_runner_api_parameter(self, unused_context):
        return URN, None

    @staticmethod
    def from_runner_api_parameter(
        unused_ptransform, unused_parameter, unused_context):
        return PythonTransform()

server = None

def cleanup(unused_signum, unused_frame):
  _LOGGER.info('Shutting down expansion service.')
  server.stop(None)

def main(unused_argv):
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '-p', '--port', type=int, help='port on which to serve the expansion service')
  options = parser.parse_args()
  global server
  server = grpc.server(thread_pool_executor.shared_unbounded_instance())
  beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
      expansion_service.ExpansionServiceServicer(
          PipelineOptions(["--experiments", "beam_fn_api", "--sdk_location", "container"])),
      server)
  server.add_insecure_port('localhost:{}'.format(options.port))
  server.start()
  _LOGGER.info('Listening for expansion requests at %d', options.port)

  signal.signal(signal.SIGTERM, cleanup)
  signal.signal(signal.SIGINT, cleanup)
  # blocking main thread forever.
  signal.pause()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  main(sys.argv)
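
I start the expansion service like this (expansion_service.py is just the name I saved the file above under), on the same port the Java pipeline points at:

python expansion_service.py --port 9098

which should log 'Listening for expansion requests at 9098'.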

This is my pipeline in Java:

import org.apache.beam.runners.core.construction.External;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class WordCount {

  static class ExtractWordsFn extends DoFn<String, String> {
    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
    private final Distribution lineLenDist =
        Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> receiver) {
      lineLenDist.update(element.length());
      if (element.trim().isEmpty()) {
        emptyLines.inc();
      }

      // Split the line into words.
      String[] words = element.split("[^\\p{L}]+", -1);

      // Output each word encountered into the output PCollection.
      for (String word : words) {
        if (!word.isEmpty()) {
          receiver.output(word);
        }
      }
    }
  }

  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<String>> {
    @Override
    public PCollection<String> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      return words;
    }
  }

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  static void runWordCount(WordCountOptions options) {
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
        .apply(new CountWords())
        .apply("ExternalPythonTransform",
                External.of("beam:transforms:xlang:pythontransform3", new byte[] {}, "localhost:9098"));

    p.run().waitUntilFinish();
  }

  public static void main(String[] args) {
    WordCountOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);

    runWordCount(options);
  }
}
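
And this is roughly how I launch the Java pipeline (a sketch of the standard Maven invocation; the project, region, and bucket values are placeholders, and since multi-language pipelines on Dataflow require Runner v2, the use_runner_v2 experiment is set):

mvn compile exec:java -Dexec.mainClass=WordCount \
    -Dexec.args="--runner=DataflowRunner --project=<my-project> --region=<my-region> \
    --gcpTempLocation=gs://<my-bucket>/temp --experiments=use_runner_v2"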

