Fallback mechanism when RabbitMQ goes down

My Spring Boot application sends events to Timescale through RabbitMQ. I need to know how to save my events if RabbitMQ goes down.

In detail:

The messages published to RabbitMQ are persistent. When the message broker goes down, events cannot be published, so I am planning to store them in a database and republish them to RabbitMQ once it is back up. Any solutions or suggestions are welcome.



Solution 1:[1]

Resilient, concurrent, efficient, portable, and local

As Kayaman suggested, if preserving your messages is critical, you should be using a database system that is resilient, concurrent, efficient, and preferably local (on the same machine).

  • If RabbitMQ is not available, the cause is likely to be related to a network outage. So your fallback mechanism should be local on the same machine, if possible.
  • We don't want to overburden your local server. So a database used as the fallback should ideally be efficient with its use of RAM, CPU, and storage.
  • If access to RabbitMQ fails repeatedly in rapid succession, new messages may be added to the fallback database at the same time that previously recorded messages are being read out and re-sent to RabbitMQ. So the fallback mechanism should be able to handle concurrent access.
  • If preserving these messages is critical, then our fallback database should be resilient, able to tolerate crashes/power-outages.
  • If the server gets redeployed, it would be nice if the fallback mechanism was portable, not dependent on any particular OS or CPU instruction set. So Java-based would address that issue.

H2 Database Engine

My first thought to meet those needs is to use the H2 Database Engine. H2 is a relational database, built in pure Java, actively developed, and proven to be production-worthy.

A similar product to consider is Apache Derby. But I have heard of various issues that may mean it is not production-worthy, though you should research its current condition.

The relational part of H2 may not be relevant, as you may need only a single table to track your stream of messages to be resent later to RabbitMQ. As for the other requirements of being local, efficient, resilient, portable, and concurrent, H2 fits the bill. And if you ever need to add additional tables, H2 is ready as a fully relational database system.

H2 can be started in either of two modes:

  • Embedded mode, where the database runs inside your application's JVM.
  • Server mode, where the database runs as a separate process and accepts connections over TCP.

Which mode is appropriate to your needs is hard to say without more information. If you want external monitoring tools to attach, then you may want server mode. If you want simple & lean, then embedded mode.

Your Java app connects to H2 via the included JDBC driver.
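As a minimal sketch of what that could look like, assuming the H2 jar is on the classpath. The file name, table, and column names below are placeholders I made up, not anything prescribed by H2:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class H2FallbackBuffer {

    // Embedded mode: the database is a local file next to the application.
    // The file name is a placeholder.
    static final String JDBC_URL = "jdbc:h2:./fallback-buffer";

    // A single table is enough to track the messages to be resent later.
    static final String DDL =
        "CREATE TABLE IF NOT EXISTS pending_event ("
      + " id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,"
      + " payload VARCHAR NOT NULL,"
      + " created TIMESTAMP DEFAULT CURRENT_TIMESTAMP)";

    /** Buffer one event; to be called when publishing to RabbitMQ fails. */
    static void buffer(String payload) throws Exception {
        try (Connection con = DriverManager.getConnection(JDBC_URL, "sa", "")) {
            con.createStatement().execute(DDL);
            try (PreparedStatement ps = con.prepareStatement(
                    "INSERT INTO pending_event (payload) VALUES (?)")) {
                ps.setString(1, payload);
                ps.executeUpdate();
            }
        }
    }

    public static void main(String[] args) {
        // Connecting for real requires the H2 driver on the classpath;
        // here we only print the URL the app would use.
        System.out.println(JDBC_URL);
    }
}
```

Re-sending is then a matter of selecting rows ordered by `id`, publishing each to RabbitMQ, and deleting the row once the publish is confirmed.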

Solution 2:[2]

Rabbit has its own persistence; you can configure your queues so that they persist through a shutdown and come right back up with the same messages they had before the failure.
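For reference, a sketch of what that could look like with Spring AMQP; the queue name is a placeholder, and note that Spring AMQP already marks outgoing messages persistent by default, so the post-processor below only makes that explicit:

```java
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DurableQueueConfig {

    @Bean
    public Queue eventsQueue() {
        // second argument: durable = true, so the queue definition
        // itself survives a broker restart
        return new Queue("events", true);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory cf) {
        RabbitTemplate template = new RabbitTemplate(cf);
        // mark each outgoing message persistent, so enqueued messages
        // are written to disk and survive a broker restart
        template.setBeforePublishPostProcessors(msg -> {
            msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            return msg;
        });
        return template;
    }
}
```

Keep in mind this only protects messages that already reached the broker; it does not help with publishes attempted while the broker is down, which is what the other solutions address.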

Solution 3:[3]

To expand a bit on my comment: There is really no need to use a database for persistent buffering.

EDIT: Also, the very reason for not being able to send messages to RabbitMQ might well be a lost network connection. Most DBMSes would be of little use in this case. End of edit.

package io.mahlberg.stackoverflow.questions.objpersistencedemo;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;

public class PersistenceDemo {

  /*
   * A decent default number of objects to be buffered.
   */
  private static long COUNT = 10000L;

  /*
   * Default filename
   */
  private static final String OBJ_DAT = "./obj.dat";

  private static FileOutputStream   fos;
  private static ObjectOutputStream oos;

  private static FileInputStream   fis;
  private static ObjectInputStream ois;

  private static File dat;

  private static Object lock;

  public static void main(String[] args) throws InterruptedException, IOException {

    // Use the first CLI argument as the object count, if given.
    if (args.length > 0) {
      COUNT = Long.parseLong(args[0]);
    }

    // Initialize our lock
    lock = new Object();

    // Ensure the datafile exists.
    dat = new File(OBJ_DAT);
    dat.createNewFile();

    // Initialize our streams.
    try {
      fos = new FileOutputStream(dat);
    } catch (Exception e1) {
      e1.printStackTrace();
      System.exit(1);
    }

    oos = new ObjectOutputStream(fos);

    // Define the writer thread.
    Thread writer = new Thread(new Runnable() {

      public void run() {
        Data obj;

        // Make sure we have the behaviour of the queue.
        synchronized (lock) {

          for (int i = 0; i < COUNT; i++) {
            obj = new Data(String.format("Obj-%d", i), new Date());
            try {
              oos.writeObject(obj);
              oos.flush();
              fos.flush();

              // Notify the reader...
              lock.notify();
            } catch (IOException e1) {
              e1.printStackTrace();
            }
            try {
              // ... and wait until the reader is finished.
              lock.wait();
            } catch (InterruptedException e) {
              e.printStackTrace();
              break;
            }
          }
          // We need to notify the reader one last time for the last
          // Object we put into the stream.
          lock.notify();
        }

      }
    });

    // Initialize the streams used by reader.
    fis = new FileInputStream(dat);
    ois = new ObjectInputStream(fis);

    Thread reader = new Thread(new Runnable() {

      public void run() {
        Data obj;
        while (true) {
          synchronized (lock) {

            try {
              obj = (Data) ois.readObject();

              // Notify writer we are finished with reading the latest entry...
              lock.notify();
            } catch (ClassNotFoundException e1) {
              e1.printStackTrace();
            } catch (IOException e1) {
              break;
            }

            try {
              // ...and wait till writer is done writing.
              lock.wait();
            } catch (InterruptedException e) {
              e.printStackTrace();
              break;
            }

          }
        }
      }
    });

    // For doing a rough performance measurement.
    Instant start = Instant.now();
    writer.start();
    reader.start();

    // Wait till both threads are done.
    writer.join();
    reader.join();

    Instant end = Instant.now();
    Duration timeElapsed = Duration.between(start, end);
    System.out.format("Took %sms for %d objects\n", timeElapsed.toMillis(), COUNT);
    System.out.format("Avg: %.3fms/object\n", ((double) timeElapsed.toMillis() / COUNT));

    // Cleanup
    oos.close();
    fos.close();
    ois.close();
    fis.close();
  }

}

Basically, we use synchronize, notify and wait to emulate a FIFO buffer with a file.

Please note that I took some shortcuts for the sake of brevity and readability, but I think you get the picture. In a real implementation, the reader should check the file size every once in a while (how often depends on the size of your data) and truncate the file, and the error handling here is practically non-existent. I created a jar from this class and a simple `Data` class, and here are some sample results:
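For illustration, that truncation could be sketched as below; the size threshold is made up, and it assumes the writer is paused (e.g. inside the same lock) while the file is reset:

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

public class BufferTruncation {

    /**
     * Truncate the buffer file once every entry in it has been re-sent and
     * the file has grown past maxBytes. Opening a plain FileOutputStream on
     * an existing file truncates it to zero length.
     */
    static boolean truncateIfDrained(File buffer, boolean allEntriesSent,
                                     long maxBytes) throws IOException {
        if (allEntriesSent && buffer.length() > maxBytes) {
            new FileOutputStream(buffer).close();
            return true;
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("obj", ".dat");
        try (FileOutputStream fos = new FileOutputStream(f)) {
            fos.write(new byte[1024]); // pretend this is buffered data
        }
        System.out.println("before: " + f.length());
        truncateIfDrained(f, true, 512); // 512-byte threshold just for the demo
        System.out.println("after: " + f.length());
        f.delete();
    }
}
```

After truncation the object streams have to be reopened, since the old stream positions no longer match the file.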

$ for i in {1..10}; do java -jar target/objpersistencedemo-0.0.1-SNAPSHOT.jar 20000; done
20000
Took 1470ms for 20000 objects
Avg: 0,074ms/object
20000
Took 1510ms for 20000 objects
Avg: 0,076ms/object
20000
Took 1614ms for 20000 objects
Avg: 0,081ms/object
20000
Took 1600ms for 20000 objects
Avg: 0,080ms/object
20000
Took 1626ms for 20000 objects
Avg: 0,081ms/object
20000
Took 1620ms for 20000 objects
Avg: 0,081ms/object
20000
Took 1489ms for 20000 objects
Avg: 0,074ms/object
20000
Took 1604ms for 20000 objects
Avg: 0,080ms/object
20000
Took 1632ms for 20000 objects
Avg: 0,082ms/object
20000
Took 1564ms for 20000 objects
Avg: 0,078ms/object

Please note that these values are for writing and reading. I guess that less than 0.1ms/object is quite a bit faster than a write and subsequent read from an RDBMS.

If you let your reader do the sending of the messages to the RabbitMQ instance and add a little truncation and backoff logic, you can basically ensure that every event is either in the buffer file or delivered to RabbitMQ.
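Such backoff logic could look roughly like this; `publish` stands in for the actual send to RabbitMQ, and the delay values are arbitrary:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

public class BackoffDrainer {

    /**
     * Try to send every buffered event in order; on failure, wait with
     * capped exponential backoff and retry from where we left off.
     * publish returns true on success, false when the broker is unreachable.
     */
    static int drain(Deque<String> buffer, Predicate<String> publish,
                     long baseDelayMs, long maxDelayMs) throws InterruptedException {
        int sent = 0;
        long delay = baseDelayMs;
        while (!buffer.isEmpty()) {
            if (publish.test(buffer.peekFirst())) {
                buffer.removeFirst();   // only drop the event once it is sent
                sent++;
                delay = baseDelayMs;    // reset the backoff after a success
            } else {
                Thread.sleep(delay);
                delay = Math.min(delay * 2, maxDelayMs);
            }
        }
        return sent;
    }

    public static void main(String[] args) throws InterruptedException {
        Deque<String> buffer = new ArrayDeque<>();
        for (int i = 0; i < 5; i++) buffer.add("Obj-" + i);

        // Stub broker: fails twice, then accepts everything.
        int[] failures = {2};
        Predicate<String> flakyPublish = msg -> {
            if (failures[0] > 0) { failures[0]--; return false; }
            return true;
        };

        int sent = drain(buffer, flakyPublish, 1, 8);
        System.out.println("sent=" + sent + " remaining=" + buffer.size());
    }
}
```

Because an event is removed from the buffer only after a successful publish, a crash mid-drain can at worst cause a duplicate delivery, never a loss.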

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1
Solution 2: Arlo Guthrie
Solution 3