'Spark mapPartitionsToPai execution time

In the current project I am working, we are using spark as computation engine for one of workflows.

Workflow is as follows

We have product catalog being served from several pincodes. User logged in from any particular pin code should be able to see least available cost from all available serving pincodes.

Least cost is calculated as follows

product price+dist(pincode1,pincode2) -

pincode2 being user pincode and pincode1 being source pincode. Apply the above formula for all source pincodes and identify the least available one.

My Core spark logic looks like this

pincodes.javaRDD().cartesian(pincodePrices.javaRDD()).mapPartitionsToPair(new PairFlatMapFunction<Iterator<Tuple2<Row,Row>>, Row, Row>() {

        @Override
        public Iterator<Tuple2<Row, Row>> call(Iterator<Tuple2<Row, Row>> t)
            throws Exception {
          
          MongoClient mongoclient = MongoClients.create("mongodb://localhost");
          MongoDatabase database = mongoclient.getDatabase("catalogue");
          MongoCollection<Document>pincodeCollection = database.getCollection("pincodedistances");
            
            List<Tuple2<Row,Row>> list =new LinkedList<>();
            while (t.hasNext()) {
            Tuple2<Row, Row>tuple2 = t.next();
            
            Row pinRow = tuple2._1;
            Integer srcPincode = pinRow.getAs("pincode");
            
            Row pricesRow = tuple2._2;
            Row pricesRow1 = (Row)pricesRow.getAs("leastPrice");
            Integer buyingPrice = pricesRow1.getAs("buyingPrice");
            Integer quantity = pricesRow1.getAs("quantity");
            Integer destPincode = pricesRow1.getAs("pincodeNum");
            
            
            if(buyingPrice!=null && quantity>0) {
                BasicDBObject dbObject = new BasicDBObject();
                dbObject.append("sourcePincode", srcPincode);
                dbObject.append("destPincode", destPincode);
                
                //System.out.println(srcPincode+","+destPincode);
                Number distance;
                
                if(srcPincode.intValue()==destPincode.intValue()) {
                  distance = 0;
                }else {
                  Document document = pincodeCollection.find(dbObject).first();
                  distance = document.get("distance", Number.class); 
                }
                
                double margin = 0.02;
                Long finalPrice = Math.round(buyingPrice+(margin*buyingPrice)+distance.doubleValue());
                
                //Row finalPriceRow = RowFactory.create(finalPrice,quantity);
                StructType structType = new StructType();
                structType = structType.add("finalPrice", DataTypes.LongType, false);
                structType = structType.add("quantity", DataTypes.LongType, false);
                Object values[] = {finalPrice,quantity};
                Row finalPriceRow = new GenericRowWithSchema(values, structType);
                
                list.add(new Tuple2<Row, Row>(pinRow, finalPriceRow));
            }
            
          }
            mongoclient.close();
          return list.iterator();
        }
      }).reduceByKey((priceRow1,priceRow2)->{
        
        Long finalPrice1 = priceRow1.getAs("finalPrice");
        Long finalPrice2 = priceRow2.getAs("finalPrice");
        
        if(finalPrice1.longValue()<finalPrice2.longValue())return priceRow1;
        return priceRow2;
      }).collect().forEach(tuple2->{
             // Business logic to push computed price to mongodb
        }

I am able to get the answer correctly, however mapPartitionsToPair is taking a bit of time(~22 secs for just 12k records).

After browsing internet I found that mapPartitions performs better than mapPartitionsToPair, but I am not sure how to emit (key,value) from mapPartitions and then sort it. Is there any alternative for above transformations or any better approach is highly appreciated.

Spark Cluster: Standalone(1 executor, 6 cores)



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source