'AWS API gateway websocket receives messages inconsistently

I have a websocket in api gateway connected to a lambda that looks like this:

const AWS = require('aws-sdk');
const amqp = require('amqplib');

const api = new AWS.ApiGatewayManagementApi({
    endpoint: 'MY_ENDPOINT',
});

async function sendMsgToApp(response, connectionId) {
    console.log('=========== posting reply');
    const params = {
        ConnectionId: connectionId,
        Data: Buffer.from(response),
    };
    return api.postToConnection(params).promise();
}

let rmqServerUrl =
    'MY_RMQ_SERVER_URL';
let rmqServerConn = null;

exports.handler = async event => {
    console.log('websocket event:', event);
    const { routeKey: route, connectionId } = event.requestContext;

    switch (route) {
        case '$connect':
            console.log('user connected');
            const creds = event.queryStringParameters.x;
            console.log('============ x.length:', creds.length);
            const decodedCreds = Buffer.from(creds, 'base64').toString('utf-8');
            try {
                const conn = await amqp.connect(
                    `amqps://${decodedCreds}@${rmqServerUrl}`
                );
                const channel = await conn.createChannel();
                console.log('============ created channel successfully:');
                rmqServerConn = conn;
                const [userId] = decodedCreds.split(':');
                const { queue } = await channel.assertQueue(userId, {
                    durable: true,
                    autoDelete: false,
                });
                console.log('============ userId:', userId, 'queue:', queue);
                channel.consume(queue, msg => {
                    console.log('========== msg:', msg);
                    const { content } = msg;
                    const msgString = content.toString('utf-8');
                    console.log('========== msgString:', msgString);
                    sendMsgToApp(msgString, connectionId)
                        .then(res => {
                            console.log(
                                '================= sent queued message to the app, will ack, outcome:',
                                res
                            );
                            try {
                                channel.ack(msg);
                            } catch (e) {
                                console.log(
                                    '================= error acking message:',
                                    e
                                );
                            }
                        })
                        .catch(e => {
                            console.log(
                                '================= error sending queued message to the app, will not ack, error:',
                                e
                            );
                        });
                });
            } catch (e) {
                console.log(
                    '=========== error initializing amqp connection',
                    e
                );
                if (rmqServerConn) {
                    await rmqServerConn.close();
                }
                const response = {
                    statusCode: 401,
                    body: JSON.stringify('failed auth!'),
                };
                return response;
            }
            break;
        case '$disconnect':
            console.log('user disconnected');
            if (rmqServerConn) {
                await rmqServerConn.close();
            }
            break;
        case 'message':
            console.log('message route');
            await sendMsgToApp('test', connectionId);
            break;
        default:
            console.log('unknown route', route);
            break;
    }
    const response = {
        statusCode: 200,
        body: JSON.stringify('Hello from websocket Lambda!'),
    };
    return response;
};

The amqp connection is for a rabbitmq server that's provisioned by amazonmq. The problem I have is that messages published to the queue either do not show up at all in the .consume callback, or they only show up after the websocket is disconnected and reconnected. Essentially they're missing until a point much later after which they show up unexpectedly. That's within the websocket. Even when they do show up, they don't get sent to the client (app in this case) that's connected to the websocket. What could be the problem here?



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source