'Kafka Streams: how to avoid scheduler punctuator to be suppressed before completion

I have a Kafka stream application that needs to process some data into a StateStore every x minutes.

I have created a transformer receiving data and storing them into the StateStore. The Transformer is instantiated automatically by the kafka stream framework via a TransformerSupplier.

    TransformerSupplier<String, Message, KeyValue<String, Message>> getTransformerSupplier(){
            if(transformerSupplier == null)
                transformerSupplier = () -> new MyTransformer();
            return transformerSupplier;
    }

The transformer then Schedule a Punctuator into the context when instantiated.

this.context.schedule(Duration.ofSeconds(schedulerSeconds), PunctuationType.WALL_CLOCK_TIME, new MyScheduler);

The scheduler overrides the punctuate function containing the code needed to handle the data into the StateStore. I have added some log lines to verify that the punctuate operation is completed before the Scheduler ends.

@Override
public void punctuate(long timestamp) {
    logger.info("Scheduler started at {}", Instant.now());
    processMyDataIntoStateStore();
    logger.info("Scheduler ended at {}", Instant.now());      
}

The problem I am seeing is that when searching for the above log lines I should expect the same number for 'started' and 'ended', but I actually see that half of the executions don't reach the 'Scheduler ended'.

It seems that the punctuator operation is stopped prematurely before completed by the context, why is this happening? is there a way to avoid stopping before completion?

Another scheduler is anyway instantiated for the next execution but I would prefer that the operation is not stopped without first complete the previous.

thanks for any help



Solution 1:[1]

probably moving it on to a separate thread would do the job as the punctuator is executed on the same thread of the transformer.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Matteo