'Avoid ConcurrentModificationException on java stream using cache

I'm getting occasionally ConcurrentModificationException with the following code:

public Set<MyObject> getTypes(Set<Type> names) {
        Set<MyObject> myObjects = new HashSet<>();          
        myObjects = names.stream().filter(Objects::nonNull).mapToInt(Type::getId)
                .mapToObj(cache::getMyObject)
                .collect(Collectors.toSet());

I'm using cache to convert to MyObject, but exception seems to thrown in collect method (DAOImpl.java line 114)

java.util.ConcurrentModificationException
        at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1558)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at com.dao.DAOImpl.getTypes(DAOImpl.java:114)
        at com.dao.DAOImpl$$FastClassBySpringCGLIB$$affe23c4.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
        at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:137)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
        at com.dao.DAOImpl$$EnhancerBySpringCGLIB$$6000bd7e.getTypes(<generated>)

How can I use cache to map to objects, or must I use cache outside stream?

Notice cache should be updated only 1 per day



Solution 1:[1]

The issue was that 2 threads changed myObjects so the expected count of the Set when looping through it was compromised, therefore the exception

The fix was to make the above service call atomic and therefore to prevent changing Set on runtime with a different thread

Solution 2:[2]

The problem is not related with your cache::getMyObject. The problem is that your names Set is being changed while you stream it, hence the ConcurrentModificationException.

If you wrap (create a new collection) from your names collection the problem is fixed.

public Set<MyObject> getTypes(Set<Type> names) {
        Set<MyObject> myObjects = new HashSet<>(names).stream().filter(Objects::nonNull).mapToInt(Type::getId)
                .mapToObj(cache::getMyObject).collect(Collectors.toSet());

Nonetheless, I think your cache method (getMyObject) can return null. In that case you should filter them out.

If you want to trigger the exception you can do something like this:

 public static void main(String[] args) {
    Set<String> names = new HashSet();

    names.add("a");
    names.add("b");

    Thread x = new Thread(){
      @Override
      public void run() {
        try {
          sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }

        names.add("c");
      }
    };

    x.start();

    Set<Object> myObjects = new HashSet<>(names).stream().filter(Objects::nonNull).mapToInt(s -> s.hashCode()).mapToObj(i -> String.valueOf(i)).map(x1-> getx(x1)).collect(Collectors.toSet());

  }

  private static Object getx(String x) {

    try {
      Thread.sleep(2000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    return x;
  }

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 user7294900
Solution 2