Beam Java pipeline with Python transform in Google Cloud Dataflow v2
I'm trying to create a multi-language pipeline in Google Cloud Dataflow v2, building the pipeline with the Apache Beam Java SDK and using a transform written with the Python SDK. My expansion service in Python runs correctly, and when I run my Java pipeline it does reach my Python transform, but then it throws the following error.
Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project Job: An exception occured while executing the Java class. UNIMPLEMENTED: Method not found!
This is my expansion service with my transform in Python
import argparse, logging, signal, sys, grpc
import apache_beam as beam
from apache_beam.pipeline import PipelineOptions
from apache_beam.portability.api import beam_expansion_api_pb2_grpc
from apache_beam.runners.portability import expansion_service
from apache_beam.transforms import ptransform
from apache_beam.utils import thread_pool_executor
# Module-level logger for the expansion service.
_LOGGER = logging.getLogger(__name__)

# URN under which the cross-language transform is registered; the Java
# pipeline must pass this exact string to External.of(...).
URN = "beam:transforms:xlang:pythontransform3"
class WriteToGS(beam.DoFn):
    # NOTE(review): this DoFn never writes anything. beam.io.WriteToText is a
    # PTransform; constructing it inside process() only creates an object that
    # is immediately discarded — it is never applied to a PCollection, so no
    # file is produced and no output elements are emitted. The write should
    # instead be applied as a transform in PythonTransform.expand, e.g.
    # `pcoll | beam.io.WriteToText("gs://path/to/wordcount")`.
    def process(self, element):
        beam.io.WriteToText("gs://path/to/wordcount.txt")
@ptransform.PTransform.register_urn(URN, None)
class PythonTransform(ptransform.PTransform):
    """Cross-language transform that writes its input PCollection to GCS.

    Registered under ``URN`` (no payload) so the Java pipeline can apply it
    via ``External.of(URN, new byte[] {}, <expansion-service-address>)``.
    """

    def __init__(self):
        super().__init__()

    def expand(self, pcoll):
        _LOGGER.info('Python transform reached')
        # Bug fix: the original applied ParDo(WriteToGS()), whose process()
        # merely constructed — and discarded — a WriteToText transform, so
        # nothing was ever written. WriteToText is a PTransform and must be
        # applied to the PCollection directly.
        # Bug fix: expand() must RETURN its result; the original returned
        # None, which makes the transform's output unusable downstream.
        return (
            pcoll
            | "Python transform" >> beam.io.WriteToText(
                "gs://path/to/wordcount.txt"))

    def to_runner_api_parameter(self, unused_context):
        # No payload: the transform is fully identified by its URN.
        return URN, None

    @staticmethod
    def from_runner_api_parameter(
        unused_ptransform, unused_parameter, unused_context):
        # Reconstructs the transform on the expansion-service side.
        return PythonTransform()
# Global handle to the gRPC server so the signal handler can stop it.
server = None


def cleanup(unused_signum, unused_frame):
    """Signal handler: shut the expansion service down gracefully."""
    _LOGGER.info('Shutting down expansion service.')
    grace_period = None  # stop immediately; do not wait for pending RPCs
    server.stop(grace_period)
def main(unused_argv):
    """Start the gRPC expansion service and block until a signal arrives.

    Args:
      unused_argv: process argv; ignored (argparse reads sys.argv itself).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-p',
        '--port',
        type=int,
        # Bug fix: without required=True, omitting --port leaves options.port
        # as None and the service crashes later with a cryptic error inside
        # add_insecure_port instead of a clear usage message.
        required=True,
        help='port on which to serve the job api')
    options = parser.parse_args()

    global server
    server = grpc.server(thread_pool_executor.shared_unbounded_instance())
    beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
        expansion_service.ExpansionServiceServicer(
            PipelineOptions(
                ["--experiments", "beam_fn_api",
                 "--sdk_location", "container"])),
        server)
    server.add_insecure_port('localhost:{}'.format(options.port))

    # Bug fix: register the handlers BEFORE starting the server so a signal
    # delivered immediately after startup is not missed.
    signal.signal(signal.SIGTERM, cleanup)
    signal.signal(signal.SIGINT, cleanup)

    server.start()
    _LOGGER.info('Listening for expansion requests at %d', options.port)

    # Block the main thread forever; cleanup() stops the server on
    # SIGTERM/SIGINT. NOTE(review): signal.pause() is POSIX-only.
    signal.pause()
if __name__ == '__main__':
    # Entry point: enable INFO logging so the 'Python transform reached'
    # message from expand() is visible, then start serving.
    logging.getLogger().setLevel(logging.INFO)
    main(sys.argv)
This is my pipeline in Java
import org.apache.beam.runners.core.construction.External;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
/**
 * Multi-language word-count pipeline: reads text with the Java SDK, splits it
 * into words, then hands the words to a Python transform through a
 * cross-language expansion service.
 */
public class WordCount {

  /** Splits each input line into words, tracking line-length and empty-line metrics. */
  static class ExtractWordsFn extends DoFn<String, String> {
    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
    private final Distribution lineLenDist =
        Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> receiver) {
      lineLenDist.update(element.length());
      if (element.trim().isEmpty()) {
        emptyLines.inc();
      }
      // Split the line into words on any run of non-letter characters.
      String[] words = element.split("[^\\p{L}]+", -1);
      // Output each non-empty word into the output PCollection.
      for (String word : words) {
        if (!word.isEmpty()) {
          receiver.output(word);
        }
      }
    }
  }

  /** Converts a PCollection of text lines into a PCollection of individual words. */
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<String>> {
    @Override
    public PCollection<String> expand(PCollection<String> lines) {
      return lines.apply(ParDo.of(new ExtractWordsFn()));
    }
  }

  /** Pipeline options. The expansion-service address is configurable instead of hard-coded. */
  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
    String getInputFile();

    void setInputFile(String value);

    @Description("Address (host:port) of the Python expansion service")
    @Default.String("localhost:9098")
    String getExpansionService();

    void setExpansionService(String value);
  }

  /** Builds and runs the pipeline, blocking until it finishes. */
  static void runWordCount(WordCountOptions options) {
    Pipeline p = Pipeline.create(options);
    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
        .apply(new CountWords())
        // The URN and payload must match what the Python expansion service
        // registered via PTransform.register_urn. NOTE(review): an
        // "UNIMPLEMENTED: Method not found!" response from the expansion
        // service usually means the Java SDK and the Python expansion service
        // speak incompatible ExpansionService API versions — make sure both
        // sides use the same Apache Beam release.
        .apply(
            "ExternalPythonTransform",
            External.of(
                "beam:transforms:xlang:pythontransform3",
                new byte[] {},
                options.getExpansionService()));
    p.run().waitUntilFinish();
  }

  public static void main(String[] args) {
    WordCountOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
    runWordCount(options);
  }
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
