How to make multiple writers execute in parallel in Spring Batch

Below is my existing code, which processes and writes data into multiple collections one after another.

My requirement is that I want to write into multiple collections at the same time, not one after another. In short, I want a parallel writing process.

Below is my existing code

    /**
     * Builds the single-ETL import job around the given step.
     *
     * NOTE(review): unlike the other factory methods in this configuration this one is
     * not annotated with {@code @Bean} in the snippet shown — confirm it is registered
     * elsewhere. The {@code request} parameter is not used inside this method.
     *
     * @param listener      job-level lifecycle listener attached to the job
     * @param singleETLStep the single step executed by this job's flow
     * @param request       unused here (the step bean receives its own request)
     * @return the assembled {@link Job}
     */
    public Job importSingleETLData(SingleETLJobListener listener, Step singleETLStep, HttpServletRequest request) {
        return jobBuilderFactory.get("importSingleETLData")
                // fresh run id on every launch so the same job parameters can be re-run
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                // single-step flow: read -> process -> write
                .flow(singleETLStep)
                .end()
                .build();
    }


@Bean
    public Step singleETLStep(MongoItemWriter<CompositeWriterData> writer, HttpServletRequest request) {
        return stepBuilderFactory.get("singleETLStep")
                // TODO: P3 chunk size configurable
                .<UserInfo, CompositeWriterData>chunk(etlConfiguration.getBatchChunkSize())
                .reader(reader(("#{jobParameters[profileId]}"))).faultTolerant().skipPolicy(readerSkipper())
                .processor(processor(request,"#{jobParameters[executeProcessing]}"))
                .listener(processorListener()).faultTolerant().skipPolicy(writerSkipper())
                .writer(writer)
                .build();
    }```


    ```@Bean
    @StepScope
    public MongoItemReader<UserInfo> reader(@Value("#{jobParameters[profileId]}") String profileId) {

        String query = "{'results._id' :'"+profileId + "'}";
        Map<String, Direction> sorts = new HashMap<>();
        sorts.put("_id", Direction.ASC);
        MongoItemReader<UserInfo> reader = new MongoItemReader<>();
        reader.setCollection(CommonConstants.USER_INFO_VIEW);
        reader.setTemplate(secondaryMongoTemplate);
        reader.setTargetType(UserInfo.class);
        // TODO: P2 take latest phi only
        // TODO: P2 Use different query in on demand to fetch last processed record if nothing is updated from when last process ran 
        reader.setQuery(query);
        reader.setSort(sorts);
        return reader;
    }

    /**
     * Step-scoped item processor; {@code executeProcessing} is late-bound from
     * job parameters via SpEL.
     */
    @Bean
    @StepScope
    public ETLDataProcessor processor(HttpServletRequest request,
            @Value("#{jobParameters[executeProcessing]}") String executeProcessing) {
        return new ETLDataProcessor(request, executeProcessing);
    }

    /** Delegate writer persisting {@code ProfileRecommendationInfo} documents through the secondary Mongo template. */
    @Bean
    public MongoItemWriter<ProfileRecommendationInfo> recommendationsDataWriter() {
        MongoItemWriter<ProfileRecommendationInfo> recommendationWriter = new MongoItemWriter<>();
        recommendationWriter.setTemplate(secondaryMongoTemplate);
        return recommendationWriter;
    }

    /** Delegate writer persisting {@code ProfileLifebandInfo} documents through the secondary Mongo template. */
    @Bean
    public MongoItemWriter<ProfileLifebandInfo> lifeBandDataWriter() {
        MongoItemWriter<ProfileLifebandInfo> lifebandWriter = new MongoItemWriter<>();
        lifebandWriter.setTemplate(secondaryMongoTemplate);
        return lifebandWriter;
    }

    /** Delegate writer persisting {@code Profile} documents through the secondary Mongo template. */
    @Bean
    public MongoItemWriter<Profile> profileWriter() {
        MongoItemWriter<Profile> profileItemWriter = new MongoItemWriter<>();
        profileItemWriter.setTemplate(secondaryMongoTemplate);
        return profileItemWriter;
    }

    
    /**
     * The composite writer injected into the step: fans each item's per-collection
     * payloads out to the typed delegate writers above.
     */
    @Bean
    public MongoItemWriter<CompositeWriterData> compositeMongoWriter() {
        CompositeMongoItemWriter fanOutWriter = new CompositeMongoItemWriter();
        fanOutWriter.setTemplate(secondaryMongoTemplate);
        return fanOutWriter;
    }

    /** Listener invoked around item processing (attached to the step above). */
    @Bean
    public SingleETLProcessorListener processorListener() {
        return new SingleETLProcessorListener();
    }

    /** Skip policy applied to read failures in the fault-tolerant step. */
    @Bean
    public SkipPolicy readerSkipper() {
        return new ReaderSkipper();
    }
    
    /** Skip policy applied to write failures in the fault-tolerant step. */
    @Bean
    public SkipPolicy writerSkipper() {
        return new WriterSkipper();
    }
