Apache Beam Python Dataflow with GCP Pub/Sub counter is over-counting
I have a Dataflow job that accumulates UI interactions via GCP Pub/Sub. I've tested it with a script that publishes many Pub/Sub messages, each representing an interaction, to the input_topic. At lower message rates (<500 per second), the Dataflow job counts the interactions correctly. But when I crank the rate up, the job suddenly emits counts that are far higher (5-10x) than the number of Pub/Sub messages sent to the input_topic.
The ideas I've explored are:
- Pub/Sub is resending messages that aren't being acked. This doesn't make sense to me, because the ack deadline for the input_topic subscription is 1 minute (a way to check for duplicate deliveries is sketched right after this list).
- Something is wrong with my trigger configuration.
- Something I don't understand is happening in ReadFromPubSub or CombineGlobally(CountFn()).
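On the first point: every redelivery of an unacked message carries the same Pub/Sub message_id, while duplicates created on the publishing side get fresh IDs, so comparing total deliveries against distinct message IDs per window separates the two cases. A rough sketch (the subscription path is made up, and message_id is only populated on PubsubMessage in reasonably recent Beam releases):

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window
from apache_beam.transforms.combiners import CountCombineFn

options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    ids = (p
           | 'Read' >> beam.io.ReadFromPubSub(
               subscription='projects/my-project/subscriptions/input_sub',  # made-up path
               with_attributes=True)
           | 'MessageId' >> beam.Map(lambda msg: msg.message_id)
           | 'Window' >> beam.WindowInto(window.FixedWindows(60)))

    # Count every delivery in the window...
    (ids
     | 'CountAll' >> beam.CombineGlobally(CountCombineFn()).without_defaults()
     | 'LogTotal' >> beam.Map(lambda n: print('deliveries:', n)))

    # ...and count distinct message IDs; a large gap means redelivery.
    (ids
     | 'Distinct' >> beam.Distinct()
     | 'CountDistinct' >> beam.CombineGlobally(CountCombineFn()).without_defaults()
     | 'LogDistinct' >> beam.Map(lambda n: print('unique ids:', n)))
```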
Here is the relevant code:

    import json

    import apache_beam as beam
    from apache_beam.transforms import window
    from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime


    class CountFn(beam.CombineFn):
        def create_accumulator(self):
            # (interaction1, interaction2, interaction3, interaction4)
            return 0, 0, 0, 0

        def add_input(self, interactions, input):
            (interaction1, interaction2, interaction3, interaction4) = interactions
            # Only add a field when it is present, an int, and positive.
            interaction1_result = interaction1 + input['interaction1'] if ('interaction1' in input and isinstance(input['interaction1'], int) and input['interaction1'] > 0) else interaction1
            interaction2_result = interaction2 + input['interaction2'] if ('interaction2' in input and isinstance(input['interaction2'], int) and input['interaction2'] > 0) else interaction2
            interaction3_result = interaction3 + input['interaction3'] if ('interaction3' in input and isinstance(input['interaction3'], int) and input['interaction3'] > 0) else interaction3
            interaction4_result = interaction4 + input['interaction4'] if ('interaction4' in input and isinstance(input['interaction4'], int) and input['interaction4'] > 0) else interaction4
            return interaction1_result, interaction2_result, interaction3_result, interaction4_result

        def merge_accumulators(self, accumulators):
            interaction1, interaction2, interaction3, interaction4 = zip(*accumulators)
            return sum(interaction1), sum(interaction2), sum(interaction3), sum(interaction4)

        def extract_output(self, interactions):
            (interaction1, interaction2, interaction3, interaction4) = interactions
            output = {
                'interaction1': interaction1,
                'interaction2': interaction2,
                'interaction3': interaction3,
                'interaction4': interaction4
            }
            return output
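To rule out the CombineFn itself, it can be exercised directly outside of Beam; the sample inputs below are made up, but they show that negative and non-integer values are ignored as intended:

```python
# Quick local sanity check of CountFn with hand-made inputs (no pipeline needed).
fn = CountFn()
acc = fn.create_accumulator()
acc = fn.add_input(acc, {'interaction1': 1, 'interaction2': 2})
acc = fn.add_input(acc, {'interaction1': 1, 'interaction3': -5})  # negative value ignored
acc = fn.add_input(acc, {'interaction4': 'oops'})                 # non-int value ignored
merged = fn.merge_accumulators([acc, fn.create_accumulator()])
print(fn.extract_output(merged))
# -> {'interaction1': 2, 'interaction2': 2, 'interaction3': 0, 'interaction4': 0}
```

The rest of the pipeline: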
    def to_json(e):
        try:
            return json.loads(e.decode('utf-8'))
        except json.JSONDecodeError:
            return {}
    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'Read from pubsub' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
         | 'To Json' >> beam.Map(to_json)
         | 'Window' >> beam.WindowInto(window.FixedWindows(1),
                                       trigger=AfterProcessingTime(delay=1 * 3),
                                       accumulation_mode=AccumulationMode.DISCARDING,
                                       allowed_lateness=2)
         | 'Calculate Metrics' >> beam.CombineGlobally(CountFn()).without_defaults()
         | 'To bytestring' >> beam.Map(lambda e: json.dumps(e).encode('utf-8'))
         | 'Write to pubsub' >> beam.io.WriteToPubSub(topic=known_args.output_topic))
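Regarding the trigger theory: because the window uses AccumulationMode.DISCARDING, each element can only contribute to a single fired pane, so a trigger misconfiguration here would be more likely to drop counts than to multiply them. Still, one way to isolate it is to swap in a more conventional configuration and see whether the totals change; the window size, delay, and lateness below are arbitrary values, not a known fix:

```python
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import (AccumulationMode, AfterProcessingTime,
                                            Repeatedly)

# Hypothetical alternative windowing for comparison only: fire repeatedly on
# processing time and keep discarding panes, so each element is emitted once.
alternative_window = beam.WindowInto(
    window.FixedWindows(60),                           # 60s windows (arbitrary)
    trigger=Repeatedly(AfterProcessingTime(delay=5)),  # fire ~5s after the first element of each pane
    accumulation_mode=AccumulationMode.DISCARDING,
    allowed_lateness=60)                               # arbitrary lateness, in seconds
```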
Solution 1:[1]
It turns out the problem was caused by the client I was using to send the test messages: nodejs-pubsub issue 847 reports that nodejs-pubsub has a problem when sending high volumes of messages.
https://github.com/googleapis/nodejs-pubsub/issues/847
There is a comment suggesting a workaround, but I have not tried it myself.
https://github.com/googleapis/nodejs-pubsub/issues/847#issuecomment-886024472
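If you want to reproduce the comparison without the Node.js client, a load test from the Python client is straightforward; the project, topic name, and payload below are made up, and the sequence number is only there so the published total can be cross-checked against what the Dataflow job reports:

```python
import json
from google.cloud import pubsub_v1

# Hypothetical load-test publisher using the Python Pub/Sub client.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('my-project', 'input_topic')  # made-up names

futures = []
for seq in range(10000):
    payload = json.dumps({'interaction1': 1, 'seq': seq}).encode('utf-8')
    futures.append(publisher.publish(topic_path, payload))

# Wait until Pub/Sub has acknowledged every publish.
for f in futures:
    f.result()

print('published', len(futures), 'messages')
```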
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Chris Del Guercio |
