Spring Integration and Spring Batch: a job is launched on every poll

I don't understand how Spring Integration behaves with JobLaunchingGateway. I have this example config:

    public SftpInboundChannelAdapterSpec sftpInboundChannelAdapterSpec() {
        return Sftp.inboundAdapter(ftpFileSessionFactory())
                .preserveTimestamp(true)
                .deleteRemoteFiles(false)
                .remoteDirectory(integrationProperties.getRemoteDirectory())
                .filter(sftpFileListFilter())
                .localDirectory(new File(integrationProperties.getLocalDirectory()));
    }

    public PollerSpec pollerSpec() {
        // Cron-triggered poller; caps how many messages are emitted per poll
        return Pollers.cron(integrationProperties.getPollerCron())
                .maxMessagesPerPoll(integrationProperties.getMessagePerPoll());
    }

    @Bean
    public IntegrationFlow sftpInboundFlow() {
        return IntegrationFlows.from(sftpInboundChannelAdapterSpec(), pc -> pc.poller(pollerSpec()))
                .transform(fileMessageToJobRequest())
                .handle(jobLaunchingGateway())
                .handle(message -> {
                    logger.info("Handle message: {}", message.getPayload());
                })
                .get();
    }

    @Bean
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());

        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
        return jobLaunchingGateway;
    }

    private ChainFileListFilter<ChannelSftp.LsEntry> sftpFileListFilter() {
        ChainFileListFilter<ChannelSftp.LsEntry> chainFileListFilter = new ChainFileListFilter<>();
        chainFileListFilter.addFilter(new SftpSimplePatternFileListFilter("*.xlsx"));
        chainFileListFilter.addFilter(new SftpPersistentAcceptOnceFileListFilter(metadataStore(), "INT"));
        return chainFileListFilter;
    }

If I set the poller to run every minute, a job is launched every minute, and I don't see any new records in the MetadataStore.

When I comment out the line with .handle(jobLaunchingGateway()):

    @Bean
    public IntegrationFlow sftpInboundFlow() {
        return IntegrationFlows.from(sftpInboundChannelAdapterSpec(), pc -> pc.poller(pollerSpec()))
                .transform(fileMessageToJobRequest())
//                .handle(jobLaunchingGateway())
                .handle(message -> {
                    logger.info("Handle message: {}", message.getPayload());
                })
                .get();
    }

Everything works as expected.

I expected the SFTP adapter to fetch the new file(s) and then launch a new job for each file.

I don't understand why I don't see any records in the MetadataStore when the JobLaunchingGateway is enabled.
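
To make the expectation about the metadata store concrete, the behavior I would expect from the persistent accept-once filter can be modeled in plain Java roughly as below. This is only a simplified sketch, not Spring Integration's implementation: the class name AcceptOnceSketch, the in-memory map standing in for the metadata store, and the sample file name are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a persistent accept-once filter.
// It is NOT the Spring Integration class; it only illustrates the expected behavior.
class AcceptOnceSketch {

    // Stand-in for the metadata store: key = prefix + file name, value = last-modified timestamp
    private final Map<String, Long> store = new LinkedHashMap<>();
    private final String prefix;

    AcceptOnceSketch(String prefix) {
        this.prefix = prefix;
    }

    // Returns the files from one poll that are new or changed since they were last accepted
    List<String> filter(Map<String, Long> remoteListing) {
        List<String> accepted = new ArrayList<>();
        for (Map.Entry<String, Long> entry : remoteListing.entrySet()) {
            String key = prefix + entry.getKey();
            Long previous = store.get(key);
            if (previous == null || !previous.equals(entry.getValue())) {
                store.put(key, entry.getValue()); // record the file so later polls skip it
                accepted.add(entry.getKey());
            }
        }
        return accepted;
    }

    Map<String, Long> storeContents() {
        return store;
    }

    public static void main(String[] args) {
        AcceptOnceSketch filter = new AcceptOnceSketch("INT");
        Map<String, Long> listing = new LinkedHashMap<>();
        listing.put("report.xlsx", 1000L);

        System.out.println(filter.filter(listing)); // first poll
        System.out.println(filter.filter(listing)); // second poll, same listing
        System.out.println(filter.storeContents());
    }
}
```

In this model, the second poll over an unchanged listing returns nothing and the store keeps one entry for the file, so only one job per file would be launched. That is the behavior I expected from the real flow.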

Can you help me and explain this?

I am new to Spring Integration and Spring Batch.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
