Apache Beam Python Dataflow with GCP Pub/Sub counter is over-counting

I have a Dataflow job that accumulates UI interactions via GCP Pub/Sub. I've tested this with a script that sends many Pub/Sub messages representing interactions to the input_topic. At lower message rates (<500 per second), the Dataflow job counts the interactions correctly. But when I crank the rate up, the Dataflow job suddenly sends out counts that are far higher (5-10x) than the number of Pub/Sub messages sent to the input_topic.
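
For reference, I compare the numbers by tallying the per-window counts the job publishes to the output topic. A minimal sketch of that tally script, assuming the official google-cloud-pubsub client and a subscription on the output topic (the project and subscription names are placeholders):

    import json

    from google.cloud import pubsub_v1

    subscriber = pubsub_v1.SubscriberClient()
    # Placeholder names; substitute your own project and subscription.
    subscription_path = subscriber.subscription_path('my-project', 'output-subscription')

    totals = {'interaction1': 0, 'interaction2': 0,
              'interaction3': 0, 'interaction4': 0}


    def callback(message):
        # Each output message is a JSON dict of per-window counts.
        # Callbacks may run concurrently; good enough for a rough tally.
        counts = json.loads(message.data.decode('utf-8'))
        for key in totals:
            totals[key] += counts.get(key, 0)
        print(totals)
        message.ack()


    future = subscriber.subscribe(subscription_path, callback=callback)
    try:
        future.result()  # Tally until interrupted.
    except KeyboardInterrupt:
        future.cancel()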

The ideas I've explored are:

  1. Pub/Sub is resending messages that aren't being acked. This doesn't make sense to me, because the ack deadline for the input_topic subscription is 1 minute. (A way to rule this out is sketched after this list.)

  2. Something is wrong with my trigger configuration.

  3. Something I don't understand is happening in ReadFromPubSub or CombineGlobally(CountFn()).
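
To rule out idea 1 empirically, the Dataflow runner can deduplicate on a message attribute via ReadFromPubSub's id_label parameter. A minimal sketch, assuming the test script stamps every message with a unique attribute (the attribute name dedup_id is a placeholder of mine); if the inflated counts disappear with this read step, redelivery is the cause:

    import apache_beam as beam

    # Drop-in variant of the 'Read from pubsub' step in the pipeline below;
    # 'p' and 'known_args' are the same objects used there. With id_label set,
    # the Dataflow runner deduplicates messages whose 'dedup_id' attribute
    # repeats, so Pub/Sub redeliveries are only counted once.
    deduped = (p
               | 'Read from pubsub (deduped)' >> beam.io.ReadFromPubSub(
                   topic=known_args.input_topic,
                   id_label='dedup_id'))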

    import json

    import apache_beam as beam
    from apache_beam.transforms import window
    from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime


    class CountFn(beam.CombineFn):
        # Accumulator is a tuple of running totals:
        # (interaction1, interaction2, interaction3, interaction4)
        def create_accumulator(self):
            return 0, 0, 0, 0

        def add_input(self, accumulator, element):
            def bump(total, key):
                value = element.get(key)
                # Only add positive ints; missing keys and bad values are ignored.
                if isinstance(value, int) and value > 0:
                    return total + value
                return total

            interaction1, interaction2, interaction3, interaction4 = accumulator
            return (bump(interaction1, 'interaction1'),
                    bump(interaction2, 'interaction2'),
                    bump(interaction3, 'interaction3'),
                    bump(interaction4, 'interaction4'))

        def merge_accumulators(self, accumulators):
            interaction1, interaction2, interaction3, interaction4 = zip(*accumulators)
            return sum(interaction1), sum(interaction2), sum(interaction3), sum(interaction4)

        def extract_output(self, accumulator):
            interaction1, interaction2, interaction3, interaction4 = accumulator
            return {
                'interaction1': interaction1,
                'interaction2': interaction2,
                'interaction3': interaction3,
                'interaction4': interaction4
            }


    def to_json(e):
        # Pub/Sub payloads arrive as bytes; treat anything that isn't
        # valid JSON as an empty record.
        try:
            return json.loads(e.decode('utf-8'))
        except json.JSONDecodeError:
            return {}

    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'Read from pubsub' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
         | 'To Json' >> beam.Map(to_json)
         # 1-second fixed windows; each window fires once, 3 seconds of
         # processing time after its first element, and fired panes are
         # discarded. Lateness of up to 2 seconds is allowed.
         | 'Window' >> beam.WindowInto(window.FixedWindows(1),
                                       trigger=AfterProcessingTime(delay=1 * 3),
                                       accumulation_mode=AccumulationMode.DISCARDING,
                                       allowed_lateness=2)
         | 'Calculate Metrics' >> beam.CombineGlobally(CountFn()).without_defaults()
         | 'To bytestring' >> beam.Map(lambda e: json.dumps(e).encode('utf-8'))
         | 'Write to pubsub' >> beam.io.WriteToPubSub(topic=known_args.output_topic))


Solution 1:

It turns out the problem was caused by the client I was using to send the test messages: nodejs-pubsub issue 847 reports that nodejs-pubsub has a problem sending high volumes of messages.

https://github.com/googleapis/nodejs-pubsub/issues/847

There is a comment suggesting a workaround, but I have not tried it myself.

https://github.com/googleapis/nodejs-pubsub/issues/847#issuecomment-886024472
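
In the meantime, one way to sidestep the issue entirely is to generate the test load with a different client. A minimal sketch of a test publisher using the official Python client (google-cloud-pubsub); the project and topic names are placeholders, and the dedup_id attribute matches the deduplication sketch in the question:

    import json
    import uuid

    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    # Placeholder names; substitute your own project and topic.
    topic_path = publisher.topic_path('my-project', 'input_topic')

    futures = []
    for _ in range(1000):
        payload = json.dumps({'interaction1': 1}).encode('utf-8')
        # A unique attribute per message lets the pipeline deduplicate
        # via ReadFromPubSub's id_label if needed.
        futures.append(publisher.publish(topic_path, payload,
                                         dedup_id=str(uuid.uuid4())))

    # Block until the service has acknowledged every publish.
    for f in futures:
        f.result()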

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source

Solution 1: Chris Del Guercio