Error handling in multithreaded Spring Batch

I have developed a Spring Batch application that works fine with a single thread. It's a simple batch application that reads a CSV file using FlatFileItemReader, maps each line to a POJO (CSVLineMapper), does some simple processing, and then writes the POJO to a repository.

Now I have made the application multithreaded using ThreadPoolTaskExecutor. To test the framework's error handling, I throw a RuntimeException for a specific record in the processor, expecting only that thread to be terminated and only the chunk in which the error was thrown to be skipped. But the application terminates after the error, having written only 15 records. Why? Am I doing something wrong?

As restartability is not supported with multithreading, how do we design a multithreaded Spring Batch application so that only the problematic record is skipped and the application continues processing the other records without terminating?

Please find the code snippet below:

public Step load() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setMaxPoolSize(5);
    threadPoolTaskExecutor.setCorePoolSize(5);
    threadPoolTaskExecutor.afterPropertiesSet();
    // The built step must be returned to the caller
    return stepBuilderFactory.get("load")
            .chunk(5)
            .reader(reader1())
            .processor(processor())
            .writer(writer())
            .taskExecutor(threadPoolTaskExecutor)
            .listener(stepExecutionListener)
            .listener(processListener())
            .listener(writeListener)
            .build();
}

reader1 is a FlatFileItemReader with setSaveState set to false.
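
A side note on the reader, since the step is now multithreaded: the Spring Batch documentation describes FlatFileItemReader as not thread-safe, and the framework ships a SynchronizedItemStreamReader wrapper for this situation. The plain-Java sketch below only illustrates the idea of serializing read() calls behind a lock. LineReader, SynchronizedLineReader and readAllConcurrently are hypothetical names made up for this illustration, not Spring Batch types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SynchronizedReaderSketch {

    /** Hypothetical stand-in for a stateful reader; returns null when exhausted. */
    interface LineReader {
        String read();
    }

    /** Wraps a delegate so that only one thread at a time can call read(). */
    static final class SynchronizedLineReader implements LineReader {
        private final LineReader delegate;

        SynchronizedLineReader(LineReader delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized String read() {
            return delegate.read();
        }
    }

    /** Drains the input with several worker threads and returns every line they saw. */
    static List<String> readAllConcurrently(List<String> lines, int threads) {
        Iterator<String> it = lines.iterator(); // not thread-safe on its own
        LineReader reader = new SynchronizedLineReader(() -> it.hasNext() ? it.next() : null);
        List<String> seen = Collections.synchronizedList(new ArrayList<>());

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                String line;
                while ((line = reader.read()) != null) {
                    seen.add(line);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> seen = readAllConcurrently(Arrays.asList("a", "b", "c", "d", "e"), 3);
        System.out.println("Lines read: " + seen.size());
    }
}
```

Each line is handed out exactly once, but the order in which the worker threads see the lines is not guaranteed.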

One more observation: the log statement in the reader is printed only once in the complete flow, and it is printed by the main thread. But the processor and writer are called by different threads of the ThreadPoolTaskExecutor. Why? My reader is not a custom ItemReader implementation, whereas the processor and writer are my own implementations of ItemProcessor and ItemWriter respectively.



Solution 1:[1]

I wrote a minimal PoC to reproduce the problem. What happens is that the application terminates after 15-16 records. It never writes the rest of the records in the CSV. Person and Person1 are POJOs with firstname, lastname and age as fields.

person.csv has the columns firstname, lastname, age. The issue occurs when there are more than 20 records. Have one record, let's say the 8th, with firstname "zxcv" (the problematic record).

The sysout in the reader prints the current thread as the main thread. But shouldn't the reader run on a thread of the ThreadPoolTaskExecutor? Why is the application terminating without completing all the records?
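
A likely explanation for the single log line: the System.out.println sits in the reader1() bean method, which only constructs the reader, so it runs once on whichever thread creates the bean. It is not part of read(), which a multithreaded step invokes from the worker threads. The plain-Java sketch below imitates that difference. RecordingReader and runWithPool are hypothetical names for illustration, not Spring Batch classes.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ReaderThreadSketch {

    /** Hypothetical reader that records where it was created and where read() is called. */
    static final class RecordingReader {
        // Captured once, at construction time (comparable to the println in reader1())
        final String createdOn = Thread.currentThread().getName();
        // Names of all threads that ever called read()
        final Set<String> readOn = ConcurrentHashMap.newKeySet();
        private final Iterator<String> lines;

        RecordingReader(List<String> lines) {
            this.lines = lines.iterator();
        }

        synchronized String read() {
            readOn.add(Thread.currentThread().getName());
            return lines.hasNext() ? lines.next() : null;
        }
    }

    /** Builds the reader on the calling thread, then drains it from pool threads. */
    static RecordingReader runWithPool(List<String> lines, int threads) {
        RecordingReader reader = new RecordingReader(lines);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                while (reader.read() != null) {
                    // item handling would go here
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return reader;
    }

    public static void main(String[] args) {
        RecordingReader reader = runWithPool(Arrays.asList("a", "b", "c", "d"), 2);
        System.out.println("Created on: " + reader.createdOn);
        System.out.println("read() called on: " + reader.readOn);
    }
}
```

To see which thread actually reads in the real application, the log statement would have to be placed where read() is invoked, for example in an item read listener, rather than in the bean method.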

Please find the code below:

@SpringBootApplication
@EnableBatchProcessing
public class DemoApplication implements ApplicationRunner {

    // Must be injected; otherwise start() fails with a NullPointerException
    @Autowired
    private JobLauncher jobLauncher;

    private static ConfigurableApplicationContext ctx;
    static DemoApplication _this;

    public static void main(String...args) {
        try {
            ctx = SpringApplication.run(DemoApplication.class, args);
            _this.start(args);
            System.exit(0);
        } catch (Exception e) {
            // Print the failure instead of exiting silently
            e.printStackTrace();
            System.exit(-1);
        }
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        _this=this;
    }

    public void start(String...args) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("user", args[0])
                .addString("repository", args[1])
                .toJobParameters();
        this.jobLauncher.run((Job) ctx.getBean("uploadJob"),jobParameters);
    }
}




@Configuration
public class BulkConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    @StepScope
    public FlatFileItemReader<Person> reader1(){
        FlatFileItemReader<Person>  reader = new FlatFileItemReader<>();
        reader.setLinesToSkip(1);
        reader.setResource(new PathResource("C:\\sush\\setups\\demo\\src\\main\\resources\\person.csv"));

        DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<>();
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();

        String[] names = {"firstname", "lastname", "age"};
        tokenizer.setNames(names);

        BeanWrapperFieldSetMapper<Person>  fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Person.class);

        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);
        lineMapper.afterPropertiesSet();

        reader.setLineMapper(lineMapper);
        reader.setSaveState(false);
        // Note: this runs once, when the reader bean is created, not on each read() call
        System.out.println("Thread running in reader "+ Thread.currentThread().getName());
        return reader;

    }

    @Bean
    @StepScope
    public ItemProcessor<Person,Person1> processor1(){
        return new EnrichmentProcessor();
    }

    @Bean
    @StepScope
    public ItemWriter<Person1> writer(){
        return new FileUploader();
    }


    @Bean
    public Step load(){
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setMaxPoolSize(3);
        threadPoolTaskExecutor.setCorePoolSize(3);
        threadPoolTaskExecutor.afterPropertiesSet();

        return this.stepBuilderFactory.get("load")
                .<Person,Person1>chunk(2)
                .reader(reader1())
                .processor(processor1())
                .writer(writer())
                .taskExecutor(threadPoolTaskExecutor)
                .build();
    }


    @Bean(name = "uploadJob")
    public Job cmUploadJob(){
        return this.jobBuilderFactory.get("uploadJob")
                .start(load())
                .build();
    }
}



public class EnrichmentProcessor implements ItemProcessor<Person, Person1> {
    @Override
    public Person1 process(Person person) throws Exception {
        Person1 person1 = new Person1();
        person1.setFirstname(person.getFirstname().toUpperCase());
        person1.setLastname(person.getLastname().toUpperCase());
        person1.setAge(person.getAge());

        if(person.getFirstname().equals("zxcv")){
            throw new RuntimeException("Error occurred in thread " + Thread.currentThread().getName());
        }

        return person1;
    }
}


public class FileUploader implements ItemWriter<Person1> {
    @Override
    public void write(List<? extends Person1> list) throws Exception {
        for(Person1 p : list){
            System.out.println("Writing person " + p.getFirstname() + " on thread " + Thread.currentThread().getName());
        }
    }
}
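
On the early termination: without fault tolerance configured, an exception thrown from the processor fails the step, and in a multithreaded step the chunks already in flight on other threads finish before the step stops, which would explain the 15-16 records. Spring Batch lets a step skip such items via faultTolerant(), skip(...) and skipLimit(...) on the step builder. The sketch below is not that API. It is a simplified plain-Java imitation of "skip only the bad item and keep going", assuming the processor logic from the PoC above. SkipSketch, Result and run are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SkipSketch {

    /** Mirrors the PoC processor: upper-cases the name, fails on "zxcv". */
    static String process(String firstname) {
        if (firstname.equals("zxcv")) {
            throw new RuntimeException("Error occurred in thread " + Thread.currentThread().getName());
        }
        return firstname.toUpperCase();
    }

    /** Hypothetical result holder: what was written and how many items were skipped. */
    static final class Result {
        final List<String> written = Collections.synchronizedList(new ArrayList<>());
        final AtomicInteger skipped = new AtomicInteger();
    }

    /** Processes the names chunk by chunk on a pool, skipping items that fail. */
    static Result run(List<String> names, int chunkSize, int threads) {
        Result result = new Result();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int start = 0; start < names.size(); start += chunkSize) {
            List<String> chunk = names.subList(start, Math.min(start + chunkSize, names.size()));
            pool.submit(() -> {
                List<String> out = new ArrayList<>();
                for (String name : chunk) {
                    try {
                        out.add(process(name));
                    } catch (RuntimeException e) {
                        // Skip only the failing item; the rest of the chunk continues
                        result.skipped.incrementAndGet();
                    }
                }
                // "Write" the surviving items of this chunk
                result.written.addAll(out);
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            names.add(i == 7 ? "zxcv" : "name" + i);
        }
        Result result = run(names, 2, 3);
        System.out.println("Written: " + result.written.size() + ", skipped: " + result.skipped.get());
    }
}
```

Unlike this sketch, the real framework rolls back the failed chunk and reprocesses it without the skipped item, and it fails the step once the skip limit is exceeded. The order of the written items is not guaranteed across threads.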

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Samar