How to perform logging in Flink jobs?

I am working on a very simple use case where I want to inspect the data in a DataStream. I would like to know if there is a better way of logging, because the approach below looks ugly and adds an extra stage to the job.

DataStream<Conversation> stream = env.addSource(kafkaConsumer.getKafkaConsumer());
stream.map(conversation -> {
    logger.info("Read from Kafka source conversationId: {} and time: {}",
            conversation.id, conversation.time);
    return conversation;
});


Solution 1:[1]

You can implement a MapFunction class and do the logging inside it:

DataStream<Conversation> stream = env.addSource(kafkaConsumer.getKafkaConsumer());
stream.map(new MyMapFunction());

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyMapFunction extends RichMapFunction<Conversation, Conversation> {

    private static final Logger logger = LoggerFactory.getLogger(MyMapFunction.class);

    @Override
    public void open(Configuration parameters) throws Exception {
        // One-time setup (connections, metrics, etc.) can go here.
    }

    @Override
    public Conversation map(Conversation conversation) throws Exception {
        logger.info("Read from Kafka source conversationId: {} and time: {}",
                conversation.id, conversation.time);
        return conversation;
    }
}
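
Note that user functions run on the TaskManagers, so these log lines end up in the TaskManager log files (visible in the web UI), not in the client's output. Flink configures logging through a Log4j properties file in its conf/ directory; a minimal sketch for controlling the log level of your own classes might look like the fragment below (the logger name "myapp" and the package com.example are placeholders for your own package):

```properties
# conf/log4j.properties (Log4j 2 property syntax, as shipped with recent Flink)
# Set the log level for your own user code:
logger.myapp.name = com.example
logger.myapp.level = INFO
```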

Also, you can use the print() sink directly:

DataStream<Conversation> stream = env.addSource(kafkaConsumer.getKafkaConsumer());
stream.print();

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 ChangLi