Prevent duplicate messages

Tech stack: Python, RabbitMQ, Elasticsearch, Heroku

I have one application which adds content to the app on a certain schedule. When content is added, an email needs to be triggered to some users based on certain criteria (this can be around 1 million users).

The design I have decided on is that the first application (the producer) publishes the content and then puts a message in RabbitMQ (the message broker) containing the user id and email address of each user who is supposed to get the email, so around 1 million messages are added to RabbitMQ. The emails are then sent by the email application (the consumer).

My question is about the producer application. It gets a list of user ids from Elasticsearch and adds them to the queue in a for loop. But if for some reason this application goes down (for example during a new deployment), only some of the users will have been added to the queue, and when the application comes back up it will start queueing again from the beginning, so we might end up sending duplicate emails to the same users. Is there a standard way to avoid this? It seems like a very common issue.
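For concreteness, the publishing loop described above might look something like this minimal sketch (pika for RabbitMQ and the elasticsearch-py scroll API are assumptions, as are the index and queue names):

  # Minimal sketch of the publish loop described above (Python).
  # Assumptions: pika for RabbitMQ, elasticsearch-py for the user query;
  # the "users" index and "email_jobs" queue names are illustrative.
  import json

  import pika
  from elasticsearch import Elasticsearch

  es = Elasticsearch("http://localhost:9200")
  connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
  channel = connection.channel()
  channel.queue_declare(queue="email_jobs", durable=True)

  def publish_email_jobs(content_id):
      # Scroll through every matching user (could be ~1 million hits).
      resp = es.search(index="users", query={"term": {"subscribed": True}},
                       scroll="2m", size=1000)
      while hits := resp["hits"]["hits"]:
          for hit in hits:
              body = json.dumps({"content_id": content_id,
                                 "user_id": hit["_id"],
                                 "email": hit["_source"]["email"]})
              channel.basic_publish(exchange="", routing_key="email_jobs",
                                    body=body,
                                    properties=pika.BasicProperties(delivery_mode=2))
          resp = es.scroll(scroll_id=resp["_scroll_id"], scroll="2m")
      # If this process dies mid-loop (e.g. during a deployment), a restart
      # re-runs the whole scroll from the top: the duplicate problem above.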

I am thinking of recording the message acknowledgement (for each user for a given piece of content) from the consumer app in my producer app's database. But it feels like that might lead to a record explosion in my PostgreSQL database after a few email sends: each piece of content can trigger emails to around 1 million users.



Solution 1:[1]

You are correct that it's a very common issue. Unfortunately, in general, when sending messages between asynchronously executing processes, you cannot guarantee exactly-once delivery of a message: you have to choose between at-most-once and at-least-once.

You can guarantee at-most-once (which trivially prevents duplicates) by having the producer application never retry a send (even if the producer fails). That may not be what you want, however.

You can guarantee at-least-once by having the consumer acknowledge to the producer that it has received and acted upon the sent message; the producer maintains state (e.g. in a database) recording that a given message hasn't been acknowledged, and retries unacknowledged messages after some time. Note that if the consumer acknowledges after performing the actions, the actions (i.e. the email being sent) may get at least partially duplicated (consider what happens if the consumer crashes between starting to do the thing and acknowledging that it's done); if the consumer acknowledges before performing the actions, you've turned it into at-most-once delivery (consider the case where the consumer crashes after acknowledgement but before fully doing the thing).
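A minimal sketch of that producer-side bookkeeping, assuming psycopg2 and a hypothetical message_log table keyed by (content_id, user_id):

  # Sketch of producer-side state for at-least-once delivery. Assumptions:
  # psycopg2 and a message_log table with a unique (content_id, user_id) key.
  import psycopg2

  conn = psycopg2.connect("dbname=app")

  def record_pending(content_id, user_id):
      # Before publishing, remember the message as not-yet-acknowledged.
      with conn, conn.cursor() as cur:
          cur.execute("""INSERT INTO message_log (content_id, user_id, acked, sent_at)
                         VALUES (%s, %s, false, now())
                         ON CONFLICT (content_id, user_id) DO NOTHING""",
                      (content_id, user_id))

  def mark_acked(content_id, user_id):
      # Called when the consumer's acknowledgement comes back.
      with conn, conn.cursor() as cur:
          cur.execute("""UPDATE message_log SET acked = true
                         WHERE content_id = %s AND user_id = %s""",
                      (content_id, user_id))

  def retry_unacked(timeout="15 minutes"):
      # Run periodically: anything still unacknowledged gets re-published.
      with conn, conn.cursor() as cur:
          cur.execute("""SELECT content_id, user_id FROM message_log
                         WHERE NOT acked AND sent_at < now() - %s::interval""",
                      (timeout,))
          for content_id, user_id in cur.fetchall():
              ...  # re-publish the RabbitMQ message for this pair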

By having the consumer maintain state recording which messages it has acknowledged, the consumer can deduplicate messages: if it receives a message it has already acknowledged, it acknowledges it again and does nothing else. Delivery is still at-least-once, but the rate of duplicates is substantially reduced, at some expense: every message now entails potentially three DB writes (the producer writes the message as unacknowledged, then the consumer and the producer each write the acknowledgement), though those need not all be to the same DB.
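Sketched on the consumer side, again assuming psycopg2, a hypothetical processed table with a unique (content_id, user_id) constraint, and pika's consumer callback signature:

  # Consumer-side deduplication as described above. Assumptions: psycopg2, a
  # processed table with a unique (content_id, user_id) constraint, JSON
  # message bodies, and a hypothetical send_email() helper.
  import json

  import psycopg2

  conn = psycopg2.connect("dbname=emails")

  def send_email(address, content_id):
      ...  # hypothetical: hand off to the actual mail provider

  def handle(channel, method, properties, body):
      msg = json.loads(body)
      key = (msg["content_id"], msg["user_id"])
      with conn, conn.cursor() as cur:
          # Have we already handled this (content, user) pair?
          cur.execute("""SELECT 1 FROM processed
                         WHERE content_id = %s AND user_id = %s""", key)
          already_done = cur.fetchone() is not None
      if not already_done:
          send_email(msg["email"], msg["content_id"])
          with conn, conn.cursor() as cur:
              cur.execute("""INSERT INTO processed (content_id, user_id)
                             VALUES (%s, %s) ON CONFLICT DO NOTHING""", key)
      # Acknowledge either way; a duplicate is acked without re-sending.
      channel.basic_ack(delivery_tag=method.delivery_tag)

Note that a crash between send_email() and the INSERT still re-sends that one email on redelivery, which is why this remains at-least-once rather than becoming exactly-once.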

Solution 2:[2]

You definitely want to achieve full consistency, and you might need to extend your stack with a database (persistent storage) supporting fetch/select with synchronization.

As a developer of SWC-DB, I welcome you to try out the latest SWC-DB release, which resolves issues #1 and #2 and makes this consistency possible. There is an example of a priority queue, queue-example.cc (other languages can be implemented with the corresponding Thrift client). You might want to note the .set_opt__deleting() in queue-example.cc's scan specs:

  // From queue-example.cc: scan specs with the deleting option set,
  // so matched entries are consumed (removed) by the scan.
  SWC::DB::Specs::Interval intval;
  intval.set_opt__deleting();

Although, I can see another way "duplicate emails" can happen: when an email has already been sent and a new task arrives for the same user. As @levi-ramsey mentioned, track the processed state in the consumer. In that case, the SELECT can and should be made with UPDATE=(ns+3d,"SET_A_VALUE_AS_PROCESSED") specifications, and (in case too many records remain in the DB) the schema of the column can be set with a TTL (let's say 3 days, so records not yet processed won't be deleted prematurely). The interval scan specs would then require a value condition of other-than (-ne) "SET_A_VALUE_AS_PROCESSED" for a record to match the select. The SWC-DB Condition-Value-Expression syntax explains this usage with SQL.
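The TTL idea translates to other stores as well. As a generic illustration (PostgreSQL rather than the SWC-DB API, with illustrative table and column names), expiring processed markers keeps the table from growing without bound:

  # The TTL idea, sketched generically against PostgreSQL rather than the
  # SWC-DB API; the processed table and processed_at column are assumptions.
  import psycopg2

  conn = psycopg2.connect("dbname=emails")

  def purge_expired(ttl="3 days"):
      # Run periodically (cron, Heroku Scheduler, etc.) to drop old markers.
      with conn, conn.cursor() as cur:
          cur.execute("""DELETE FROM processed
                         WHERE processed_at < now() - %s::interval""",
                      (ttl,))
          return cur.rowcount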

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Levi Ramsey
Solution 2: Alex Kashirin