Spring Batch ChunkRequest throws StackOverflowError
I am struggling with Spring Batch remote chunking on newer versions of Spring Boot. First I tried to send the ChunkRequest as an event, but that wasn't possible because it has no public default constructor. As a solution, I had to create a custom serializer and deserializer, and I set them correctly in my application.properties. After doing all of that, I am now getting a StackOverflowError. (I understand the reason just by checking the ChunkRequest class, which has a collection of StepExecutions; each StepExecution has a JobExecution, which in turn has StepExecutions.)
What am I missing, even after serializing the ChunkRequest as byte[]?
2022-05-14 00:22:31.880 INFO 11974 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-anonymous.3ac5789f-01ba-493f-a7ce-6f6f4429a0a1-2, groupId=anonymous.3ac5789f-01ba-493f-a7ce-6f6f4429a0a1] (Re-)joining group
2022-05-14 00:22:31.891 INFO 11974 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=client-job]] launched with the following parameters: [{run.id=1}]
2022-05-14 00:22:31.901 INFO 11974 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [dispatchStep]
2022-05-14 00:22:31.948 ERROR 11974 --- [ main] o.s.batch.core.step.AbstractStep : Encountered an error executing step dispatchStep in job client-job
org.springframework.retry.ExhaustedRetryException: Retry exhausted after last attempt in recovery path, but exception is not skippable.; nested exception is org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'clientRequests'; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Infinite recursion (StackOverflowError) (through reference chain: org.springframework.batch.core.JobExecution["stepExecutions"]->java.util.Collections$UnmodifiableRandomAccessList[0]->org.springframework.batch.core.StepExecution["jobExecution"]->org.springframework.batch.core.JobExecution["stepExecutions"]->java.util.Collections$UnmodifiableRandomAccessList[0]->org.springframework.batch.core.StepExecution["jobExecution"]->[... the same cycle repeats ...]->org.springframework.batch.core.JobExecution["stepExecutions"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException: Infinite recursion (StackOverflowError) (through reference chain: org.springframework.batch.core.JobExecution["stepExecutions"]->java.util.Collections$UnmodifiableRandomAccessList[0]->org.springframework.batch.core.StepExecution["jobExecution"]->[... the same cycle repeats ...]->org.springframework.batch.core.JobExecution["stepExecutions"]), failedMessage=GenericMessage [payload=ChunkRequest: jobId=1, sequence=0, contribution=[StepContribution: read=0, written=0, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], item count=1, headers={id=ae9dbd76-ed32-0410-dacd-b6c700de87b2, contentType=application/json, timestamp=1652484151911}]
at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor$5.recover(FaultTolerantChunkProcessor.java:429) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:539) ~[spring-retry-1.3.3.jar:na]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:387) ~[spring-retry-1.3.3.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255) ~[spring-retry-1.3.3.jar:na]
at org.springframework.batch.core.step.item.BatchRetryTemplate.execute(BatchRetryTemplate.java:217) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.write(FaultTolerantChunkProcessor.java:444) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:217) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.3.19.jar:5.3.19]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.3.5.jar:4.3.5]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.3.5.jar:4.3.5]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.3.5.jar:4.3.5]
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:152) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:413) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:320) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.executeLocalJobs(JobLauncherApplicationRunner.java:173) ~[spring-boot-autoconfigure-2.6.7.jar:2.6.7]
at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.launchJobFromProperties(JobLauncherApplicationRunner.java:160) ~[spring-boot-autoconfigure-2.6.7.jar:2.6.7]
at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.run(JobLauncherApplicationRunner.java:155) ~[spring-boot-autoconfigure-2.6.7.jar:2.6.7]
at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.run(JobLauncherApplicationRunner.java:150) ~[spring-boot-autoconfigure-2.6.7.jar:2.6.7]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:768) ~[spring-boot-2.6.7.jar:2.6.7]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:758) ~[spring-boot-2.6.7.jar:2.6.7]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:310) ~[spring-boot-2.6.7.jar:2.6.7]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1312) ~[spring-boot-2.6.7.jar:2.6.7]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301) ~[spring-boot-2.6.7.jar:2.6.7]
at pt.bayonne.sensei.importclients.ImportClientsApplication.main(ImportClientsApplication.java:10) ~[classes/:na]
Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'clientRequests'; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Infinite recursion (StackOverflowError) (through reference chain: org.springframework.batch.core.JobExecution["stepExecutions"]->java.util.Collections$UnmodifiableRandomAccessList[0]->org.springframework.batch.core.StepExecution["jobExecution"]->org.springframework.batch.core.JobExecution["stepExecutions"]->java.util.Collections$UnmodifiableRandomAccessList[0]->org.springframework.batch.core.StepExecution["jobExecution"]->org.springframework.batch.core.JobExecution["stepExecutions"]->java.util.Collections$UnmodifiableRandomAccessList[0]->org.springframework.batch.core.StepExecution["jobExecution"]->org.springframework.batch.core.JobExecution["stepExecutions"]->java.util.Collections$UnmodifiableRandomAccessList[0]->org.springframework.batch.core.StepExecution["jobExecution"]->org.springframework.batch.core.JobExecution["stepExecutions"]->java.util
Any ideas? Is there a sample application that uses remote chunking? Thanks.
After adding what is suggested in https://github.com/spring-projects/spring-batch/issues/1488, I got a new exception:
Caused by: java.io.StreamCorruptedException: invalid stream header: 7B0A2020
at java.base/java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:963) ~[na:na]
at java.base/java.io.ObjectInputStream.<init>(ObjectInputStream.java:397) ~[na:na]
at org.springframework.util.SerializationUtils.deserialize(SerializationUtils.java:81) ~[spring-core-5.3.19.jar:5.3.19]
... 27 common frames omitted
Solution 1:[1]
"I understand the reason just by checking the ChunkRequest class, which has a collection of StepExecutions; each StepExecution has a JobExecution, which in turn has StepExecutions."
You nailed it; in the end you are hitting https://github.com/spring-projects/spring-batch/issues/1488. This is a Jackson issue, because other serialization mechanisms are able to serialize/deserialize ChunkRequest objects.
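To illustrate why Java's built-in serialization copes with this kind of object graph, here is a minimal sketch that uses only the standard library. Job and Step are hypothetical stand-ins for JobExecution and StepExecution, not the Spring Batch classes; they only reproduce the parent/child back-reference. ObjectOutputStream writes a back-reference handle when it meets an object it has already written, so the cycle terminates:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class CyclicGraphRoundTrip {

    // Hypothetical stand-in for JobExecution: holds its steps.
    static class Job implements Serializable {
        private static final long serialVersionUID = 1L;
        final long id;
        final List<Step> steps = new ArrayList<>();
        Job(long id) { this.id = id; }
    }

    // Hypothetical stand-in for StepExecution: points back to its job.
    static class Step implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final Job job;
        Step(String name, Job job) {
            this.name = name;
            this.job = job;
            job.steps.add(this); // closes the cycle: job -> steps -> step -> job
        }
    }

    // Writes the object graph with Java serialization.
    static byte[] serialize(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads the object graph back.
    static Object deserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Job job = new Job(1L);
        new Step("dispatchStep", job);
        Job copy = (Job) deserialize(serialize(job));
        // The restored step points back to the restored job instance.
        System.out.println(copy.steps.get(0).job == copy);
    }
}
```

Running main prints true: the deserialized step refers to the same deserialized job, so the cycle survives the round trip. A JSON mapper that walks the graph as a tree, without any cycle handling, would instead keep descending through steps, job, steps, and so on.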
Solution 2:[2]
As @Mahmoud Ben Hassine mentioned, this is a Jackson issue, because other serialization mechanisms are able to serialize/deserialize ChunkRequest objects.
So create a custom serializer, for example:
package pt.bayonne.sensei.RemoteChunking.job.serde;

import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.integration.chunk.ChunkRequest;
import org.springframework.util.SerializationUtils;

// Kafka serializer that writes a ChunkRequest with Java serialization instead of Jackson.
public class ChunkRequestSerializer implements Serializer<ChunkRequest> {

    private static final Logger logger = LoggerFactory.getLogger(ChunkRequestSerializer.class);

    @Override
    public byte[] serialize(String topic, ChunkRequest data) {
        if (data == null) {
            return null;
        }
        String dataType = data.getClass().getName();
        logger.debug("--> serializing: {}", dataType);
        byte[] dataBytes = null;
        try {
            // Java serialization handles the StepExecution <-> JobExecution cycle.
            dataBytes = SerializationUtils.serialize(data);
        } catch (Exception e) {
            // On failure the error is logged and null is returned.
            logger.error("Error serializing data", e);
        }
        return dataBytes;
    }
}
You then have to specify that the binding uses native encoding, so that the payload is handed to the Kafka serializer:
# producer
spring.cloud.stream.bindings.clientRequests.destination=clientRequests
spring.cloud.stream.bindings.clientRequests.producer.use-native-encoding=true
spring.kafka.producer.value-serializer=pt.bayonne.sensei.RemoteChunking.job.serde.ChunkRequestSerializer
Otherwise you will run into further exceptions, and sometimes the framework will keep picking its default serializer/deserializer. Using Jackson mixins doesn't help much.
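One way to check which serializer actually produced a payload is to look at its first bytes. The StreamCorruptedException in the question reports the header 7B0A2020. The following sketch uses only the standard library (the class and method names are illustrative and not part of Spring or Kafka); it decodes that header and compares it with the Java serialization magic number:

```java
import java.io.ObjectStreamConstants;
import java.nio.charset.StandardCharsets;

public class StreamHeaderCheck {

    // Parses a hex string such as "7B0A2020" into raw bytes.
    static byte[] hexToBytes(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    // True if the bytes start with the Java serialization magic number 0xACED.
    static boolean looksLikeJavaSerialization(byte[] header) {
        if (header.length < 2) {
            return false;
        }
        int magic = ((header[0] & 0xFF) << 8) | (header[1] & 0xFF);
        return magic == (ObjectStreamConstants.STREAM_MAGIC & 0xFFFF);
    }

    public static void main(String[] args) {
        byte[] header = hexToBytes("7B0A2020");
        String ascii = new String(header, StandardCharsets.US_ASCII);
        System.out.println(looksLikeJavaSerialization(header)); // false
        System.out.println(ascii.startsWith("{"));               // true
    }
}
```

The header decodes to an opening brace, a newline, and two spaces, which is how pretty-printed JSON begins, whereas a Java-serialized stream starts with 0xACED. This suggests the payload was still written as JSON while the reader expected Java serialization, which is consistent with the default serializer being picked up on one side.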
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Mahmoud Ben Hassine |
| Solution 2 | Pascoal Eddy Bayonne |