Apache Beam no watermark_estimator_provider while running SDFBoundedSourceReader

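import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# s3_input holds the S3 path of the input file (its value is not shown here).

def run_pipeline(pipeline_options):
    with beam.Pipeline(options=pipeline_options) as p:
        data = p | 'read' >> beam.io.ReadFromText(s3_input)
        # Attach the same key to every line so the lines can be batched together.
        data = data | beam.Map(lambda x: ('dk', x))
        data = data | 'Group into batches' >> beam.GroupIntoBatches(10)
        # Drop the key and keep only the batch of lines.
        data = data | beam.Map(lambda x: x[1])
        data | beam.Map(print)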

def run_direct():
    # https://beam.apache.org/documentation/runners/direct/
    pipeline_options = PipelineOptions([
        "--runner=DirectRunner",
        "--direct_num_workers=1",
        "--direct_running_mode=multi_threading"
    ])
    run_pipeline(pipeline_options)

run_direct()

AttributeError: 'apache_beam.runners.common.MethodWrapper' object has no attribute 'watermark_estimator_provider' [while running 'read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/pair']

The pipeline reads a file from S3, adds a common key to every line, and then groups the lines into batches. Essentially, I want to batch the lines.
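To make the intended result concrete, here is a minimal plain-Python sketch of the batching the pipeline is meant to produce. It does not use Beam at all; `group_into_batches` is a hypothetical helper written for illustration, and unlike a real runner it processes elements strictly in input order.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, Iterator, List, Tuple


def group_into_batches(
    pairs: Iterable[Tuple[Hashable, str]], batch_size: int
) -> Iterator[Tuple[Hashable, List[str]]]:
    """Hypothetical helper: yield (key, batch) with at most batch_size values per batch."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    buffers: Dict[Hashable, List[str]] = defaultdict(list)
    for key, value in pairs:
        buffers[key].append(value)
        if len(buffers[key]) == batch_size:
            # The buffer is full: emit it and start a fresh one for this key.
            yield key, buffers[key]
            buffers[key] = []
    # Emit whatever is left over as a final, smaller batch.
    for key, leftover in buffers.items():
        if leftover:
            yield key, leftover


# Stand-in for the lines of the S3 file.
lines = [f"line {i}" for i in range(1, 8)]
# Same idea as beam.Map(lambda x: ('dk', x)): one common key for every line.
keyed = (("dk", line) for line in lines)
# Same idea as beam.Map(lambda x: x[1]): keep only the batch, drop the key.
batches = [batch for _key, batch in group_into_batches(keyed, 3)]

for batch in batches:
    print(batch)
```

With seven input lines and a batch size of 3, this gives two full batches and one final batch containing the single remaining line.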



Solution 1:[1]

However, if I don't read from a file, it works:

import apache_beam as beam

with beam.Pipeline() as pipeline:
    batches_with_keys = (
      pipeline
      # In-memory input instead of beam.io.ReadFromText.
      | 'Create produce' >> beam.Create([1,2,3,4,5,6,7,8])
      | beam.Map(lambda x: ('dk', x))
      | 'Group into batches' >> beam.GroupIntoBatches(3)
      | beam.Map(lambda x: x[1])
      | beam.Map(print))

Solution 2:[2]

Can you try using s3io (apache_beam.io.aws.s3io) instead of beam.io.ReadFromText?

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 haliluyaya
Solution 2 ningk