How can I mock io.quarkiverse.rabbitmqclient.RabbitMQClient and write JUnit tests for basic send and consume operations?

I'm new to the Quarkus framework and am writing a RabbitMQ client library based on it, using io.quarkiverse.rabbitmqclient.RabbitMQClient. I need to write JUnit tests for the basic send and consume operations. Please help me understand how to write these tests and how to mock RabbitMQClient. I'm using the code below to send and consume messages.

/**
 * Producer adapter that publishes text messages through the channel inherited
 * from {@link RabbitMQCongiguration}.
 */
@ApplicationScoped
public class RabbitMQProducerAdapterImpl extends RabbitMQCongiguration implements RabbitMQProducerAdapter {

    /**
     * Publishes {@code messagePayload}, encoded as UTF-8, to {@code exchange}
     * using {@code routingKey}. No message properties are attached.
     *
     * @param exchange       name of the exchange to publish to
     * @param routingKey     routing key passed along with the message
     * @param messagePayload message text; encoded with UTF-8 before publishing
     * @throws IOException if the publish call fails
     */
    @Override
    public void sendMessage(String exchange, String routingKey, String messagePayload) throws IOException {
        // The inherited channel must be set up before anything can be published.
        setUpConnectionAndChannel();

        final byte[] encodedBody = messagePayload.getBytes(StandardCharsets.UTF_8);
        this.channel.basicPublish(exchange, routingKey, null, encodedBody);

        Log.info("message sent succefully: " + messagePayload);
    }

}

Here is the RabbitMQCongiguration base class:

/**
 * Base class holding the RabbitMQ connection and channel used by the adapters
 * that extend it.
 *
 * <p>The connection and channel are created lazily and reused while they stay
 * open. Nothing in this class closes them, so creating a fresh channel on every
 * call would accumulate open channels. This class performs no locking of its own.
 */
@ApplicationScoped
public class RabbitMQCongiguration {

    @Inject
    private RabbitMQClient rabbitClient;

    /** Connection the current channel was created from; reused while it is open. */
    private Connection connection;

    protected Channel channel;

    /**
     * Ensures {@link #channel} refers to an open channel.
     *
     * <p>If the current channel is still open this is a no-op. Otherwise a
     * connection is obtained from the injected client (only when the cached one
     * is missing or closed) and a new channel is created on it.
     *
     * @throws UncheckedIOException if the connection or channel cannot be created
     */
    protected void setUpConnectionAndChannel() {
        if (channel != null && channel.isOpen()) {
            // Already usable: avoid opening another connection/channel.
            return;
        }
        try {
            if (connection == null || !connection.isOpen()) {
                connection = rabbitClient.connect();
            }
            channel = connection.createChannel();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Binds {@code queueName} to {@code exchangeName} with {@code routingKey},
     * optionally declaring the exchange and queue first.
     *
     * @param exchangeName         exchange to bind to (declared as a direct exchange when requested)
     * @param routingKey           routing key used for the binding
     * @param queueName            queue to bind
     * @param createExchangeQueues when {@code true}, declare the exchange and the queue before
     *                             binding; when {@code false}, both are expected to exist already
     * @throws IOException if a declare or bind call fails
     */
    protected void setupQueueInDirectExchange(String exchangeName, String routingKey, String queueName,
            boolean createExchangeQueues) throws IOException {
        setUpConnectionAndChannel();
        if (createExchangeQueues) {
            // Arguments: durable = true, autoDelete = false, internal = false, no extra arguments.
            this.channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);

            // Declaring the queue creates it on the server if it does not exist yet.
            // Arguments: durable = true, exclusive = false, autoDelete = false, no extra arguments.
            this.channel.queueDeclare(queueName, true, false, false, null);
        }

        // Bind the queue to the exchange under the given routing key.
        this.channel.queueBind(queueName, exchangeName, routingKey);
    }
}

Below is the consumer class:

@ApplicationScoped
public class RabbitMQConsumerAdapterImpl extends RabbitMQCongiguration implements RabbitMQConsumerAdapter, Runnable {

    private String queueName;
    private MessageProcessor messageProcessor;

    @Override
    public void consumeMessage(String exchange, String queueName, String routingKey,
            MessageProcessor messageProcessor) throws IOException {
        Log.info("starting consumer...");
        try {
            this.queueName = queueName;
            this.messageProcessor = messageProcessor;
            Log.info("setting up rabbitMQPrefetchCountConfig");
            setupQueueInDirectExchange(exchange, routingKey, queueName, false);
            Thread consumerThread = new Thread(this);
            consumerThread.start();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    
    @Override
    public void run() {
        try {
            // start consuming messages. Auto acknowledge messages.
            Log.info("Start consuming messages from thread...");    
            channel.basicConsume(this.queueName, false, (Consumer) new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    String msgPayload = null;
                    if (body == null || body.length == 0) {
                        Log.warn("Invalid Message Body - Consumer Tag : " + consumerTag + ", Message DeliveryTag : "
                                + envelope.getDeliveryTag());
                        channel.basicReject(envelope.getDeliveryTag(), false);
                    } else {
                        msgPayload = new String(body);
                        try {
                            JsonParser.parseString(msgPayload);
                        } catch (JsonSyntaxException ex) {
                            Log.error(msgPayload + " is not a valid json, Reason - ", ex);
                            channel.basicReject(envelope.getDeliveryTag(), false);
                            Log.warn("Rejected the current payload.");
                            return;
                        }
                        messageProcessor.processMessage(msgPayload);
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                    // just print the received message.
                    Log.info("Received: " + new String(body, StandardCharsets.UTF_8));
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

    }
/**
 * {@link MessageProcessor} implementation that only logs the payload it is given.
 */
@ApplicationScoped
public class MessageProcessorImpl implements MessageProcessor {

    /**
     * Logs the consumed message at info level.
     *
     * @param messagePayload text of the consumed message
     */
    @Override
    public void processMessage(String messagePayload) {
        final String logLine = "message consumed: " + messagePayload;
        Log.info(logLine);
    }

}



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source