Prometheus alert not firing for Kafka consumer group lag?
I'm trying to put together a simplified example of alerting on excessive Kafka consumer group lag. The Docker Compose multi-container application to demonstrate this is at https://github.com/khpeek/kafka-exporter-example and has the following structure:
.
├── README.md
├── docker-compose.yml
├── kafka-consumer
│   ├── Dockerfile
│   ├── go.mod
│   ├── go.sum
│   └── main.go
├── kafka-exporter
│   ├── Dockerfile
│   └── run.sh
├── kafka-producer
│   ├── Dockerfile
│   ├── LICENSE
│   ├── cmd
│   │   ├── produce.go
│   │   └── root.go
│   ├── go.mod
│   ├── go.sum
│   └── main.go
└── prometheus
    ├── alerts.rules.yml
    └── prometheus.yml
where the docker-compose.yml is
version: '2'
networks:
  app-tier:
    driver: bridge
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    environment:
      - 'ALLOW_ANONYMOUS_LOGIN=yes'
    networks:
      - app-tier
  kafka:
    image: 'bitnami/kafka:latest'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    networks:
      - app-tier
  kafka-exporter:
    build: kafka-exporter
    ports:
      - "9308:9308"
    networks:
      - app-tier
    entrypoint: ["run.sh"]
  consumer:
    build: kafka-consumer
    networks:
      - app-tier
  producer:
    build: kafka-producer
    networks:
      - app-tier
  prometheus:
    image: bitnami/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - "./prometheus/prometheus.yml:/opt/bitnami/prometheus/conf/prometheus.yml"
      - "./prometheus/alerts.rules.yml:/alerts.rules.yml"
    networks:
      - app-tier
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    networks:
      - app-tier
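For completeness: the contents of kafka-exporter/run.sh are not shown in the question. Judging by the published port 9308 and the kafka_consumergroup_lag_sum metric name, the exporter is presumably danielqsj/kafka_exporter, and the script just needs to start it against the broker, retrying until Kafka is reachable. A minimal sketch, assuming the kafka_exporter binary is on the PATH (the actual script is in the linked repository):

#!/bin/sh
# Hypothetical sketch of run.sh -- not the script from the repository.
# kafka_exporter exits if it cannot reach the broker, so retry until Kafka is up.
until kafka_exporter --kafka.server=kafka:9092; do
  echo "kafka_exporter exited (is Kafka up yet?); retrying in 5 seconds..." >&2
  sleep 5
done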
The prometheus.yml is
global:
  scrape_interval: 10s
  scrape_timeout: 10s
  evaluation_interval: 1m
scrape_configs:
  - job_name: kafka-exporter
    metrics_path: /metrics
    honor_labels: false
    honor_timestamps: true
    sample_limit: 0
    static_configs:
      - targets: ['kafka-exporter:9308']
rule_files:
  - "/alerts.rules.yml"
and the alerting rules are defined in alerts.rules.yml as
groups:
  - name: alerts
    rules:
      - alert: excessive_consumer_group_lag
        expr: kafka_consumergroup_lag_sum{topic="example"} > 10
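As a sanity check (not part of the original write-up), the rule file can be validated with promtool, which ships with the bitnami/prometheus image, and the loaded rule should appear under Status → Rules at http://localhost:9090/rules. Assuming promtool is on the PATH inside the container:

docker-compose exec prometheus promtool check rules /alerts.rules.yml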
The Kafka consumer, by default, joins the consumer group my-consumer-group listening to the topic example, sleeping 5 seconds per message to simulate processing latency:
package main

import (
	"context"
	"flag"
	"log"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/Shopify/sarama"
)

// Sarama configuration options
var (
	brokers string
	group   string
	topics  string
	oldest  bool
	delay   time.Duration
)

func init() {
	flag.StringVar(&brokers, "brokers", "kafka:9092", "Kafka bootstrap brokers to connect to, as a comma separated list")
	flag.StringVar(&group, "group", "my-consumer-group", "Kafka consumer group definition")
	flag.StringVar(&topics, "topics", "example", "Kafka topics to be consumed, as a comma separated list")
	flag.DurationVar(&delay, "delay", 5*time.Second, "Delay with which to respond to a message (to simulate latency which would lead to consumer group lag)")
	flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consumes initial offset from oldest")
	flag.Parse()

	if len(brokers) == 0 {
		panic("no Kafka bootstrap brokers defined, please set the -brokers flag")
	}
	if len(topics) == 0 {
		panic("no topics given to be consumed, please set the -topics flag")
	}
	if len(group) == 0 {
		panic("no Kafka consumer group defined, please set the -group flag")
	}
}

func main() {
	log.Println("Starting a new Sarama consumer")

	config := sarama.NewConfig()
	if oldest {
		config.Consumer.Offsets.Initial = sarama.OffsetOldest
	}

	/**
	 * Setup a new Sarama consumer group
	 */
	consumer := Consumer{
		ready: make(chan bool),
	}

	ctx, cancel := context.WithCancel(context.Background())

	// Use a (constant-time) retry mechanism to instantiate a consumer group as
	// Kafka might not be ready yet
	var client sarama.ConsumerGroup
	var err error
	for i := 0; i < 10; i++ {
		client, err = sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
		if err != nil {
			log.Printf("Error creating consumer group client: %v. Retrying...", err)
			time.Sleep(time.Second)
			continue
		}
		break // client created successfully; stop retrying
	}
	if err != nil {
		log.Fatalf("Error creating consumer group client: %v.", err)
	}

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// `Consume` should be called inside an infinite loop: when a
			// server-side rebalance happens, the consumer session will need to
			// be recreated to get the new claims
			if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
				log.Panicf("Error from consumer: %v", err)
			}
			// check if the context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				return
			}
			consumer.ready = make(chan bool)
		}
	}()

	<-consumer.ready // Await until the consumer has been set up
	log.Println("Sarama consumer up and running!...")

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		log.Println("terminating: context cancelled")
	case <-sigterm:
		log.Println("terminating: via signal")
	}
	cancel()
	wg.Wait()
	if err = client.Close(); err != nil {
		log.Panicf("Error closing client: %v", err)
	}
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
	ready chan bool
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready
	close(consumer.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		time.Sleep(delay) // simulate slow processing so that consumer group lag builds up
		log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
		session.MarkMessage(message, "")
	}
	return nil
}
The kafka-producer is a Cobra command-line application with a blocking root command which is meant to be exec'd into. Its main.go reads
package main

import "github.pie.apple.com/kurt-peek/kafka-exporter-example/kafka-test/cmd"

func main() {
	cmd.Execute()
}
where cmd/root.go reads
/*
Copyright © 2022 NAME HERE <EMAIL ADDRESS>
*/
package cmd

import (
	"log"
	"os"
	"time"

	"github.com/Shopify/sarama"
	"github.com/spf13/cobra"
)

// Sarama configuration options
var (
	brokers  []string
	producer sarama.SyncProducer
)

// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
	Use:   "kafka-producer",
	Short: "Send Kafka messages",
	Long:  `Set up a Kafka SyncProducer to send messages.`,
	PersistentPreRun: func(cmd *cobra.Command, args []string) {
		config := sarama.NewConfig()
		config.Producer.Return.Successes = true

		var err error
		for i := 1; i <= 10; i++ {
			producer, err = sarama.NewSyncProducer(brokers, config)
			if err != nil {
				log.Printf("Instantiate sync producer at broker addresses %v: %v. Attempting retry #%d...", brokers, err, i)
				time.Sleep(time.Second)
				continue
			}
			break // producer created successfully; stop retrying
		}
		if err != nil {
			log.Fatalf("Instantiate sync producer at broker addresses %v: %v", brokers, err)
		}
		log.Printf("Connected to Kafka brokers at addresses %v", brokers)
	},
	Run: func(cmd *cobra.Command, args []string) {
		select {} // A blocking command to prevent the program from running to completion, so that it can be exec'd into to run commands
	},
}

// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute() {
	err := rootCmd.Execute()
	if err != nil {
		os.Exit(1)
	}
}

func init() {
	rootCmd.Flags().StringSliceVar(&brokers, "brokers", []string{"kafka:9092"}, "Addresses of Kafka brokers to produce messages to")
}
and produce.go reads
/*
Copyright © 2022 NAME HERE <EMAIL ADDRESS>
*/
package cmd

import (
	"log"

	"github.com/Shopify/sarama"
	"github.com/spf13/cobra"
)

var (
	count   int
	topic   string
	message string
)

// produceCmd represents the produce command
var produceCmd = &cobra.Command{
	Use:   "produce",
	Short: "Produce a given number of Kafka messages",
	Long:  `Produce a given number of Kafka messages`,
	PreRun: func(cmd *cobra.Command, args []string) {
		if topic == "" {
			log.Fatal("Topic must not be empty")
		}
	},
	Run: func(cmd *cobra.Command, args []string) {
		for i := 1; i <= count; i++ {
			partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
				Topic: topic,
				Value: sarama.StringEncoder(message), // use the --message flag rather than a hard-coded string
			})
			if err != nil {
				log.Fatalf("Failed to send message: %v", err)
			}
			log.Printf("Sent message %d/%d to partition %d at offset %d", i, count, partition, offset)
		}
	},
}

func init() {
	rootCmd.AddCommand(produceCmd)
	produceCmd.Flags().IntVar(&count, "count", 1, "Number of messages to produce")
	produceCmd.Flags().StringVar(&topic, "topic", "", "Topic to produce to")
	produceCmd.Flags().StringVar(&message, "message", "Hello, world!", "Message to include in the produced Kafka message")
}
After bringing up the application using docker-compose build && docker-compose up, I produced 20 messages to the example topic by exec'ing into the producer container:
> docker exec -it kafka-exporter-example-producer-1 /bin/ash
~ # ./app produce --count=20 --message=foobar --topic="example"
2022/04/10 21:49:05 Connected to Kafka brokers at addresses [kafka:9092]
2022/04/10 21:49:05 Sent message 1/20 to partition 0 at offset 0
2022/04/10 21:49:05 Sent message 2/20 to partition 0 at offset 1
2022/04/10 21:49:05 Sent message 3/20 to partition 0 at offset 2
2022/04/10 21:49:05 Sent message 4/20 to partition 0 at offset 3
2022/04/10 21:49:05 Sent message 5/20 to partition 0 at offset 4
2022/04/10 21:49:05 Sent message 6/20 to partition 0 at offset 5
2022/04/10 21:49:05 Sent message 7/20 to partition 0 at offset 6
2022/04/10 21:49:05 Sent message 8/20 to partition 0 at offset 7
2022/04/10 21:49:05 Sent message 9/20 to partition 0 at offset 8
2022/04/10 21:49:05 Sent message 10/20 to partition 0 at offset 9
2022/04/10 21:49:05 Sent message 11/20 to partition 0 at offset 10
2022/04/10 21:49:05 Sent message 12/20 to partition 0 at offset 11
2022/04/10 21:49:05 Sent message 13/20 to partition 0 at offset 12
2022/04/10 21:49:05 Sent message 14/20 to partition 0 at offset 13
2022/04/10 21:49:05 Sent message 15/20 to partition 0 at offset 14
2022/04/10 21:49:05 Sent message 16/20 to partition 0 at offset 15
2022/04/10 21:49:05 Sent message 17/20 to partition 0 at offset 16
2022/04/10 21:49:05 Sent message 18/20 to partition 0 at offset 17
2022/04/10 21:49:05 Sent message 19/20 to partition 0 at offset 18
2022/04/10 21:49:05 Sent message 20/20 to partition 0 at offset 19
~ #
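As a quick sanity check (not part of the original write-up), the lag can also be read straight from the exporter, whose port 9308 is published to the host:

curl -s http://localhost:9308/metrics | grep kafka_consumergroup_lag_sum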
If I open http://localhost:3000, log in with the default username and password (both "admin"), add a Prometheus data source with URL http://prometheus:9090, and look at the metric kafka_consumergroup_lag_sum, I can see that the alerting threshold of 10 was temporarily exceeded.
Yet if I navigate to http://localhost:9090 and look at the Alerts page of the Prometheus UI, I don't see any alerts having fired.
Any idea why the alert is not firing? The metric shows up in a PromQL query as having exceeded the threshold of 10 for several seconds.
Solution 1:[1]
To convert Oleg Mayko's comment into an answer: indeed, after reducing the evaluation_interval in prometheus.yml from 1m to 10s, I see the alert firing.
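Prometheus only evaluates alerting rules once per evaluation_interval, so with a 1m interval a threshold breach that lasts only a few seconds between evaluations can be missed entirely. The corresponding change to the global block of prometheus.yml would look something like this (only evaluation_interval changes):

global:
  scrape_interval: 10s
  scrape_timeout: 10s
  evaluation_interval: 10s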
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Kurt Peek |



