Prometheus alert not firing for Kafka consumer group lag?

I'm trying to set up a simplified example of alerting on excessive Kafka consumer group lag. The Docker Compose multi-container application that demonstrates this is at https://github.com/khpeek/kafka-exporter-example and has the following structure:

.
├── README.md
├── docker-compose.yml
├── kafka-consumer
│   ├── Dockerfile
│   ├── go.mod
│   ├── go.sum
│   └── main.go
├── kafka-exporter
│   ├── Dockerfile
│   └── run.sh
├── kafka-producer
│   ├── Dockerfile
│   ├── LICENSE
│   ├── cmd
│   │   ├── produce.go
│   │   └── root.go
│   ├── go.mod
│   ├── go.sum
│   └── main.go
└── prometheus
    ├── alerts.rules.yml
    └── prometheus.yml

where docker-compose.yml is

version: '2'

networks:
  app-tier:
    driver: bridge

services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    environment:
      - 'ALLOW_ANONYMOUS_LOGIN=yes'
    networks:
      - app-tier
  kafka:
    image: 'bitnami/kafka:latest'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    networks:
      - app-tier
  kafka-exporter:
    build: kafka-exporter
    ports:
      - "9308:9308"
    networks:
      - app-tier
    entrypoint: ["run.sh"]
  consumer:
    build: kafka-consumer
    networks:
      - app-tier
  producer:
    build: kafka-producer
    networks:
      - app-tier
  prometheus:
    image: bitnami/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - "./prometheus/prometheus.yml:/opt/bitnami/prometheus/conf/prometheus.yml"
      - "./prometheus/alerts.rules.yml:/alerts.rules.yml"
    networks:
      - app-tier
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    networks:
      - app-tier
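
The contents of kafka-exporter/run.sh aren't reproduced here; as a rough sketch (assuming the danielqsj/kafka_exporter binary and its --kafka.server flag, neither of which is confirmed by the excerpt above), it might simply restart the exporter until the broker is reachable:

#!/bin/sh
# Hypothetical sketch of run.sh: keep restarting the exporter until Kafka is
# reachable, since the broker may come up after this container does.
until kafka_exporter --kafka.server=kafka:9092; do
    echo "kafka_exporter exited; Kafka may not be ready yet. Retrying in 5s..." >&2
    sleep 5
done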

The prometheus.yml is

global:
  scrape_interval: 10s
  scrape_timeout: 10s
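  # How often alerting and recording rules are evaluated (independent of scrape_interval)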
  evaluation_interval: 1m

scrape_configs:
  - job_name: kafka-exporter
    metrics_path: /metrics
    honor_labels: false
    honor_timestamps: true
    sample_limit: 0
    static_configs:
      - targets: ['kafka-exporter:9308']

rule_files:
  - "/alerts.rules.yml"

and the alerting rules are defined in alerts.rules.yml as

groups:
  - name: alerts
    rules:
      - alert: excessive_consumer_group_lag
        expr: kafka_consumergroup_lag_sum{topic="example"} > 10
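
As a sanity check that the rule file parses and the PromQL expression is valid, it can be run through promtool, which ships with the Prometheus distribution:

promtool check rules prometheus/alerts.rules.yml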

The Kafka consumer, by default, sets up a consumer group to listen to the topic example:

package main

import (
    "context"
    "flag"
    "log"
    "os"
    "os/signal"
    "strings"
    "sync"
    "syscall"
    "time"

    "github.com/Shopify/sarama"
)

// Sarama configuration options
var (
    brokers string
    group   string
    topics  string
    oldest  bool
    delay   time.Duration
)

func init() {
    flag.StringVar(&brokers, "brokers", "kafka:9092", "Kafka bootstrap brokers to connect to, as a comma separated list")
    flag.StringVar(&group, "group", "my-consumer-group", "Kafka consumer group definition")
    flag.StringVar(&topics, "topics", "example", "Kafka topics to be consumed, as a comma separated list")
    flag.DurationVar(&delay, "delay", 5*time.Second, "Delay with which to respond to message (to simulate latency which would lead to consumer group lag)")
    flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial offset from oldest")
    flag.Parse()

    if len(brokers) == 0 {
        panic("no Kafka bootstrap brokers defined, please set the -brokers flag")
    }

    if len(topics) == 0 {
        panic("no topics given to be consumed, please set the -topics flag")
    }

    if len(group) == 0 {
        panic("no Kafka consumer group defined, please set the -group flag")
    }
}

func main() {
    log.Println("Starting a new Sarama consumer")

    config := sarama.NewConfig()
    if oldest {
        config.Consumer.Offsets.Initial = sarama.OffsetOldest
    }

    /**
     * Setup a new Sarama consumer group
     */
    consumer := Consumer{
        ready: make(chan bool),
    }

    ctx, cancel := context.WithCancel(context.Background())

    // Use a fixed-delay retry loop to instantiate the consumer group, as
    // Kafka might not be ready yet
    var client sarama.ConsumerGroup
    var err error
    for i := 0; i < 10; i++ {
        client, err = sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
        if err != nil {
            log.Printf("Error creating consumer group client: %v. Retrying...", err)
            time.Sleep(time.Second)
            continue
        }
        break // stop retrying once the consumer group has been created
    }
    if err != nil {
        log.Fatalf("Error creating consumer group client: %v.", err)
    }

    wg := &sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            // `Consume` should be called inside an infinite loop, when a
            // server-side rebalance happens, the consumer session will need to be
            // recreated to get the new claims
            if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
                log.Panicf("Error from consumer: %v", err)
            }
            // check if context was cancelled, signaling that the consumer should stop
            if ctx.Err() != nil {
                return
            }
            consumer.ready = make(chan bool)
        }
    }()

    <-consumer.ready // Await till the consumer has been set up
    log.Println("Sarama consumer up and running!...")

    sigterm := make(chan os.Signal, 1)
    signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    select {
    case <-ctx.Done():
        log.Println("terminating: context cancelled")
    case <-sigterm:
        log.Println("terminating: via signal")
    }
    cancel()
    wg.Wait()
    if err = client.Close(); err != nil {
        log.Panicf("Error closing client: %v", err)
    }
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
    ready chan bool
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
    // Mark the consumer as ready
    close(consumer.ready)
    return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        time.Sleep(delay)
        log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
        session.MarkMessage(message, "")
    }

    return nil
}
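
For reference, the flags defined in init make the simulated latency configurable, so the lag spike can be stretched or shortened; for example, run locally against a reachable broker (hypothetical invocation, not from the repo):

go run main.go -brokers=localhost:9092 -group=my-consumer-group -topics=example -delay=10s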

The kafka-producer is a Cobra command-line application with a blocking root command which is meant to be exec'd into. Its main.go reads

package main

import "github.pie.apple.com/kurt-peek/kafka-exporter-example/kafka-test/cmd"

func main() {
    cmd.Execute()
}

where cmd/root.go reads

package cmd

import (
    "log"
    "os"
    "time"

    "github.com/Shopify/sarama"
    "github.com/spf13/cobra"
)

// Sarama configuration options
var (
    brokers  []string
    producer sarama.SyncProducer
)

// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
    Use:   "kafka-producer",
    Short: "Send Kafka messages",
    Long:  `Set up a Kafka SyncProducer to send messages.`,
    PersistentPreRun: func(cmd *cobra.Command, args []string) {
        config := sarama.NewConfig()
        config.Producer.Return.Successes = true

        // Retry with a fixed delay, as Kafka might not be ready yet
        var err error
        for i := 1; i <= 10; i++ {
            producer, err = sarama.NewSyncProducer(brokers, config)
            if err == nil {
                break // stop retrying once the producer has been created
            }
            log.Printf("Instantiate sync producer at broker addresses %v: %v. Attempting retry #%d...", brokers, err, i)
            time.Sleep(time.Second)
        }
        if err != nil {
            log.Fatalf("Instantiate sync producer at broker addresses %v: %v", brokers, err)
        }
        log.Printf("Connected to Kafka brokers at addresses %v", brokers)
    },
    Run: func(cmd *cobra.Command, args []string) {
        select {} // A blocking command to prevent the program from running to completion, so that it can be exec'd into to run commands
    },
}

// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute() {
    err := rootCmd.Execute()
    if err != nil {
        os.Exit(1)
    }
}

func init() {
    rootCmd.Flags().StringSliceVar(&brokers, "brokers", []string{"kafka:9092"}, "Addresses of Kafka brokers to produce messages to")
}

and produce.go reads

package cmd

import (
    "log"

    "github.com/Shopify/sarama"
    "github.com/spf13/cobra"
)

var (
    count   int
    topic   string
    message string
)

// produceCmd represents the produce command
var produceCmd = &cobra.Command{
    Use:   "produce",
    Short: "Produce a given number of Kafka messages",
    Long:  `Produce a given number of Kafka messages`,
    PreRun: func(cmd *cobra.Command, args []string) {
        if topic == "" {
            log.Fatal("Topic must not be empty")
        }
    },
    Run: func(cmd *cobra.Command, args []string) {
        for i := 1; i <= count; i++ {
            partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
                Topic: topic,
                Value: sarama.StringEncoder(message),
            })
            if err != nil {
                log.Fatalf("Failed to send message: %v", err)
            }
            log.Printf("Sent message %d/%d to partition %d at offset %d", i, count, partition, offset)
        }
    },
}

func init() {
    rootCmd.AddCommand(produceCmd)

    produceCmd.Flags().IntVar(&count, "count", 1, "Number of messages to produce")
    produceCmd.Flags().StringVar(&topic, "topic", "", "Topic to produce to")
    produceCmd.Flags().StringVar(&message, "message", "Hello, world!", "Message to include in the produced Kafka message")
}

After bringing up the application with docker-compose build && docker-compose up, I produced 20 messages to the example topic by exec'ing into the kafka-producer container:

> docker exec -it kafka-exporter-example-producer-1 /bin/ash
~ # ./app produce --count=20 --message=foobar --topic="example"
2022/04/10 21:49:05 Connected to Kafka brokers at addresses [kafka:9092]
2022/04/10 21:49:05 Sent message 1/20 to partition 0 at offset 0
2022/04/10 21:49:05 Sent message 2/20 to partition 0 at offset 1
2022/04/10 21:49:05 Sent message 3/20 to partition 0 at offset 2
2022/04/10 21:49:05 Sent message 4/20 to partition 0 at offset 3
2022/04/10 21:49:05 Sent message 5/20 to partition 0 at offset 4
2022/04/10 21:49:05 Sent message 6/20 to partition 0 at offset 5
2022/04/10 21:49:05 Sent message 7/20 to partition 0 at offset 6
2022/04/10 21:49:05 Sent message 8/20 to partition 0 at offset 7
2022/04/10 21:49:05 Sent message 9/20 to partition 0 at offset 8
2022/04/10 21:49:05 Sent message 10/20 to partition 0 at offset 9
2022/04/10 21:49:05 Sent message 11/20 to partition 0 at offset 10
2022/04/10 21:49:05 Sent message 12/20 to partition 0 at offset 11
2022/04/10 21:49:05 Sent message 13/20 to partition 0 at offset 12
2022/04/10 21:49:05 Sent message 14/20 to partition 0 at offset 13
2022/04/10 21:49:05 Sent message 15/20 to partition 0 at offset 14
2022/04/10 21:49:05 Sent message 16/20 to partition 0 at offset 15
2022/04/10 21:49:05 Sent message 17/20 to partition 0 at offset 16
2022/04/10 21:49:05 Sent message 18/20 to partition 0 at offset 17
2022/04/10 21:49:05 Sent message 19/20 to partition 0 at offset 18
2022/04/10 21:49:05 Sent message 20/20 to partition 0 at offset 19
~ # 

If I open http://localhost:3000, log in with the default username and password (both "admin"), add a Prometheus data source with URL http://prometheus:9090, and graph the metric kafka_consumergroup_lag_sum, I can see that the alerting threshold of 10 was temporarily exceeded:

[Screenshot: Grafana graph of kafka_consumergroup_lag_sum briefly rising above the threshold of 10]

Yet if I navigate to the Prometheus UI at http://localhost:9090 and look at the Alerts page, I don't see any alerts having fired:

[Screenshot: Prometheus Alerts page showing no firing alerts]

Any idea why the alert is not firing? A PromQL query shows the metric exceeding the threshold of 10 for several seconds.



Solution 1 [1]

To convert Oleg Mayko's comment into an answer: indeed, after reducing the evaluation_interval from 1m to 10s, I see the alert:

[Screenshot: Prometheus Alerts page showing the excessive_consumer_group_lag alert firing]
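
The explanation: Prometheus evaluates alerting rules only once per evaluation_interval, and the rule above has no for clause, so it fires only if an evaluation happens to land while the expression is true. With 20 messages consumed at one per 5 seconds, the lag exceeds 10 for only about 50 seconds, so a 1m evaluation interval can straddle the spike entirely. A minimal sketch of the adjusted global block in prometheus.yml:

global:
  scrape_interval: 10s
  scrape_timeout: 10s
  evaluation_interval: 10s  # evaluate rules as often as targets are scraped

Conversely, to alert only on sustained lag rather than brief spikes, a for clause can be added to the rule (the 1m duration here is illustrative, not from the original repo):

groups:
  - name: alerts
    rules:
      - alert: excessive_consumer_group_lag
        expr: kafka_consumergroup_lag_sum{topic="example"} > 10
        for: 1m  # the expression must hold for a full minute before firing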

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

[1] Solution 1: Kurt Peek, Stack Overflow