Sleuth 3.0.3 doesn't add traceId and spanId to Kafka producer logs

I am new to Sleuth.

I added the Sleuth dependency as follows:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
        </dependency>

and updated the log pattern to include the trace fields:

<PatternLayout pattern="%d{HH:mm:ss.SSS} [%tid] [%t] [%X{traceId},%X{spanId}] %-5level %logger{36} - %msg%n"/>

For normal logs I get the traceId and spanId, as shown below:

18:54:28.876 [42] [http-nio-8082-exec-2] [04287c5af43cc1d2,04287c5af43cc1d2] INFO  - ajax event received

But when the data is sent to Kafka, the log line written from the producer callback has empty traceId and spanId fields:

18:54:30.763 [72] [kafka-producer-network-thread | producer-1] [,] INFO  - event successfully send to kafka topic at time 1645709070763

The Kafka producer is shown below.

@Component
@AllArgsConstructor
public class KafkaProducer {

    private static final Logger log = LogManager.getLogger(KafkaProducer.class);

    private final ProducerConfiguration producerConfiguration;

    public Future<RecordMetadata> produceEvents(GenericRecord genericRecord, String topic, String sessionId) {

        Producer<String, GenericRecord> producer = new KafkaProducer<>(producerConfiguration.getProducerProperties());

        ProducerRecord<String, GenericRecord> producerRecord = new ProducerRecord<>(
                topic, sessionId, genericRecord
        );

        Future<RecordMetadata> data = producer.send(producerRecord,
                (RecordMetadata metadata, Exception exception) -> {
                    if (exception == null) {
                        log.info("event successfully send to kafka topic at time {}", System.currentTimeMillis());
                    } else {
                        log.info(exception.getMessage());
                    }
                }
        );

        producer.flush();
        producer.close();
        return data;
    }
}

The producer configuration is shown below.

@Component
@Getter
@Setter
public class ProducerConfiguration {

    private final Properties producerProperties;

    @Value("${kafka.bootstrapserver}")
    private String bootstrapServer;
    @Value("${kafka.acks}")
    private String acks;
    @Value("${kafka.retries}")
    private String retries;
    @Value("${kafka.max.in.flight.requests.per.connection}")
    private String maxInFlightRequestsPerConnection;
    @Value("${kafka.enable.idempotence}")
    private String enableIdempotence;

    public ProducerConfiguration(Properties producerProperties){
        this.producerProperties = producerProperties;
    }

    @Bean
    private void setProperties(){
        this.producerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        
        this.producerProperties.setProperty(ProducerConfig.ACKS_CONFIG, acks);
        this.producerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, retries);
        this.producerProperties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlightRequestsPerConnection);
        this.producerProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
        this.producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class.getName());
        this.producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList(TracingProducerInterceptor.class));

    }
}

I am not able to figure out why this is happening.
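
One thing the two log lines do show is that they come from different threads: the first from `http-nio-8082-exec-2`, the second from `kafka-producer-network-thread | producer-1`. The `%X{...}` fields are read from a per-thread logging context, and the Kafka producer generally runs send callbacks on its own I/O thread, so a context set on the request thread would not be visible there. The following is only a sketch of that effect and of one possible workaround (snapshot the context on the calling thread, restore it inside the callback). It is not a confirmed diagnosis of this setup. `CONTEXT`, `withCallerContext` and `traceIdSeenByCallback` are hypothetical names, the `ThreadLocal` map stands in for the real logging context, and `BiConsumer` stands in for Kafka's `Callback`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

public class ContextPropagatingCallback {

    // Hypothetical stand-in for the per-thread logging context that %X{traceId} reads from.
    public static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    // Wraps a callback so that it runs with a snapshot of the context
    // taken on the thread that calls this method (the "request" thread).
    public static <T, E> BiConsumer<T, E> withCallerContext(BiConsumer<T, E> delegate) {
        Map<String, String> snapshot = new HashMap<>(CONTEXT.get());
        return (result, error) -> {
            Map<String, String> previous = CONTEXT.get();
            CONTEXT.set(new HashMap<>(snapshot));
            try {
                delegate.accept(result, error);
            } finally {
                // Put back whatever the callback thread had before.
                CONTEXT.set(previous);
            }
        };
    }

    // Runs a callback on a separate thread (imitating the producer I/O thread)
    // and returns the traceId that the callback observed there.
    public static String traceIdSeenByCallback(boolean wrap) {
        String[] seen = new String[1];
        BiConsumer<String, Exception> callback =
                (metadata, exception) -> seen[0] = CONTEXT.get().get("traceId");
        BiConsumer<String, Exception> toRun = wrap ? withCallerContext(callback) : callback;

        Thread ioThread = new Thread(() -> toRun.accept("metadata", null),
                "fake-producer-network-thread");
        ioThread.start();
        try {
            ioThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        // Set on the calling thread only, as a tracer would do for the current request.
        CONTEXT.get().put("traceId", "04287c5af43cc1d2");

        System.out.println("unwrapped: " + traceIdSeenByCallback(false)); // unwrapped: null
        System.out.println("wrapped:   " + traceIdSeenByCallback(true));  // wrapped:   04287c5af43cc1d2
    }
}
```

In the real producer, the same idea would presumably mean capturing the Log4j2 context (for example with `ThreadContext.getImmutableContext()`) before calling `producer.send(...)` and restoring it with `ThreadContext.putAll(...)` at the start of the callback. That only carries the IDs into the log line. It does not by itself make Sleuth instrument a producer that is created by hand with `new KafkaProducer<>(...)`.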



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
