Sleuth 3.0.3 doesn't work with Kafka: traceId and spanId are missing in Kafka logs
I am new to Sleuth.
I added the Sleuth dependency as follows:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
I then made the corresponding change to the log pattern:
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%tid] [%t] [%X{traceId},%X{spanId}] %-5level %logger{36} - %msg%n"/>
For normal logs I get the traceId and spanId, as shown below:
18:54:28.876 [42] [http-nio-8082-exec-2] [04287c5af43cc1d2,04287c5af43cc1d2] INFO - ajax event received
But when the data is sent to Kafka, both values are empty:
18:54:30.763 [72] [kafka-producer-network-thread | producer-1] [,] INFO - event successfully send to kafka topic at time 1645709070763
The Kafka producer is shown below:
@Component
@AllArgsConstructor
public class KafkaProducer {

    private static final Logger log = LogManager.getLogger(KafkaProducer.class);

    private final ProducerConfiguration producerConfiguration;

    public Future<RecordMetadata> produceEvents(GenericRecord genericRecord, String topic, String sessionId) {
        // Fully qualified so the Kafka client class is not shadowed by this class of the same name.
        Producer<String, GenericRecord> producer =
                new org.apache.kafka.clients.producer.KafkaProducer<>(producerConfiguration.getProducerProperties());
        ProducerRecord<String, GenericRecord> producerRecord = new ProducerRecord<>(
                topic, sessionId, genericRecord
        );
        // The callback is invoked once the broker acknowledges the record or the send fails.
        Future<RecordMetadata> data = producer.send(producerRecord,
                (RecordMetadata metadata, Exception exception) -> {
                    if (exception == null) {
                        log.info("event successfully send to kafka topic at time {}", System.currentTimeMillis());
                    } else {
                        log.info(exception.getMessage());
                    }
                }
        );
        producer.flush();
        producer.close();
        return data;
    }
}
The producer configuration:
@Component
@Getter
@Setter
public class ProducerConfiguration {

    private final Properties producerProperties;

    @Value("${kafka.bootstrapserver}")
    private String bootstrapServer;

    @Value("${kafka.acks}")
    private String acks;

    @Value("${kafka.retries}")
    private String retries;

    @Value("${kafka.max.in.flight.requests.per.connection}")
    private String maxInFlightRequestsPerConnection;

    @Value("${kafka.enable.idempotence}")
    private String enableIdempotence;

    public ProducerConfiguration(Properties producerProperties) {
        this.producerProperties = producerProperties;
    }

    @Bean
    private void setProperties() {
        this.producerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        this.producerProperties.setProperty(ProducerConfig.ACKS_CONFIG, acks);
        this.producerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, retries);
        this.producerProperties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlightRequestsPerConnection);
        this.producerProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
        this.producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class.getName());
        this.producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList(TracingProducerInterceptor.class));
    }
}
I am not able to figure out why this is happening.
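One possible explanation, offered as a sketch and not a confirmed diagnosis: the second log line is written from `kafka-producer-network-thread`, not from the `http-nio` request thread, and logging context such as the MDC is stored per thread. Kafka generally runs the `send` callback on the producer's I/O thread, so the `traceId` and `spanId` that Sleuth put into the request thread's context would not be visible there. The standalone example below uses a plain `ThreadLocal` map as a stand-in for the MDC. The class and method names (`ContextPropagationSketch`, `withCallerContext`, `traceIdSeenByWorker`) are hypothetical and are not part of Sleuth, Kafka, or Log4j.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for a logging MDC: a per-thread key/value map (an assumption for illustration).
final class ContextPropagationSketch {

    private static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    static void put(String key, String value) {
        CONTEXT.get().put(key, value);
    }

    static String get(String key) {
        return CONTEXT.get().get(key);
    }

    /** Wraps a task so it runs with a snapshot of the calling thread's context. */
    static Runnable withCallerContext(Runnable task) {
        // The snapshot is taken here, on the thread that calls this method.
        Map<String, String> snapshot = new HashMap<>(CONTEXT.get());
        return () -> {
            Map<String, String> previous = CONTEXT.get();
            CONTEXT.set(new HashMap<>(snapshot));
            try {
                task.run();
            } finally {
                CONTEXT.set(previous); // restore the worker thread's own context
            }
        };
    }

    /** Reads "traceId" on a separate thread, which plays the role of the producer network thread. */
    static String traceIdSeenByWorker(boolean propagate) {
        String[] seen = new String[1];
        Runnable callback = () -> seen[0] = get("traceId");
        Thread worker = new Thread(
                propagate ? withCallerContext(callback) : callback,
                "network-thread-stand-in");
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        put("traceId", "04287c5af43cc1d2"); // the "request" thread has a trace id
        System.out.println("without propagation: " + traceIdSeenByWorker(false));
        System.out.println("with propagation:    " + traceIdSeenByWorker(true));
    }
}
```

In the real application, the analogous step would be to snapshot the logging context before calling `producer.send` (for example with SLF4J's `MDC.getCopyOfContextMap()`) and restore it inside the callback with `MDC.setContextMap(...)`. Whether that is the right fix alongside Sleuth's own Kafka instrumentation is an assumption that should be checked against the Sleuth documentation.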
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
