saveToCassandra hangs unexpectedly

This is what my code used to look like:

    val finalEmails : Dataset[Models.Mail] = HtmlBuilder.main(dataEmails, created_date_string)

    val newCampaigns : DataFrame = finalEmails.groupBy("usecase")
        .agg(count("*").alias("selected"),first("subject").as("subject"),first("html").as("html"))
        .withColumn("id_account",lit(id_account_saveCassandra))

    newCampaigns.as[Models.AggMails].foreach( aggmails  => {
        HtmlBuilder.saveToCassandra(aggmails.id_account, aggmails.selected, aggmails.subject, aggmails.usecase, aggmails.html, created_date_string.toLong)
    })

HtmlBuilder.scala:

    object HtmlBuilder {
        def main(emailsToSend: Dataset[Models.EmailToSend], created_date_string: String) : Dataset[Models.Mail] = {
            val builtHtmls: Dataset[Models.Mail] = emailsToSend.as[Models.EmailToSend]
                .filter(emailtosend => { getFilteredMail(emailtosend) })
                .map(emailtosend => { getFormattedMail(emailtosend) })

            builtHtmls
        }

        def saveToCassandra(id_account: String, selected: Long, subject: String, useCase: String, html: String, created_date: Long ) = {
            import com.datastax.spark.connector._

            val campaign: List[Models.Campaign] = List(new Models.Campaign(id_account, true, created_date, html, selected, "created", subject, useCase))
            val rdd: RDD[Models.Campaign] = Spark.session.sparkContext.parallelize(campaign);

            Try(rdd.saveToCassandra("kiliba", "campaigns", SomeColumns("id_account","automatic_campaign", "created_date", "html", "selected", "status", "subject", "usecase"))) match {
                case Failure(exception) ⇒  println("ERROR CAMPAIGNS : " + exception); System.exit(0)
                case Success(value)     ⇒  println("CAMPAIGN SAVED")
            }
        }

        // some other functions...
    }

    object Models {
        case class EmailToSend(
            id_customer: String,
            usecase: String,
            customer: Customer,
            godson_customer: Customer,
            sponsor_customer: Customer,
            products: Seq[Models.Product],
            cart_link: String
        )

        case class Mail (
            val id_account: String,
            val smtp: String,
            val to: String,
            val sender: String,
            val subject: String,
            val alias: String,
            val html: String,
            val text: String,
            val usecase: String,
            val created_date: String,
            val id_customer: String
        )

        case class AggMails (
            val selected : Long,
            val subject: String,
            val usecase: String,
            val id_account: String,
            val html: String
        )

        case class Campaign(
            id_account: String,
            automatic_campaign: Boolean,
            created_date: Long,
            html: String,
            selected: Long,
            status: String,
            subject: String,
            usecase: String
        )
    }

HtmlBuilder.main processes the raw data in dataEmails and formats it as emails (finalEmails), which are then aggregated into campaigns (newCampaigns). These campaigns are then stored in our Cassandra database with HtmlBuilder.saveToCassandra.

This works.


However, I needed to process dataEmails as an array instead of a dataset, because the mapping and filtering functions in HtmlBuilder.main must run only once: I added some code that must not be executed multiple times, such as an API call that creates some data.

I found a simple solution, which is to convert dataEmails to an array when passing it to HtmlBuilder.main, and then convert the result back to a dataset before returning it from HtmlBuilder.main:

    // I simply added a .collect()
    val finalEmails : Dataset[Models.Mail] = HtmlBuilder.main(dataEmails.collect(), created_date_string)

HtmlBuilder.scala:

    object HtmlBuilder {
        // type of emailsToSend is now array
        def main(emailsToSend: Array[Models.EmailToSend], created_date_string: String) : Dataset[Models.Mail] = {
            var formattedEmailsToSend: Array[Models.EmailToSend] = emailsToSend

            // apply a filter and a map on the array, hence only once, on HtmlBuilder.main call, instead of
            // every time the dataset is executed
            val builtHtmls: Array[Models.Mail] = formattedEmailsToSend
                .filter(emailtosend => { getFilteredMail(emailtosend) })
                .map(emailtosend => { getFormattedMail(emailtosend) })

            // convert the array back to a dataset
            builtHtmls.toSeq.toDF().as[Models.Mail]
        }
    }
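
The "only once" concern can be illustrated with a plain-Scala analogy. This is not Spark code, and the object and method names are hypothetical: a lazy view stands in for an uncached Dataset, whose map function runs again every time the data is evaluated, while a strict List stands in for the collected array, whose map function runs once.

```scala
object LazyVsStrict {
  // Lazy view: the mapping function runs again on every traversal.
  def callsWithLazyView(): Int = {
    var calls = 0
    val mapped = List(1, 2, 3).view.map { x => calls += 1; x * 2 }
    mapped.sum // first evaluation
    mapped.sum // second evaluation re-runs the function
    calls
  }

  // Strict list: the mapping function runs once, when map is called.
  def callsWithStrictList(): Int = {
    var calls = 0
    val mapped = List(1, 2, 3).map { x => calls += 1; x * 2 }
    mapped.sum
    mapped.sum
    calls
  }

  def main(args: Array[String]): Unit = {
    println(callsWithLazyView())   // prints 6
    println(callsWithStrictList()) // prints 3
  }
}
```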

This looks fine to me; however, HtmlBuilder.saveToCassandra now hangs forever, and I cannot understand why.


I looked through every log, all the code, and every related thread I could find online, but found nothing, not even an error in my logs. I logged the data processed at every stage, and it always looks exactly the same in v1 and v2.

I also noticed what seems to be an asynchronicity problem: after adding some logs to saveToCassandra, the log lines are interleaved incomprehensibly in v2, whereas they are ordered one campaign at a time in v1, which is what I would expect in v2 as well.

v1 spark logs:

aggmails
fff1
fff3
fff4
(fake_5db9a3a917774f0011efadf1_20-01-2022_08:59:03,1,Christine, c’est notre anniversaire ❤️,anniversaryregistration,1642669261240)
fff42
List(Campaign(fake_5db9a3a917774f0011efadf1_20-01-2022_08:59:03,true,1642669261240,html,1,created,Christine, c’est notre anniversaire ❤️,anniversaryregistration))
fff43
40653
fff44
fff5
fff6
CAMPAIGN SAVED
fff7
fff2
 
aggmails
fff1
fff3
fff4
(fake_5db9a3a917774f0011efadf1_20-01-2022_08:59:03,1,Claire, joyeux anniversaire 🎂,birthday,1642669261240)
fff42
List(Campaign(fake_5db9a3a917774f0011efadf1_20-01-2022_08:59:03,true,1642669261240,html,1,created,Claire, joyeux anniversaire 🎂,birthday))
fff43
41245
fff44
fff5
fff6
CAMPAIGN SAVED
fff7
fff2
 
aggmails
fff1
fff3
fff4
(fake_5db9a3a917774f0011efadf1_20-01-2022_08:59:03,1,J-4 avant le Black Friday chez nat & nin,blackfriday1,1642669261240)
fff42
List(Campaign(fake_5db9a3a917774f0011efadf1_20-01-2022_08:59:03,true,1642669261240,html,1,created,J-4 avant le Black Friday chez nat & nin,blackfriday1))
fff43
30341
fff44
fff5
fff6
CAMPAIGN SAVED
fff7
fff2

And so on: each campaign is processed one at a time, with no overlap.

v2:


aggmails
fff1
aggmails
fff1
fff3
fff4
fff3
fff4
aggmails
fff1
fff3
fff4
(fake_5db9a3a917774f0011efadf1_20-01-2022_09:11:01,1,Christine, c’est notre anniversaire ❤️,anniversaryregistration,1642669978118)
fff42
(fake_5db9a3a917774f0011efadf1_20-01-2022_09:11:01,1,Polinard-Rawdon, bienvenue chez nous,welcome,1642669978118)
fff42
(fake_5db9a3a917774f0011efadf1_20-01-2022_09:11:01,1,Aude, ces produits n'attendent que vous !,postvisits,1642669978118)
fff42
List(Campaign(fake_5db9a3a917774f0011efadf1_20-01-2022_09:11:01,true,1642669978118,html,1,created,Polinard-Rawdon, bienvenue chez nous,welcome))
List(Campaign(fake_5db9a3a917774f0011efadf1_20-01-2022_09:11:01,true,1642669978118,html,1,created,Christine, c’est notre anniversaire ❤️,anniversaryregistration))
fff43
List(Campaign(fake_5db9a3a917774f0011efadf1_20-01-2022_09:11:01,true,1642669978118,html,1,created,Aude, ces produits n'attendent que vous !,postvisits))
fff43
44204
fff44
fff5
40569
fff44
fff5
fff43
40319
fff44
aggmails
fff1
fff3
fff4
(fake_5db9a3a917774f0011efadf1_20-01-2022_09:11:01,1,J-2 avant le Black Friday chez nat & nin,blackfriday2,1642669978118)
fff42
fff6
fff6
List(Campaign(fake_5db9a3a917774f0011efadf1_20-01-2022_09:11:01,true,1642669978118,html,1,created,J-2 avant le Black Friday chez nat & nin,blackfriday2))
fff43
30285
fff44
fff5
fff5
fff6
fff6

Everything is mixed up. What's going on?

PS: I tried to reduce my code to the smallest possible example, but it is part of a program too complex for me to produce a simple reproducible example; this is the best I can do.



Solution 1:[1]

You really don't need to do it this way:

    newCampaigns.as[Models.AggMails].foreach( aggmails  => {
        HtmlBuilder.saveToCassandra(aggmails.id_account,
            aggmails.selected, aggmails.subject,
            aggmails.usecase, aggmails.html, created_date_string.toLong)
    })

The Spark Cassandra Connector supports writing DataFrames/Datasets directly into a table, with no need to use the RDD API, and especially not from inside a foreach call. Just do the following. It is important to use mode("append") to avoid removing data, because overwrite will truncate the table:

    newCampaigns.as[Models.AggMails].write
      .format("org.apache.spark.sql.cassandra")
      .options(Map( "table" -> "table_name", "keyspace" -> "ks_name"))
      .mode("append")
      .save()
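
Adapted to the question's table, this could look like the sketch below. It assumes that kiliba.campaigns has exactly the columns listed in the question's SomeColumns call, that created_date is stored as a number, and that the constants previously set when building Models.Campaign (automatic_campaign = true, status = "created") can be added as literal columns. The object and method names (CampaignWriter, saveCampaigns) are hypothetical. Because the write is issued once from the driver, no Spark job is started from inside foreach. The function passed to foreach runs inside Spark tasks, one per partition and possibly in parallel, and using the SparkContext there is not supported, which may explain the interleaved logs and the hang.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

object CampaignWriter {
  // Hypothetical helper: writes all aggregated campaigns in a single Spark job.
  // newCampaigns is expected to contain usecase, selected, subject, html and
  // id_account, as produced by the groupBy/agg in the question.
  def saveCampaigns(newCampaigns: DataFrame, createdDate: Long): Unit = {
    newCampaigns
      // Assumption: these constants mirror the values the question passed
      // to Models.Campaign for every row.
      .withColumn("automatic_campaign", lit(true))
      .withColumn("created_date", lit(createdDate))
      .withColumn("status", lit("created"))
      .write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "kiliba", "table" -> "campaigns"))
      .mode("append") // append keeps existing rows; overwrite would truncate the table
      .save()
  }
}
```

With such a helper, the whole foreach loop in the question would reduce to a single call: CampaignWriter.saveCampaigns(newCampaigns, created_date_string.toLong).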

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Alex Ott