How to process entities in batches but handle individual errors

There is a service that receives and processes messages sent through a message queue broker. Here are the steps that occur:

  1. The service receives bursts of messages (sometimes 10k messages in a minute).
  2. The service needs to read and process these messages. For example, some will be ignored based on the value of a field. Others need to be transformed.
  3. The transformed data should then be inserted into an SQL DB. Some inserts could fail due to bad data.
  4. Successfully processed messages should be acknowledged and the rest nacked.

Question: How can I efficiently process these messages in batches and insert them into the DB while still reporting errors for each message individually?
Doesn't sound complicated? Well, my current implementation looks like this (versionA):

@Component class MessageConsumer implements Consumer<List<AcknowledgeableMessage>> {
  @Autowired private DataRepository repository;

  @Override public void consume(List<AcknowledgeableMessage> messages) {
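    messages.forEach(message -> {
      try {
        if (shouldIgnore(message)) {
          // Ignored messages are acknowledged without touching the DB.
          message.ack();
        } else {
          Data data = transformToData(message);
          repository.save(data);
          message.ack();
        }
      } catch (Exception e) {
        // A failure in transform or insert nacks only this message.
        message.nack();
      }
    });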
  }
}

@Repository class DataRepository {
  @Transactional
  void save(Data data) {
   // SQL insert statement
  }  
}

However, as you can see, each message opens its own DB transaction to save the entity, which could be a bottleneck. It is preferable to insert into an SQL DB in batches, no?

Suppose I write it like this instead (versionB):

@Repository class DataRepository {
  @Transactional
  void saveAll(List<Data> dataList) {
    // One transaction wraps all inserts; save(Data) is the same method as in versionA.
    dataList.forEach(this::save);
  }
}

@Component class MessageConsumer implements Consumer<List<AcknowledgeableMessage>> {
  @Autowired private DataRepository repository;

  @Override public void consume(List<AcknowledgeableMessage> messages) {
    try {
       List<Data> dataList = transformToData(messages);
       repository.saveAll(dataList);
       messages.forEach(AcknowledgeableMessage::ack);
    } catch (Exception e) {
       messages.forEach(AcknowledgeableMessage::nack);
    }
  }
}

When given this input:

message1: good
message2: good
message3: bad

# good messages should be processed successfully, meaning inserted into the DB and acked
# bad messages should not be inserted into the DB and should be nacked

Then, in the versionB implementation, the transaction for all entities will be rolled back, and thus the good messages will be nacked instead of acked. Right?
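
To check my understanding, here is a minimal plain-Java stand-in for versionB, with no Spring and no real database. Everything in it is an assumption made for illustration: the class name `AllOrNothingDemo`, the in-memory `table`, and the rule that any row starting with "bad" fails the insert. A staged copy of the table plays the role of the transaction.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in for versionB; hypothetical names, no Spring or real DB.
final class AllOrNothingDemo {
    // Hypothetical in-memory "table" standing in for the SQL table.
    static final List<String> table = new ArrayList<>();

    // Mimics a transactional saveAll: rows become visible only if every insert succeeds.
    static void saveAll(List<String> rows) {
        List<String> staged = new ArrayList<>(table);
        for (String row : rows) {
            if (row.startsWith("bad")) {
                // Stand-in for a constraint violation; the staged rows are discarded (rollback).
                throw new IllegalArgumentException("bad data: " + row);
            }
            staged.add(row);
        }
        // Commit: publish the staged rows.
        table.clear();
        table.addAll(staged);
    }

    // Mirrors versionB's consume(): a single try/catch around the whole batch.
    // Returns "ack" or "nack" for each message, in input order.
    static List<String> consume(List<String> messages) {
        List<String> outcomes = new ArrayList<>();
        try {
            saveAll(messages);
            messages.forEach(m -> outcomes.add("ack"));
        } catch (RuntimeException e) {
            messages.forEach(m -> outcomes.add("nack"));
        }
        return outcomes;
    }
}
```

Calling `AllOrNothingDemo.consume` with two good rows followed by a bad one leaves `table` empty and yields a nack for all three, which matches the behaviour I expect from versionB.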

Is this simply a trade-off, or is there another way to do it?
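
One middle ground I can imagine is to try the whole batch in one transaction first and, only if that fails, retry the messages one by one so the bad ones can be isolated. The sketch below is plain Java under the same kind of assumptions as the rest of my example code: `BatchWithFallbackDemo`, the in-memory `table`, and the "bad" prefix rule are all hypothetical, and I have not verified this against a real broker or database.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Batch-first with per-item fallback; hypothetical names, no Spring or real DB.
final class BatchWithFallbackDemo {
    // Hypothetical in-memory "table" standing in for the SQL table.
    static final List<String> table = new ArrayList<>();

    // Mimics a transactional insert: all rows are committed, or none are.
    static void saveAll(List<String> rows) {
        List<String> staged = new ArrayList<>(table);
        for (String row : rows) {
            if (row.startsWith("bad")) {
                // Stand-in for a constraint violation; staged rows are discarded.
                throw new IllegalArgumentException("bad data: " + row);
            }
            staged.add(row);
        }
        table.clear();
        table.addAll(staged);
    }

    // Returns "ack" or "nack" for each message, in input order.
    static List<String> consume(List<String> messages) {
        List<String> outcomes = new ArrayList<>();
        try {
            // Fast path: one transaction for the whole batch.
            saveAll(messages);
            messages.forEach(m -> outcomes.add("ack"));
        } catch (RuntimeException batchFailure) {
            // Slow path: the batch was rolled back, so retry each message
            // in its own transaction to find out which ones are bad.
            for (String message : messages) {
                try {
                    saveAll(Collections.singletonList(message));
                    outcomes.add("ack");
                } catch (RuntimeException e) {
                    outcomes.add("nack");
                }
            }
        }
        return outcomes;
    }
}
```

With two good rows followed by a bad one, this acks the first two, nacks the third, and leaves only the good rows in `table`. The cost is that a batch containing a bad message is attempted twice, so it presumably only pays off when bad data is rare.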



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
