'How to correlate log events in distributed Vertx system

while doing logs in the multiple module of vertx, it is a basic requirement that we should be able to correlate all the logs for a single request.

as vertx being asynchronous what will be the best place to keep logid, conversationid, eventid.

any solution or patterns we can implement?



Solution 1:[1]

In a thread based system, you current context is held by the current thread, thus MDC or any ThreadLocal would do.

In an actor based system such as Vertx, your context is the message, thus you have to add a correlation ID to every message you send.

For any handler/callback you have to pass it as method argument or reference a final method variable.

For sending messages over the event bus, you could either wrap your payload in a JsonObject and add the correlation id to the wrapper object

vertx.eventBus().send("someAddr", 
  new JsonObject().put("correlationId", "someId")
                  .put("payload", yourPayload));

or you could add the correlation id as a header using the DeliveryOption

//send
vertx.eventBus().send("someAddr", "someMsg", 
            new DeliveryOptions().addHeader("correlationId", "someId"));

//receive    
vertx.eventBus().consumer("someAddr", msg -> {
        String correlationId = msg.headers().get("correlationId");
        ...
    });

There are also more sophisticated options possible, such as using an Interceptor on the eventbus, which Emanuel Idi used to implement Zipkin support for Vert.x, https://github.com/emmanuelidi/vertx-zipkin, but I'm not sure about the current status of this integration.

Solution 2:[2]

There's a surprising lack of good answers published about this, which is odd, given how easy it is.

Assuming you set the correlationId in your MDC context on receipt of a request or message, the simplest way I've found to propagate it is to use interceptors to pass the value between contexts:

vertx.eventBus()
        .addInboundInterceptor(deliveryContext -> {
            MultiMap headers = deliveryContext.message().headers();
            if (headers.contains("correlationId")) {
                MDC.put("correlationId", headers.get("correlationId"));
                deliveryContext.next();
            }
        })
        .addOutboundInterceptor(deliveryContext -> {
            deliveryContext.message().headers().add("correlationId", MDC.get("correlationId"));
            deliveryContext.next();
        });

Solution 3:[3]

If by multiple module you mean multiple verticles running on the same Vertx instance, you should be able to use a normal logging library such as SLF4J, Log4J, JUL, etc. You can then keep the logs in a directory of your choice, e.g. /var/logs/appName.

If, however, you mean how do you correlate logs between multiple instances of Vertx, then I'd suggest looking into GrayLog or similar applications for distributed/centralised logging. If you use a unique ID per request, you can pass that around and use it in the logs. Or depending on your authorization system, if you use unique tokens per request you can log those. The centralised logging system can be used to aggregate and filter logs based on that information.

Solution 4:[4]

The interceptor example presented by Clive Evans works great. I added a more details example showing how this might work:

import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import java.time.Duration;
import java.util.UUID;

public class PublisherSubscriberInterceptor {

  private static final Logger LOG = LoggerFactory.getLogger(PublisherSubscriberInterceptor.class);
  public static final String ADRESS = "sender.address";


  public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();
    createInterceptors(vertx);

    vertx.deployVerticle(new Publisher());
    vertx.deployVerticle(new Subscriber1());

    //For our example lets deploy subscriber2 2 times.
    vertx.deployVerticle(Subscriber2.class.getName(), new DeploymentOptions().setInstances(2));
  }

  private static void createInterceptors(Vertx vertx) {
    vertx.eventBus()
      .addInboundInterceptor(deliveryContext -> {
        MultiMap headers = deliveryContext.message().headers();
        if (headers.contains("myId")) {
          MDC.put("myId", headers.get("myId"));
          deliveryContext.next();
        }
      })
      .addOutboundInterceptor(deliveryContext -> {
        deliveryContext.message().headers().add("myId", MDC.get("myId"));
        deliveryContext.next();
      });
  }

  public static class Publisher extends AbstractVerticle {

    @Override
    public void start(Promise<Void> startPromise) throws Exception {
      startPromise.complete();
      vertx.setPeriodic(Duration.ofSeconds(5).toMillis(), id -> {
        MDC.put("myId", UUID.randomUUID().toString());
      vertx.eventBus().publish(Publish.class.getName(), "A message for all");
      });
    }
  }

  public static class Subscriber1 extends AbstractVerticle {
    private static final Logger LOG = LoggerFactory.getLogger(Subscriber1.class);

    @Override
    public void start(Promise<Void> startPromise) throws Exception {
      startPromise.complete();
      vertx.eventBus().consumer(Publish.class.getName(), message-> {
        LOG.debug("Subscriber1 Received: {}", message.body());
      });
    }
  }

  public static class Subscriber2 extends AbstractVerticle {
    private static final Logger LOG = LoggerFactory.getLogger(Subscriber2.class);

    @Override
    public void start(Promise<Void> startPromise) throws Exception {
      startPromise.complete();
      vertx.eventBus().consumer(Publish.class.getName(), message-> {
        LOG.debug("Subscriber2 Received: {}", message.body());
      });
    }
  }
}

you can see the log example for publishing 2 messages:

    13:37:14.315 [vert.x-eventloop-thread-3][myId=a2f0584c-9d4e-48a8-a724-a24ea12f7d80] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber2 - Subscriber2 Received: A message for all
    13:37:14.315 [vert.x-eventloop-thread-1][myId=a2f0584c-9d4e-48a8-a724-a24ea12f7d80] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber1 - Subscriber1 Received: A message for all
    13:37:14.315 [vert.x-eventloop-thread-4][myId=a2f0584c-9d4e-48a8-a724-a24ea12f7d80] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber2 - Subscriber2 Received: A message for all
    13:37:19.295 [vert.x-eventloop-thread-1][myId=63b5839e-3b0b-43a5-b379-92bd1466b870] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber1 - Subscriber1 Received: A message for all
    13:37:19.295 [vert.x-eventloop-thread-3][myId=63b5839e-3b0b-43a5-b379-92bd1466b870] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber2 - Subscriber2 Received: A message for all
    13:37:19.295 [vert.x-eventloop-thread-4][myId=63b5839e-3b0b-43a5-b379-92bd1466b870] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber2 - Subscriber2 Received: A message for all

Solution 5:[5]

Use vertx-sync and a ThreadLocal for the correlation ID. (i.e., a "FiberLocal"). Works great for me.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Gerald Mücke
Solution 2 Clive Evans
Solution 3 amb85
Solution 4
Solution 5 DoctorPangloss