Spark Streaming job is running very slowly (S3 upload is still processing)
I am running a Spark Streaming job that continuously reads data from a Kafka topic with 12 partitions in 30-second batches and uploads it to an S3 bucket.
The job is running extremely slowly. Please see the code below.
package com.example.main;

import com.example.Util.TableSchema;
import com.example.config.KafkaConfig;
import com.example.monitoring.MicroMeterMetricsCollector;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Level;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.*;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;

import static com.example.Util.Parser.*;
import static com.example.Util.SchemaHelper.checkSchema;
import static com.example.Util.SchemaHelper.mapToSchema;

public class StreamApp {

    private static final String DATE = "dt";
    private static final String HOUR = "hr";
    private static final Logger LOG = LoggerFactory.getLogger(StreamApp.class);
    private static Collection<String> kafkaTopics;
    private static MicroMeterMetricsCollector microMeterMetricsCollector;

    public static void main(String[] args) {
        if (StringUtils.isEmpty(System.getenv("KAFKA_BOOTSTRAP_SERVER"))) {
            System.err.println("KAFKA_BOOTSTRAP_SERVER is empty");
            return;
        }
        final String KAFKA_BOOTSTRAP_SERVER = "localhost:9092";
        kafkaTopics = Arrays.asList("capp_event");

        LOG.info("Initializing the metric collector");
        microMeterMetricsCollector = new MicroMeterMetricsCollector();
        org.apache.log4j.Logger.getLogger("org.apache").setLevel(Level.WARN);

        // Spark configurations (credentials redacted)
        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local[*]");
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sparkConf.set("fs.s3a.access.key", "<AWS_ACCESS_KEY>");
        sparkConf.set("fs.s3a.secret.key", "<AWS_SECRET_KEY>");
        sparkConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        sparkConf.set("mapreduce.fileoutputcommitter.algorithm.version", "2");
        sparkConf.setAppName("Fabric-Streaming");

        // Kafka configurations
        KafkaConfig kafkaConfig = new KafkaConfig();
        kafkaConfig.setKafkaParams(KAFKA_BOOTSTRAP_SERVER);
        Map<String, Object> kafkaParamsMap = kafkaConfig.getKafkaParams();

        // Connect the Kafka topic to the JavaStreamingContext
        JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(30));
        streamingContext.checkpoint("/Users/ritwik.raj/desktop/checkpoint");
        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(streamingContext.sparkContext());

        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(kafkaTopics, kafkaParamsMap));

        // NOTE: the result of this transformation is never assigned or used
        stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));

        // Iterate over the DStream, one RDD per 30-second batch
        stream.foreachRDD(rdd -> {
            // Offset range for this batch of RDD
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

            JavaRDD<TableSchema> rowRdd = rdd
                    .filter(value -> checkSchema(value.value()))
                    .map(json -> mapToSchema(json.value()))
                    .map(row -> addPartitionColumn(row));

            Dataset<Row> df = sqlContext.createDataFrame(rowRdd, TableSchema.class);
            df.write().mode(SaveMode.Append)
                    .partitionBy(DATE, HOUR)
                    .option("compression", "snappy")
                    .orc("s3a://data-qritwik/capp_event/test/");

            // Offsets committed for this batch of RDD
            ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
            LOG.info("Offset committed");
        });

        streamingContext.start();
        try {
            streamingContext.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
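
KafkaConfig is a small helper class of my own that is not shown above. Here is a minimal sketch of the consumer parameters it is assumed to return; the group id, auto.offset.reset and deserializer choices are illustrative, not the exact production values (auto-commit is disabled to match the manual commitAsync in the job):

package com.example.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.HashMap;
import java.util.Map;

public class KafkaConfig {

    private Map<String, Object> kafkaParams;

    public void setKafkaParams(String bootstrapServer) {
        kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "capp_event_consumer"); // illustrative group id
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // auto-commit disabled; offsets are committed manually via commitAsync in the streaming job
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    }

    public Map<String, Object> getKafkaParams() {
        return kafkaParams;
    }
}
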
In the Spark UI a new job is created every 30 seconds, but the first job is still in the processing state and the others are getting queued.
I looked inside the first job to find out why it is still processing, and all of its tasks show SUCCEEDED, yet the job itself stays in the processing state; because of this the other jobs keep queuing up.
Please let me know why the first job is still processing and how to optimise this so that the upload to S3 becomes faster.
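
For context, the only S3-related setting in the job above is the file output committer algorithm version; nothing else on the S3A side is tuned. Below is a sketch of the kind of S3A upload tuning I am wondering about (property names are from the hadoop-aws S3A documentation, the values are guesses, and I have not verified that any of them help here):

import org.apache.spark.SparkConf;

public class S3ATuningSketch {
    public static void main(String[] args) {
        // Hypothetical S3A tuning; the "spark.hadoop." prefix is how SparkConf
        // entries are copied into the Hadoop Configuration used by the s3a:// filesystem.
        SparkConf sparkConf = new SparkConf()
                .setAppName("Fabric-Streaming")
                .setMaster("local[*]")
                .set("spark.hadoop.fs.s3a.connection.maximum", "100")   // more parallel S3 connections
                .set("spark.hadoop.fs.s3a.threads.max", "64")           // more upload threads
                .set("spark.hadoop.fs.s3a.fast.upload", "true")         // incremental multipart upload
                .set("spark.hadoop.fs.s3a.fast.upload.buffer", "disk")  // buffer upload blocks on local disk
                .set("spark.hadoop.fs.s3a.multipart.size", "67108864"); // 64 MB multipart blocks
        System.out.println(sparkConf.toDebugString());
    }
}

If the slowness really is in the S3 upload, pointers on which of these (if any) matter, or whether the problem is elsewhere in the job, would be very helpful.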
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow