'Need to process multiple files in parallel in Spring Integration

I have a SFTP directory and reading files and sending the files for further processing to a ServiceActivator.At any point I need to process them parallely using the handler.

Here is my SPring Integration java DSL flow.

IntegrationFlows.from(Sftp.inboundAdapter(getSftpSessionFactory())
                        .temporaryFileSuffix("COPY")
                        .localDirectory(directory)
                        .deleteRemoteFiles(false)
                        .preserveTimestamp(true)
                        .remoteDirectory("remoteDir"))
                        .patternFilter("*.txt")), e -> e.poller(Pollers.fixedDelay(500).maxMessagesPerPoll(5)))
                        .handle("mybean", "myMethod")
                        .handle(Files.outboundAdapter(new File("success")))         
                        .deleteSourceFiles(true)
                        .autoCreateDirectory(true))
                        .get();

Update:Here is my ThreadPoolExecutor:

@Bean(name = "executor")
public Executor getExecutor()
{
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(4);
    executor.setQueueCapacity(20);          
    executor.initialize();
    return executor;
}


Solution 1:[1]

The Sftp.inboundAdapter() (SftpInboundFileSynchronizingMessageSource) returns remote files one by one anyway. First of all it synchronizes them to the local directory and only after that poll them for message processing as File payload.

To process them in parallel that would be just enough to add a taskExecutor to your e.poller() definition and all those maxMessagesPerPoll(5) will be distributed to different threads.

Solution 2:[2]

For my use case, assigning the taskExecutor to the channel did the trick. Here is the code I used for a really simple application that polls a folder and processes the files. Files are handled in parallel, even if I assign only one thread to the poller.

@Slf4j
@SpringBootApplication
public class SpringIntegrationDemoApplication {

  public static void main( String[] args ) {
    SpringApplication.run( SpringIntegrationDemoApplication.class, args );
  }

  @Bean
  public IntegrationFlow fileReadingFlow() {
    return IntegrationFlows
        .from( Files.inboundAdapter( new File( "/tmp/in" ) ).patternFilter( "*.txt" ),
               e ->
                   e.poller( Pollers.fixedDelay( 10000 )
//                               .taskExecutor( Executors.newFixedThreadPool( 1 ) )
                                 .maxMessagesPerPoll( 100 )
                   ) )
        .channel( MessageChannels.executor( Executors.newFixedThreadPool( 100 ) ) )
        .handle( "myFileHandler", "handleMessage" )
        .get();
  }

  @Bean
  public MessageHandler myFileHandler() {
    return message -> {
      try {
        log.info( "START" + message.getPayload() );
        Thread.sleep( 2000 );
        log.info( "STOP" + message.getPayload() );
      }
      catch ( InterruptedException e ) {
        e.printStackTrace();
      }
    };
  }
}

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Artem Bilan
Solution 2 Hans