How to perform logging in Flink jobs?
I am working on a very simple use case where I want to inspect the data in a DataStream, and I would like to understand if there is a better way of logging, because the approach below looks ugly and adds an extra stage to the pipeline.
```java
DataStream<Conversation> stream = env.addSource(kafkaConsumer.getKafkaConsumer());
stream.map(conversation -> {
    logger.info("Read from Kafka source conversationId: {} and time: {}",
            conversation.id, conversation.time);
    return conversation;
});
```
Solution 1:
You could implement a MapFunction class and do the logging inside it:
```java
DataStream<Conversation> stream = env.addSource(kafkaConsumer.getKafkaConsumer());
stream.map(new MyMapFunction());
```

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyMapFunction extends RichMapFunction<Conversation, Conversation> {
    private static final Logger LOG = LoggerFactory.getLogger(MyMapFunction.class);

    @Override
    public void open(Configuration parameters) throws Exception {
        // One-time setup per parallel task instance, if needed.
    }

    @Override
    public Conversation map(Conversation conversation) throws Exception {
        // Log each element as it passes through the operator.
        LOG.info("conversationId: {} and time: {}", conversation.id, conversation.time);
        return conversation;
    }
}
```
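If you prefer to keep the inline map from the question, the same logging works from a lambda as long as the logger is a static field of the enclosing class, so it is never serialized with the closure. A minimal sketch, assuming the pipeline lives in a class named StreamingJob (a placeholder name, as is the logged helper):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingJob {
    // static: created once per worker JVM, not serialized with the lambda below
    private static final Logger LOG = LoggerFactory.getLogger(StreamingJob.class);

    static DataStream<Conversation> logged(DataStream<Conversation> stream) {
        return stream.map(conversation -> {
            LOG.info("conversationId: {} and time: {}", conversation.id, conversation.time);
            return conversation;
        });
    }
}
```

Either way, the log lines end up in the TaskManager log files (visible in the Flink web UI), not on the console of the client that submitted the job.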
Also, you can use the print sink directly:
```java
DataStream<Conversation> stream = env.addSource(kafkaConsumer.getKafkaConsumer());
stream.print();
```
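Note that print() writes each record's toString() to the stdout of the TaskManager running the sink task. The DataStream API also has an overload that takes a sink identifier, which helps tell several print sinks in the same job apart:

```java
// Output lines are prefixed with the identifier (plus the subtask index
// when the sink runs with parallelism > 1), e.g. "kafka-source:2> ...".
stream.print("kafka-source");
```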
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | ChangLi |
