How to implement the slowly updating side inputs in Python

I am attempting to port the slowly updating global window side inputs example from the documentation from Java to Python, and I am stuck on what the equivalent of AfterProcessingTime.pastFirstElementInPane() is in Python. For the map I've done something like this:

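from typing import Dict, Iterable

import apache_beam as beam
from apache_beam.transforms.window import TimestampedValue

class ApiKeys(beam.DoFn):
    def process(self, elm) -> Iterable[Dict[str, str]]:
        yield TimestampedValue(
            {"<api_key_1>": "<account_id_1>", "<api_key_2>": "<account_id_2>",},
            elm,
        )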

map = beam.pvalue.AsSingleton(
    p
    | "trigger pipeline" >> beam.Create([None])
    | "define schedule"
    >> beam.Map(
        lambda _: (
            0,  # would be timestamp.Timestamp.now() in production
            20, # would be timestamp.MAX_TIMESTAMP in production
            1,  # would be around 1 hour or so in production
        )
    )
    | "GenSequence"
    >> PeriodicSequence()
    | "ApplyWindowing"
    >> beam.WindowInto(
        beam.window.GlobalWindows(),
        trigger=Repeatedly(Always(), AfterProcessingTime(???)),
        accumulation_mode=AccumulationMode.DISCARDING,
    )
    | "api_keys" >> beam.ParDo(ApiKeys())
)

I am hoping to use this as a Dict[str, str] side input to a downstream function that uses 60-second windows, while the side input itself is refreshed on an hourly basis.

The point is to run this on Google Cloud Dataflow (where we currently just re-release the pipeline to update the api_keys).

I've pasted the Java example from the documentation below for convenience:

public static void sideInputPatterns() {
  // This pipeline uses View.asSingleton for a placeholder external service.
  // Run in debug mode to see the output.
  Pipeline p = Pipeline.create();

  // Create a side input that updates each second.
  PCollectionView<Map<String, String>> map =
      p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
          .apply(
              Window.<Long>into(new GlobalWindows())
                  .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                  .discardingFiredPanes())
          .apply(
              ParDo.of(
                  new DoFn<Long, Map<String, String>>() {

                    @ProcessElement
                    public void process(
                        @Element Long input, OutputReceiver<Map<String, String>> o) {
                      // Replace map with test data from the placeholder external service.
                      // Add external reads here.
                      o.output(PlaceholderExternalService.readTestData());
                    }
                  }))
          .apply(View.asSingleton());

  // Consume side input. GenerateSequence generates test data.
  // Use a real source (like PubSubIO or KafkaIO) in production.
  p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
      .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
      .apply(Sum.longsGlobally().withoutDefaults())
      .apply(
          ParDo.of(
                  new DoFn<Long, KV<Long, Long>>() {

                    @ProcessElement
                    public void process(ProcessContext c) {
                      Map<String, String> keyMap = c.sideInput(map);
                      c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());

                      LOG.debug(
                          "Value is {}, key A is {}, and key B is {}.",
                          c.element(),
                          keyMap.get("Key_A"),
                          keyMap.get("Key_B"));
                    }
                  })
              .withSideInputs(map));
}

/** Placeholder class that represents an external service generating test data. */
public static class PlaceholderExternalService {

  public static Map<String, String> readTestData() {

    Map<String, String> map = new HashMap<>();
    Instant now = Instant.now();

    DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:MM:SS");

    map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf));
    map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString());

    return map;
  }
}

Any ideas as to how to emulate this example would be enormously appreciated; I've spent days on this issue now :(

Update #2 based on @AlexanderMoraes

So, I've tried changing it according to my understanding of your suggestions:

        main_window_size = 5
        trigger_interval = 30
        side_input = beam.pvalue.AsSingleton(
            p
            | "trigger pipeline" >> beam.Create([None])
            | "define schedule"
            >> beam.Map(
                lambda _: (
                    0,  # timestamp.Timestamp.now().__float__(),
                    60,  # timestamp.Timestamp.now().__float__() + 30.0,
                    trigger_interval,  # fire_interval
                )
            )
            | "GenSequence" >> PeriodicSequence()
            | "api_keys" >> beam.ParDo(ApiKeys())
            | "window"
            >> beam.WindowInto(
                beam.window.GlobalWindows(),
                trigger=Repeatedly(AfterProcessingTime(window_size)),
                accumulation_mode=AccumulationMode.DISCARDING,
            )        
        )

But when I combine this with another pipeline whose windowing is set to something smaller than trigger_interval, I am unable to use the dictionary as a singleton because, for some reason, it is duplicated:

ValueError: PCollection of size 2 with more than one element accessed as a singleton view. First two elements encountered are "{'<api_key_1>': '<account_id_1>', '<api_key_2>': '<account_id_2>'}", "{'<api_key_1>': '<account_id_1>', '<api_key_2>': '<account_id_2>'}". [while running 'Pair with AccountIDs']

Is there some way to clarify that the singleton output should ignore whatever came before it?



Solution 1:[1]

The title of the question, "slowly updating side inputs", refers to a pattern in the documentation that already has a Python version of the code. However, the code you provided is from "updating global window side inputs", which only has a Java version. So my answer addresses the second one.

There is no AfterProcessingTime.pastFirstElementInPane() method in the Python SDK, so you cannot reproduce that call as written. It is used to build a trigger, and triggers determine when to emit the results of each window (each emission is referred to as a pane). In your case, AfterProcessingTime.pastFirstElementInPane() creates a trigger that fires when the current processing time passes the processing time at which the trigger saw the first element in a pane. In Python this is achieved using AfterWatermark and AfterProcessingTime().
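To make that trigger description concrete, here is a minimal pure-Python sketch of how I understand Repeatedly(AfterProcessingTime(delay)) with discarding panes to group elements. It is a simplified model for illustration, not Beam code: `simulate_panes` and its arguments are hypothetical names, arrival times are plain numbers in seconds, and I assume an element arriving exactly at the firing time starts the next pane.

```python
from typing import List, Sequence, Tuple


def simulate_panes(
    arrivals: Sequence[float], delay: float
) -> List[Tuple[float, List[float]]]:
    """Model a repeated processing-time trigger with discarding panes.

    arrivals: processing times (seconds) at which elements arrive.
    delay: seconds to wait after the first element of a pane.
    Returns one (fire_time, elements_in_pane) tuple per pane.
    """
    panes: List[Tuple[float, List[float]]] = []
    current: List[float] = []
    fire_at = None  # no timer is set until a pane sees its first element

    for t in sorted(arrivals):
        if fire_at is not None and t >= fire_at:
            # The timer expired before this element arrived:
            # emit the pane, then discard its contents.
            panes.append((fire_at, current))
            current = []
            fire_at = None
        if fire_at is None:
            # The first element of a new pane sets the timer.
            fire_at = t + delay
        current.append(t)

    if current:
        # The last timer still fires after the final element.
        panes.append((fire_at, current))
    return panes


if __name__ == "__main__":
    for fire_time, elements in simulate_panes([0, 10, 50, 70, 130], delay=60):
        print(fire_time, elements)
```

In this model, delay = 0 puts every element in its own pane, which is roughly what the Java example relies on to replace the side input on every tick of the sequence.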

Below are two pieces of code, one in Java and one in Python, so you can see how each is used. Both examples set a time-based trigger that emits results one minute after the first element of the window has been processed. The accumulation mode is set to discard fired panes rather than accumulate results (Java: discardingFiredPanes(); Python: accumulation_mode=AccumulationMode.DISCARDING).

1- Java:

PCollection<String> pc = ...;
pc.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
    .triggering(AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.standardMinutes(1)))
    .discardingFiredPanes());

2- Python: the trigger configuration is the same as described in point 1

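from apache_beam import WindowInto
from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime
from apache_beam.transforms.window import FixedWindows

pcollection | WindowInto(
    FixedWindows(1 * 60),
    trigger=AfterProcessingTime(1 * 60),
    accumulation_mode=AccumulationMode.DISCARDING)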

The examples above were taken from the documentation.

Solution 2:[2]

AsIter() instead of AsSingleton() worked for me.
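With AsIter() the downstream function receives every dictionary that has been emitted, so it has to choose one itself. A minimal sketch of one way to do that, under assumptions that are mine and not part of the answer above: the side-input DoFn emits (fetched_at, mapping) tuples instead of bare dictionaries, and `latest_snapshot` and `pair_with_account_id` are hypothetical helper names. I pick the newest entry by its fetch time because I would not rely on the order of elements in an iterable side input.

```python
from typing import Dict, Iterable, Tuple

# Assumed element shape: (fetched_at_seconds, {api_key: account_id}).
Snapshot = Tuple[float, Dict[str, str]]


def latest_snapshot(snapshots: Iterable[Snapshot]) -> Dict[str, str]:
    """Return the mapping from the most recently fetched snapshot."""
    newest = max(snapshots, key=lambda s: s[0], default=None)
    # An empty side input yields an empty mapping instead of raising.
    return newest[1] if newest is not None else {}


def pair_with_account_id(
    api_key: str, snapshots: Iterable[Snapshot]
) -> Tuple[str, str]:
    """Look up an API key in the newest snapshot of the side input."""
    keys = latest_snapshot(snapshots)
    return api_key, keys.get(api_key, "<unknown>")


if __name__ == "__main__":
    demo = [
        (1.0, {"<api_key_1>": "<old_account_id>"}),
        (2.0, {"<api_key_1>": "<account_id_1>"}),
    ]
    print(pair_with_account_id("<api_key_1>", demo))
```

In a pipeline this would be wired up as something like beam.Map(pair_with_account_id, snapshots=beam.pvalue.AsIter(side_input)), with the side-input DoFn yielding (time.time(), mapping).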

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Alexandre Moraes
Solution 2 user38