ElasticsearchItemReader keeps reading the same records

I am a real beginner in Spring and I have to develop an application using Spring Batch. This application must read all the records from an Elasticsearch index and write them to a file.

When I run the program, I don't get any errors, and the application reads the records and writes them to the file correctly. The problem is that the application never stops: it keeps reading, processing and writing the data without ending. In the following picture, you can see the same records being processed many times.

[screenshot of the log output showing the same records being processed repeatedly]

I think there must be some problem in my code or in my design of the software, so I attach the most important parts of my code below.

I developed the following ElasticsearchItemReader:

public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

    private final Logger logger;

    private final ElasticsearchOperations elasticsearchOperations;

    private final SearchQuery query;

    private final Class<? extends T> targetType;

    public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations, SearchQuery query, Class<? extends T> targetType) {
        setName(getShortName(getClass()));
        logger = getLogger(getClass());
        this.elasticsearchOperations = elasticsearchOperations;
        this.query = query;
        this.targetType = targetType;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        state(elasticsearchOperations != null, "An ElasticsearchOperations implementation is required.");
        state(query != null, "A query is required.");
        state(targetType != null, "A target type to convert the input into is required.");
    }

    @Override
    @SuppressWarnings("unchecked")
    protected Iterator<T> doPageRead() {

        logger.debug("executing query {}", query.getQuery());

        return (Iterator<T>) elasticsearchOperations.queryForList(query, targetType).iterator();
    }
}

I also wrote the following ReadWriterConfig:

@Configuration
public class ReadWriterConfig {

    @Bean
    public ElasticsearchItemReader<AnotherElement> elasticsearchItemReader() {
        return new ElasticsearchItemReader<>(elasticsearchOperations(), query(), AnotherElement.class);
    }

    @Bean
    public SearchQuery query() {
        NativeSearchQueryBuilder builder = new NativeSearchQueryBuilder()
                .withQuery(matchAllQuery());

        return builder.build();
    }

    @Bean
    public ElasticsearchOperations elasticsearchOperations() {
        try {
            Settings settings = Settings.builder()
                    .build();

            Client client = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
            return new ElasticsearchTemplate(client);
        } catch (UnknownHostException e) {
            e.printStackTrace();
            return null;
        }
    }
}

And I wrote the BatchConfiguration, where I wire up the reader, writer and processor:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    // tag::readerwriterprocessor[]
    @Bean
    public ElasticsearchItemReader<AnotherElement> reader() {
        return new ReadWriterConfig().elasticsearchItemReader();
    }

    @Bean
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }

    @Bean
    public FlatFileItemWriter itemWriter() {
        return new FlatFileItemWriterBuilder<AnotherElement>()
                .name("itemWriter")
                .resource(new FileSystemResource("target/output.txt"))
                .lineAggregator(new PassThroughLineAggregator<>())
                .build();
    }
    // end::readerwriterprocessor[]

    // tag::jobstep[]
    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener, Step stepA) {
        return jobBuilderFactory.get("importUserJob")
                .flow(stepA)
                .end()
                .build();
    }

    @Bean
    public Step stepA(FlatFileItemWriter<AnotherElement> writer) {
        return stepBuilderFactory.get("stepA")
                .<AnotherElement, AnotherElement> chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(itemWriter())
                .build();
    }
    // end::jobstep[]
}

I attach some of the websites I was following to write this code:

https://github.com/spring-projects/spring-batch-extensions/blob/master/spring-batch-elasticsearch/README.md

https://spring.io/guides/gs/batch-processing/



Solution 1:[1]

Your reader should return, for every call of doPageRead(), an Iterator over one page of the data set. Since you are not splitting the result of the Elasticsearch query into pages but querying the whole set in one step, the first call to doPageRead() returns an iterator over the whole result set. Then in the next call, you return again an iterator over the very same result set.

So you have to keep track of whether you have already returned the iterator, something like:

public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

    // leaving out irrelevant parts

    boolean doPageReadCalled = false;

    @Override
    @SuppressWarnings("unchecked")
    protected Iterator<T> doPageRead() {

        if(doPageReadCalled) {
            return null;
        }

        doPageReadCalled = true;

        return (Iterator<T>)elasticsearchOperations.queryForList(query, targetType).iterator();
    }
}

On the first call you set the flag to true and then return the iterator; on the next call you see that you have already returned the data and return null.

This is a very basic solution; depending on the amount of data you get from Elasticsearch, it might be better to query with, for example, the scroll API and return pages until all are processed.
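To make the paging contract concrete: AbstractPaginatedDataItemReader calls doPageRead() each time its current iterator is exhausted, and treats null (or an empty iterator) as the end of the input. Here is a minimal, self-contained sketch of that contract backed by an in-memory list instead of Elasticsearch; the class name PagingReaderSketch and the list-based paging are illustrative stand-ins, not Spring Batch API:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Sketch of the contract doPageRead() must fulfil: each call returns the
// iterator for the NEXT page, and null once the data set is exhausted.
public class PagingReaderSketch {
    private final List<String> data;
    private final int pageSize;
    private int page = 0; // tracks which page the next call should return

    public PagingReaderSketch(List<String> data, int pageSize) {
        this.data = data;
        this.pageSize = pageSize;
    }

    public Iterator<String> doPageRead() {
        int from = page * pageSize;
        if (from >= data.size()) {
            return null; // no more pages: this is what ends the step
        }
        int to = Math.min(from + pageSize, data.size());
        page++;
        return data.subList(from, to).iterator();
    }

    public static void main(String[] args) {
        PagingReaderSketch reader =
                new PagingReaderSketch(Arrays.asList("a", "b", "c", "d", "e"), 2);
        int pages = 0;
        Iterator<String> it;
        while ((it = reader.doPageRead()) != null) {
            pages++;
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
        System.out.println("pages=" + pages); // prints pages=3
    }
}
```

The reader in the question never advances any page state, so every call reproduces the first page (the whole result set) and the step loops forever; advancing `page` and eventually returning null is what breaks the loop.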

Solution 2:[2]

You need to make sure your item reader returns null at some point to signal that there is no more data to process and end the job.

As requested in the comments, here is an example of how to import the reader:

@Configuration
@org.springframework.context.annotation.Import(ReadWriterConfig.class)
@EnableBatchProcessing
public class BatchConfiguration {

   // other bean definitions

   @Bean
   public Step stepA(ElasticsearchItemReader<AnotherElement> reader, FlatFileItemWriter<AnotherElement> writer) {
      return stepBuilderFactory.get("stepA")
        .<AnotherElement, AnotherElement> chunk(10)
        .reader(reader)
        .processor(processor())
        .writer(writer)
        .build();
   }
}

Solution 3:[3]

Very late to answer this, but I faced the same issue yesterday. I am not sure if the issue is with queryForList, but the following worked for me.

I changed the queryForList call to a startScroll call followed by subsequent continueScroll calls.

// fields assumed on the reader: isFirstCall (boolean, initially true),
// scrollId (String) and iterator (Iterator<T>)
@Override
@SuppressWarnings("unchecked")
protected Iterator<T> doPageRead() {
    if (isFirstCall) { // first call: open the scroll and remember its id
        ScrolledPage<T> scrolledPage = (ScrolledPage<T>) elasticsearchOperations.startScroll(1 * 60 * 1000, query, targetType);
        scrollId = scrolledPage.getScrollId();
        iterator = (Iterator<T>) scrolledPage.iterator();
        isFirstCall = false;
    } else { // subsequent calls: fetch the next scroll page
        iterator = (Iterator<T>) elasticsearchOperations.continueScroll(scrollId, 1 * 60 * 1000, targetType).iterator();
    }
    return iterator;
}

You might need to use different scroll related methods based on the version of elasticsearchOperations.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 P.J.Meisch
Solution 2 halfer
Solution 3 Akshay Patil