Dynamic paths with Apache Beam and runtime parameters

I am creating a pipeline template that takes an input file and counts the words in it. That part works fine, but I also need to pass a second parameter (from the function where I call the template) containing the name of the file, so that I can build a path from it.

Here is an example of what I want. I know that pipelines can't access runtime parameters during pipeline construction or outside a runtime context, but this should give you an idea of what I need to do:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class tempatableTest(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Both arguments are ValueProviders, so they can be set when the template is launched.
        parser.add_value_provider_argument(
            '--input',
            type=str,
            help='path to the input file'
        )
        parser.add_value_provider_argument(
            '--fdinamic',
            type=str,
            help='folder name'
        )


templatable_test = PipelineOptions().view_as(tempatableTest)
beam_options = PipelineOptions()
input_path = templatable_test.input
# This .get() runs while the pipeline is being constructed, which is where it fails.
dinamicName = templatable_test.fdinamic.get()

with beam.Pipeline(options=beam_options) as p:
    lines = p | beam.io.ReadFromText(input_path)
    count = lines | beam.combiners.Count.Globally()
    count | 'countTotalLen' >> beam.io.WriteToText(f'gs://bucket-test-out/processedFile/{dinamicName}/count.txt')

If I call templatable_test.fdinamic.get(), I get the runtime error, but if I remove the .get(), the folder ends up with a very long name.

I know this probably isn't the right way to do it, but it is just to illustrate what I need. Thank you for your help.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source