Spring Integration and Spring Batch: job runs on every poll
I don't understand how Spring Integration behaves with JobLaunchingGateway. I have this example config:
public SftpInboundChannelAdapterSpec sftpInboundChannelAdapterSpec() {
    return Sftp.inboundAdapter(ftpFileSessionFactory())
            .preserveTimestamp(true)
            .deleteRemoteFiles(false)
            .remoteDirectory(integrationProperties.getRemoteDirectory())
            .filter(sftpFileListFilter())
            .localDirectory(new File(integrationProperties.getLocalDirectory()));
}

public PollerSpec pollerSpec() {
    PollerSpec cron = Pollers.cron(integrationProperties.getPollerCron());
    cron.maxMessagesPerPoll(integrationProperties.getMessagePerPoll());
    return cron;
}

@Bean
public IntegrationFlow sftpInboundFlow() {
    return IntegrationFlows.from(sftpInboundChannelAdapterSpec(), pc -> pc.poller(pollerSpec()))
            .transform(fileMessageToJobRequest())
            .handle(jobLaunchingGateway())
            .handle(message -> {
                logger.info("Handle message: {}", message.getPayload());
            })
            .get();
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
    simpleJobLauncher.setJobRepository(jobRepository);
    simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
    return jobLaunchingGateway;
}

private ChainFileListFilter<ChannelSftp.LsEntry> sftpFileListFilter() {
    ChainFileListFilter<ChannelSftp.LsEntry> chainFileListFilter = new ChainFileListFilter<>();
    chainFileListFilter.addFilter(new SftpSimplePatternFileListFilter("*.xlsx"));
    chainFileListFilter.addFilter(new SftpPersistentAcceptOnceFileListFilter(metadataStore(), "INT"));
    return chainFileListFilter;
}
If I set the poller to run every minute, a new job is created every minute, and I don't see any new records in the metadata store.
When I comment out the line with .handle(jobLaunchingGateway()):
@Bean
public IntegrationFlow sftpInboundFlow() {
    return IntegrationFlows.from(sftpInboundChannelAdapterSpec(), pc -> pc.poller(pollerSpec()))
            .transform(fileMessageToJobRequest())
            // .handle(jobLaunchingGateway())
            .handle(message -> {
                logger.info("Handle message: {}", message.getPayload());
            })
            .get();
}
Everything works as expected.
I expected the SFTP adapter to fetch the new file(s) and then create a new job for each file.
I don't understand why I don't see records in the metadata store when the JobLaunchingGateway is enabled.
Can you help me and explain this?
I am new to Spring Integration and Spring Batch.
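To make that expectation concrete, here is a simplified plain-Java model of the accept-once behaviour I have in mind. It is only a sketch under assumptions, not Spring Integration's actual implementation: the class AcceptOnceSketch, its methods, and the in-memory map standing in for the metadata store are hypothetical names used for illustration. It assumes a file is keyed by the "INT" prefix plus its name and is accepted again only when its modification timestamp changes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a persistent accept-once filter (hypothetical, for illustration only).
final class AcceptOnceSketch {

    // Stands in for the metadata store: key = prefix + file name, value = last seen timestamp.
    private final Map<String, String> store = new LinkedHashMap<>();
    private final String prefix;

    AcceptOnceSketch(String prefix) {
        this.prefix = prefix;
    }

    // Records the file and accepts it only if it is new or its timestamp has changed.
    boolean accept(String fileName, long modified) {
        String newValue = Long.toString(modified);
        String oldValue = store.put(prefix + fileName, newValue);
        return !newValue.equals(oldValue);
    }

    // One poll: returns the files for which I would expect a job to be launched.
    List<String> poll(Map<String, Long> remoteFiles) {
        List<String> accepted = new ArrayList<>();
        for (Map.Entry<String, Long> file : remoteFiles.entrySet()) {
            // The pattern check runs first, so non-matching files never reach the store.
            if (file.getKey().endsWith(".xlsx") && accept(file.getKey(), file.getValue())) {
                accepted.add(file.getKey());
            }
        }
        return accepted;
    }

    // Number of records currently held in the stand-in metadata store.
    int storedRecords() {
        return store.size();
    }

    public static void main(String[] args) {
        AcceptOnceSketch filter = new AcceptOnceSketch("INT");
        Map<String, Long> remote = new LinkedHashMap<>();
        remote.put("report.xlsx", 1000L);
        remote.put("notes.txt", 1000L);
        System.out.println(filter.poll(remote)); // first poll: [report.xlsx]
        System.out.println(filter.poll(remote)); // second poll: []
    }
}
```

In this model the second poll launches nothing because the store already holds a record for the unchanged file. That is the behaviour I see without the gateway, and what I expected to see with it.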
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
