'Flink Falling back to default Kryo serializer because Chill serializer couldn't be found
I am using flink v1.13.2
One of the task managers throw an Chill serializer couldn't be found exception.
In the log file:
WARN org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] - Falling back to default Kryo serializer because Chill serializer couldn't be found.
java.lang.reflect.InvocationTargetException: null
at jdk.internal.reflect.GeneratedMethodAccessor230.invoke(Unknown Source) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:444) [flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:467) [flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:345) [flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156) [flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextElementLastAccessTimestamp(RocksDbTtlCompactFiltersManager.java:205) [flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:191) [flink-dist_2.11-1.13.2.jar:1.13.2]
Caused by: java.lang.ClassNotFoundException: scala/Function0
at java.lang.Class.forName0(Native Method) ~[?:?]
at java.lang.Class.forName(Class.java:398) ~[?:?]
at com.twitter.chill.KryoBase$$anonfun$1.apply(KryoBase.scala:41) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at com.twitter.chill.KryoBase$$anonfun$1.apply(KryoBase.scala:41) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at scala.collection.immutable.Range.foreach(Range.scala:160) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at com.twitter.chill.KryoBase.<init>(KryoBase.scala:41) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.types.EmptyFlinkScalaKryoInstantiator.newKryo(FlinkScalaKryoInstantiator.scala:45) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.types.FlinkScalaKryoInstantiator.newKryo(FlinkScalaKryoInstantiator.scala:82) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
... 9 more
And in the .out file, it says:
Exception in thread "Thread-2245855" java.lang.IllegalArgumentException: classLoader cannot be null.
at com.esotericsoftware.kryo.Kryo.setClassLoader(Kryo.java:975)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:493)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:345)
at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156)
at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextElementLastAccessTimestamp(RocksDbTtlCompactFiltersManager.java:205)
at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:191)
...
# An error report file with more information is saved as:
# .../hs_err_pid8002.log
Then I just analyzed the hs_err file in the https://fastthread.io, but couldn't find anything related to my source codes
Active Thread (when app crashed)
Thread-2245855 - nativeThreadId:10086 - state:_thread_in_vm - threadType:JavaThread
Stacktrace:
[libjvm.so+0x78dcdf] Exceptions::_throw_oop(Thread*, char const*, int, oopDesc*)+0x15f
[libjvm.so+0x94ed00] jni_Throw+0x90
[librocksdbjni-linux64.so+0x2c8821] JavaListElementFilter::NextUnexpiredOffset(rocksdb::Slice const&, long, long) const+0x121
[librocksdbjni-linux64.so+0x5e6ae1] rocksdb::flink::FlinkCompactionFilter::ListDecide(rocksdb::Slice const&, std::string*) const+0x81
[librocksdbjni-linux64.so+0x5e703a] rocksdb::flink::FlinkCompactionFilter::FilterV2(int, rocksdb::Slice const&, rocksdb::CompactionFilter::ValueType, rocksdb::Slice const&, std::string*, std::string*) const+0x14a
[librocksdbjni-linux64.so+0x347aac] rocksdb::CompactionIterator::InvokeFilterIfNeeded(bool*, rocksdb::Slice*)+0x5dc
[librocksdbjni-linux64.so+0x348b85] rocksdb::CompactionIterator::NextFromInput()+0x4a5
[librocksdbjni-linux64.so+0x34ab81] rocksdb::CompactionIterator::SeekToFirst()+0x11
[librocksdbjni-linux64.so+0x35429f] rocksdb::CompactionJob::ProcessKeyValueCompaction(rocksdb::CompactionJob::SubcompactionState*)+0x40f
[librocksdbjni-linux64.so+0x355cb8] rocksdb::CompactionJob::Run()+0x288
[librocksdbjni-linux64.so+0x397c31] rocksdb::DBImpl::BackgroundCompaction(bool*, rocksdb::JobContext*, rocksdb::LogBuffer*, rocksdb::DBImpl::PrepickedCompaction*)+0xc71
[librocksdbjni-linux64.so+0x399441] rocksdb::DBImpl::BackgroundCallCompaction(rocksdb::DBImpl::PrepickedCompaction*, rocksdb::Env::Priority)+0xc1
[librocksdbjni-linux64.so+0x39997a] rocksdb::DBImpl::BGWorkCompaction(void*)+0x3a
[librocksdbjni-linux64.so+0x57d144] rocksdb::ThreadPoolImpl::Impl::BGThread(unsigned long)+0x254
[librocksdbjni-linux64.so+0x57d2cf] rocksdb::ThreadPoolImpl::Impl::BGThreadWrapper(void*)+0x4f
The latest thing i did, I added new aggregate function:
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
public class MyAggregateFunc implements AggregateFunction<Tuple2<Integer, Long>, Set<Long>, Integer>
{
@Override
public Set<Long> createAccumulator()
{
return new HashSet<>();
}
// ...
}
Finally, I just found the problem. The problem was the ListState<MyObject>. In somehow MyObject isn't ready to use for the ListState. I just removed ListState.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
