How to solve the data loss issue in Event Emitters in Node.js microservices?

The code below is only for testing; if it works, I want to implement the same approach in a real project with microservices. I have 4 microservices on one server, and they communicate with each other through RabbitMQ.

My flow:

Microservice 1 (sends data through RabbitMQ) =====> Microservice 2 (receives the data from RabbitMQ and sends it back) =====> Microservice 1.

Microservice 1 has two files:

  1. Emitter.controller.js
  2. recievedAuthController.js

Microservice 2 has one file:

  1. receivedSettingsController.js

Here is the flow in detail:

Emitter.controller.js (M1) sends data to receivedSettingsController.js (M2), which sends the same data back to recievedAuthController.js (M1). recievedAuthController.js emits the data on an event emitter (emitter_test), and Emitter.controller.js (M1) listens for that event. After that, we send the HTTP response.

First, we store every response object in a Map. The key is a MongoDB ObjectId generated for each request (just for uniqueness) and the value is the response object of that request: let emit_response_list_map = new Map();

Finally, we look up the response in the Map and send it back.

My issue:

  1. If I send 100 requests through JMeter, the whole flow works, but the event emitter in recievedAuthController.js (M1) sometimes emits duplicate values: the ObjectId we pass through the flow appears 2-3 times in the listener in Emitter.controller.js (M1), and data loss happens. Note: the listener receives all 100 emits, but some of the ObjectIds in those 100 are repeated, so they are not all unique. How can I solve this data loss issue in Event Emitters?
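One possible explanation for the repeated ObjectIds can be sketched with a bare EventEmitter and no RabbitMQ. If every request adds its own listener for the same event name, a single emit calls every listener registered at that moment, and a removeAllListeners call inside one of them also drops the listeners of requests that are still waiting. The handleRequest function and the IDs below are hypothetical stand-ins, not the real controller.

```javascript
const { EventEmitter } = require('events');

const emitter = new EventEmitter();
const seen = []; // IDs observed by the listeners, in call order

// Hypothetical stand-in for the request handler: one new listener per request.
function handleRequest() {
  emitter.on('emitter_test', (data) => {
    // Removes the listeners of ALL pending requests, not only this one.
    emitter.removeAllListeners('emitter_test');
    seen.push(data.index);
  });
}

// Three requests arrive before any ack comes back.
handleRequest();
handleRequest();
handleRequest();

// First ack: emit() works on a copy of the listener list, so all three
// listeners still run for this one event and record the same ID.
emitter.emit('emitter_test', { index: 'a' });

// Second ack: every listener is gone, so emit() returns false and the
// event is silently dropped.
const deliveredB = emitter.emit('emitter_test', { index: 'b' });

console.log(seen, deliveredB);
```

In this sketch one ID is handled three times and the next ack reaches nobody, which resembles the duplicates and the data loss described above.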


My code is below.

Microservice 1, file: Emitter.controller.js

var common = require('../config/event_emitter');
var commonEmitter = common.commonEmitter;
const { sendMessage } = require('../config/rabbitmq/emit');
const connection = require('../config/rabbitmq/connection');
var mongoose = require('mongoose');
const { parse, stringify } = require('flatted');



let emit_response_list_map = new Map();
exports.emitOnceTesting = async (req, res, next) => {
    let count = mongoose.Types.ObjectId();
    emit_response_list_map.set(count.toString(), res);

    const rabbit_conn = await connection.connect();
    const emit_channel = await rabbit_conn.createChannel();
    let method = "emit_r";

    let dataObj = { method, count };
    let dataPacket = { exchange: 'device_model', key: "emit.key", data: dataObj };
    emit_channel.assertExchange('device_model', 'topic', {
        durable: false
    });

    sendMessage(emit_channel, dataPacket);
    console.log(`============================== [[SEND] => [FROM] auth_ms : emit_controller: ${method} [TO] settings_ms : receivedDeviceController: emit_controller: ${method}] =======================================, ${dataPacket.data.count}`);

    commonEmitter.on('emitter_test', async (data) => {
        try {
            console.log("______________emitter_test_______________", data.index);

            commonEmitter.removeAllListeners('emitter_test');
            await emit_channel.close();
            const res = emit_response_list_map.get(data.index);

            res.status(200).json({ status: true });        
        } catch (err) {
            console.log(err);
        }
    });
}
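A minimal sketch of one way to avoid per-request listeners: register a single 'emitter_test' listener once at module load and let it look up the matching response in the Map by ID. This assumes the ack payload carries the request ID as a string in `count`; the `pending` map, `registerPending` and `fakeRes` are hypothetical names used only for illustration, and the plain EventEmitter stands in for the shared commonEmitter.

```javascript
const { EventEmitter } = require('events');

// Stand-in for the shared emitter exported by ../config/event_emitter.
const commonEmitter = new EventEmitter();

// Pending HTTP responses keyed by request ID (assumed to be a string).
const pending = new Map();

// Registered ONCE at module load, never inside the request handler.
commonEmitter.on('emitter_test', (data) => {
  const res = pending.get(data.count);
  if (!res) return;           // unknown ID or already answered: ignore
  pending.delete(data.count); // free the entry so the Map does not grow
  res.status(200).json({ status: true, id: data.count });
});

// Hypothetical helper the request handler would call before publishing.
function registerPending(id, res) {
  pending.set(id, res);
}

// Fake response object so the sketch runs without Express.
function fakeRes(log) {
  return {
    status(code) { this.code = code; return this; },
    json(body) { log.push({ code: this.code, body }); return this; },
  };
}

const log = [];
registerPending('id-1', fakeRes(log));
registerPending('id-2', fakeRes(log));

// Acks may arrive in any order; each one answers only its own request.
commonEmitter.emit('emitter_test', { method: 'emit_r', count: 'id-2' });
commonEmitter.emit('emitter_test', { method: 'emit_r', count: 'id-1' });
commonEmitter.emit('emitter_test', { method: 'emit_r', count: 'id-1' }); // duplicate ack, ignored

console.log(log.length, pending.size);
```

With a single listener there is nothing to remove per request, so one request can no longer drop the listener another request is waiting on.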

Microservice 2, file: receivedSettingsController.js

let emitTesting_q1 = await device_channel_sett_ms.assertQueue('emit1test_r', { exclusive: true });
device_channel_sett_ms.bindQueue(emitTesting_q1.queue, 'device_model', 'emit.key');
device_channel_sett_ms.consume(emitTesting_q1.queue, async (msg) => {
    try {
        // @ts-ignore
        let data = parse(msg.content);
        console.log('============================== [[RECEIVED] => [HERE] settings_ms : receivedDeviceController: emitTesting [FROM] coin_zone_ms : receivedDeviceController: emitTesting] =======================================', data);            //res.json({ status: true, success: data });
        let dataPacketAck = { exchange: 'device_model', key: 'emit.key.ack', data };

        await sendMessage(device_channel_sett_ms, dataPacketAck);
        console.log('============================== [[SEND-ACK] => [FROM] settings_ms : receivedDeviceController: emitTesting [FROM] coin_zone_ms : receivedDeviceController:emitTesting ] =======================================', data);
    } catch (err) {
        console.log('exception in getGeoFenceSetting : getGeoFenceDetails -----------', err);
    }
}, {
    noAck: true
});


Microservice 1, file: recievedAuthController.js

let emit_key_ack = await device_channel_auth_ms.assertQueue('', { exclusive: true });
device_channel_auth_ms.bindQueue(emit_key_ack.queue, 'device_model', 'emit.key.ack');
device_channel_auth_ms.consume(emit_key_ack.queue, async (msg) => {
    // @ts-ignore
    let data = parse(msg.content);
    console.log("___________________________________ack________________", data.countack);
    // Compare with ===; a single = assigns and makes the first branch always run.
    if (data.method === "emit_r") {
        commonEmitter.emit('emitter_test', data);
    } else if (data.method === "") {
    } else if (data.method === "") {
    }
});
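An alternative sketch, again under assumptions: the consumer emits on an event name that includes the request ID, and the controller waits with `once` on exactly that name, so each listener fires at most one time and only for its own request. The names `waitForAck` and `onAckMessage`, the `emitter_test:<id>` naming scheme, and the timeout are hypothetical; the ID is assumed to arrive as a string in `count`.

```javascript
const { EventEmitter } = require('events');

// Stand-in for the shared emitter exported by ../config/event_emitter.
const commonEmitter = new EventEmitter();

// Controller side (hypothetical helper): wait for the ack of ONE request.
function waitForAck(id, timeoutMs) {
  return new Promise((resolve, reject) => {
    const eventName = `emitter_test:${id}`;

    function onAck(data) {
      clearTimeout(timer);
      resolve(data);
    }

    // Give up and clean the listener if no ack arrives in time.
    const timer = setTimeout(() => {
      commonEmitter.removeListener(eventName, onAck);
      reject(new Error(`no ack received for ${id}`));
    }, timeoutMs);

    // once() removes the listener automatically after the first call.
    commonEmitter.once(eventName, onAck);
  });
}

// Consumer side (hypothetical helper): called with the parsed ack message.
function onAckMessage(data) {
  if (data.method === 'emit_r') {
    commonEmitter.emit(`emitter_test:${data.count}`, data);
  }
}

// Usage: two requests in flight, acks arriving out of order.
const first = waitForAck('id-1', 1000);
const second = waitForAck('id-2', 1000);
onAckMessage({ method: 'emit_r', count: 'id-2' });
onAckMessage({ method: 'emit_r', count: 'id-1' });

Promise.all([first, second]).then((acks) => {
  console.log(acks.map((ack) => ack.count));
});
```

The trade-off compared with one shared listener is one short-lived listener per in-flight request, so a high request rate may call for raising the emitter's listener limit or preferring the Map-based lookup.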



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