/**
 * Composite writer that fans each {@code CompositeWriterData} item out to the
 * typed delegate writers, one target collection at a time, and then back-fills
 * the owning {@code Profile} with the id of the recommendation record just written.
 *
 * Note: writes are sequential and order-dependent — the Profile back-fill must run
 * after the recommendation write. Parallelizing the delegates (e.g. via a
 * TaskExecutor) would require rethinking that ordering and the error handling.
 */
public class CompositeMongoItemWriter extends MongoItemWriter<CompositeWriterData> {

    @Autowired
    MongoItemWriter<ProfileRecommendationInfo> recommendationsDataWriter;
    @Autowired
    MongoItemWriter<ProfileLifebandInfo> lifeBandWriter;
    @Autowired
    private MongoTemplate secondaryMongoTemplate;
    @Autowired
    MongoItemWriter<Profile> profileWriter;

    /**
     * Dispatches every (collectionName -> POJO) entry of each item to the matching
     * delegate writer, then links the saved recommendation id back onto the Profile.
     *
     * @param items chunk of composite items; null/empty chunks are a no-op
     * @throws Exception propagated from the delegate writers / Mongo lookups
     */
    @Override
    public void write(List<? extends CompositeWriterData> items) throws Exception {
        if (items == null || items.isEmpty()) {
            return;
        }
        for (CompositeWriterData compositeWriterData : items) {
            Map<String, Object> collectionsData = compositeWriterData.getCollectionsPOJODataMap();
            if (collectionsData == null) {
                continue; // nothing was produced for this item
            }
            for (Entry<String, Object> collection : collectionsData.entrySet()) {
                writeToDelegate(collection.getKey(), collection.getValue());

                // Update Profile with the primary key of the recommendation record just written.
                if (CommonConstants.PROFILE_RECOMMENDATION_INFO.equals(collection.getKey())) {
                    ProfileRecommendationInfo profileRecommendationInfo =
                            (ProfileRecommendationInfo) collection.getValue();
                    updateProfileWithSavedCollectionDataId(
                            profileRecommendationInfo.getProfileId(),
                            profileRecommendationInfo.getDataId());
                }
            }
        }
    }

    /**
     * Routes a single payload to the delegate writer matching {@code collectionName}.
     * Unknown collection names are silently ignored (matches the previous
     * null-writer behavior).
     *
     * Note: this dispatch needs to be extended whenever a new collection is added in
     * ${etl.processor.collection.pojo} in utility-service-application.properties.
     */
    private void writeToDelegate(String collectionName, Object value) throws Exception {
        if (CommonConstants.PROFILE_RECOMMENDATION_INFO.equalsIgnoreCase(collectionName)) {
            recommendationsDataWriter.write(Arrays.asList((ProfileRecommendationInfo) value));
        } else if (CommonConstants.PROFILE_LIFEBAND_INFO.equalsIgnoreCase(collectionName)) {
            lifeBandWriter.write(Arrays.asList((ProfileLifebandInfo) value));
        }
    }

    /**
     * Updates the Profile document with the primary key of the saved collection data.
     * Useful during ETL processing when fetching the last data saved for a user.
     *
     * @throws IllegalStateException if no Profile exists for {@code profileId}
     *                               (previously this was a bare NullPointerException)
     * @throws Exception             propagated from the profile writer
     */
    private void updateProfileWithSavedCollectionDataId(String profileId, String profileRecommendationInfoId) throws Exception {
        Profile profile = secondaryMongoTemplate.findById(profileId, Profile.class);
        if (profile == null) {
            throw new IllegalStateException("No Profile found for id " + profileId
                    + "; cannot link profileRecommendationInfoId " + profileRecommendationInfoId);
        }
        profile.setProfileRecommendationInfoId(profileRecommendationInfoId);
        profileWriter.write(Arrays.asList(profile));
    }
}

How can I write into multiple collections at the same time instead of one after another? In short, I want a parallel writing process.

We are trying to achieve what has been given as Parallel Processing in the below link

https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html



Solution 1:[1]

The CompositeItemWriter calls its delegate writers in sequence. If you want to call delegate writers in parallel, you need a custom CompositeItemWriter that submits the different write operations to a TaskExecutor, for instance (see example here). However, you need to think about error handling and how to recover from errors (i.e., the retry/skip features).

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1