'Spring boot Kafka Health check not working

I'm working on a spring boot app that consume Kafka messages. I consume perfectly my Kafka events using a @KafkaListener... and now I want to implement Kafka health check using spring actuator. i used this example : https://medium.com/dna-technology/kafka-consumers-health-check-in-spring-boot-actuator-d00f9017e89d but it doesn't work and I can't find the import for the two classes HealthRow All Status

private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Override
protected void doHealthCheck(Builder builder) {
  Map<String, HealthRow> consumersHealth = new TreeMap<>(Comparator.naturalOrder());
  final AllStatus allStatus = new AllStatus();
  final Collection<MessageListenerContainer> allListenerContainers = kafkaListenerEndpointRegistry
      .getAllListenerContainers();

  for (MessageListenerContainer messageListenerContainer : allListenerContainers) {
    ConsumerDetails consumerDetails = buildConsumerDetails(messageListenerContainer);
    consumersHealth.put(consumerDetails.getName(), consumerDetails);
    updateAllStatus(allStatus, consumerDetails);
  }

  consumersHealth.put(allStatus.getName(), allStatus);

  builder.withDetails(consumersHealth);
  builder.status(allStatus.getStatus());
}

Anyone please knows how to implement Kafka health check correctly using spring boot? Regards.



Solution 1:[1]

Did come across same issue. The way how I solved was to implement my own inner HealthRow class. The fields name and status I use only internally to decide on lower level status to be inherited at the higher level and for the name to then add the Map as an entry at the outer Map ...

static class HealthRow extends LinkedHashMap<String, Object> {
   private String name;
   private Status status;

   String getName() {return name;};
   Status getStatus() {return status;}:
   void setName (String name) (this.name = name;}
   void setstatus (Status status) (this.status = status;};
   Map‹String, Object› getDetails() {return this;};
}

Created my own inner StreamApplicationDetails class

private static class StreamApplicationDetails extends HelathRow {

    public StreamApplicationDetails(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
        final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
        if (kafkaStreams != null) {
            setName(streamsBuilderFactoryBean.getStreamsConfiguration().getProperty("application.id", "Undefined"));
            setStatus(kafkaStreams.state().isRunningOrRebalancing() ? Status.UP : Status.DOWN);
            put("status", getstatus());

            final HealthRow streamThreads = new HealthRow();
            put("streamThreads", streamThreads);
            for (ThreadMetadata threadMetadata : kafkaStreams.metadataForLocalThreads()) {
                final HealthRow streamThread = buildThreadStatus(threadMetadata);
                if (streamThread.getStatus() != Status.UP) {
                    // if one of the threads as down, mark the service down as well
                    setStatus(Status.DOWN);
                }
                streamThreads.put(streamThread.getName(), streamThread);
            }
        } else {
            setStatus(Status.UNKNOWN);
            Log.warn("Healthcheck: StreamApplication status is ()", getstatus());
        }
    }

    private HealthRow buildThreadStatus(ThreadMetadata threadMetadata) {
        final HealthRow details = new HealthRow();
        details.setName(threadMetadata.threadName());
        details.setStatus(((threadMetadata.threadState() == "RUNNING") || (threadMetadata.threadState() == "CREATED")) ? Status.UP : Status.DOWN);
        details.put("status", details.getStatus());
        details.put("activeTasksCount", threadMetadata.activeTasks().size());
        details.put("threadState", threadMetadata.threadState());

        final HealthRow standbyTasks = new HealthRow() :
        details.put("standbyTasks", standbyTasks);
        for (TaskMetadata taskMetadata : threadMetadata.standbyTasks()) {
            final HealthRow standbyTask = buildTaskStatus(taskMetadata);
            standbyTasks.put(standbyTask.getName(), standbyTask);
        }

        final HealthRow activeTasks = new HealthRow();
        details.pt("activeTasks", activeTasks):
        for (TaskMetadata taskMetadata : threadMetadata.activeTasks()) {
            final HealthRow activeTask = buildTaskStatus(taskMetadata);
            activeTasks.put(activeTask.getName(), activeTask);
        }
        return details;
    }

    private HealthRow buildTaskStatus(TaskMetadata taskMetadata) {
        final HealthRow details = new HealthRow();
        details.setName(taskMetadata.taskId().toString());
        details.setStatus(Status.UNKNOWN);
        for (TopicPartition topicPartition : taskMetadata.topicPartitions()) {
            details.put("partition", topicPartition.partition());
            details.put("topic", topicPartition.topic());
        }
        return details;
    }
}

And changed the HealthCheck class as follows

private final List<StreamBuilderFactoryBean> streamsBuilder;

@Autowired
public StreamsHealthIndicator(List<StreamBuilderFactoryBean> streamsBuilder) {
    this.streamsBuilder = streamsBuilder;
}

@Override
protected void doHealthCheck(Health.Builder builder) {
    final Map<String, HealthRow> streamsHealth = new TreeMap<>(Comperator.naturalOrder());
    Status serviceStatus = Status.UP;

    for (StreamBuilderFactoryBean:streamBuilderFactoryBean : streamsBuilder) {
        final StreamApplicationDetails streamApplicationDetails = new StreamApplicationDetails(streamBuilderFactoryBean);
        streamsHealth.put(streamApplicationDetails.getName(), streamApplicationDetails);
        if (streamApplicationDetails.getStatus() != Status.UP) {
            serviceStatus = Status.DOWN;
        }
    }
    builder.withDetails(streamsHealth);
    builder.status(serviceStatus);
}

What is still somewhat open is to clarify by when the status really should be up meaning that my StreamProcessor(s) are up and running and I for instance could start using interactive queries against materialized views etc.

Hope this helps you further.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Thomas