Spark streaming read exception - java.net.SocketException; py4j.Py4JNetworkException

I have been using Spark Structured Streaming to read directly from a Kafka topic. Everything had been working fine, but today I started getting the error below and I can't figure out what's wrong; I've been looking everywhere.
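For context, the job is set up roughly like this. This is a minimal sketch rather than my exact code: the app name, broker address, and the body of the foreachBatch function are placeholders (the real function does more work), and it assumes the spark-sql-kafka package is on the classpath.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col

    spark = SparkSession.builder.appName("KafkaStream").getOrCreate()

    # Subscribe to the Kafka topic (broker address is a placeholder)
    df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "TopicName")
        .load()
    )

    # Cast the binary Kafka value column to a string,
    # matching the Project in the logical plan below
    values = df.select(col("value").cast("string").alias("value"))

    def process_batch(batch_df, batch_id):
        # Placeholder per-batch logic; the real function writes the batch out
        batch_df.show(truncate=False)

    query = (
        values.writeStream
        .foreachBatch(process_batch)
        .start()
    )
    query.awaitTermination()

Here is the streaming query status and the full error: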

=== Streaming Query ===
Identifier: ...
Current Committed Offsets: {}
Current Available Offsets: {KafkaV2[Subscribe[TopicName]]: {"TopicName":{"0":6}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [cast(value#95 as string) AS value#108]
+- StreamingDataSourceV2Relation [key#94, value#95, topic#96, partition#97, offset#98L, timestamp#99, timestampType#100], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7af16b01, KafkaV2[Subscribe[TopicName]]

22/04/19 21:56:28 ERROR MicroBatchExecution: Query [...] terminated with error
py4j.Py4JException: Error while sending a command.
        at py4j.CallbackClient.sendCommand(CallbackClient.java:397)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
        at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
        at com.sun.proxy.$Proxy14.call(Unknown Source)
        at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:55)
        at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:55)
        at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
        at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
Caused by: py4j.Py4JNetworkException: Error while sending a command: c
p4
call
ro161
L0
e

        at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:253)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
        ... 30 more
Caused by: java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(SocketInputStream.java:210)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.fill(BufferedReader.java:161)
        at java.io.BufferedReader.readLine(BufferedReader.java:324)
        at java.io.BufferedReader.readLine(BufferedReader.java:389)
        at py4j.ClientServerConnection.readBlockingResponse(ClientServerConnection.java:313)
        at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:229)
        ... 31 more
Exception: Error while sending a command.

I really have no idea what's going on. If anyone has any idea what might be causing this, it would be very much appreciated! Thanks.


