SQL in Spark Structured Streaming

I am exploring Structured Streaming with a small POC. Below is the code I have written so far. However, I would like to validate some answers that I could not find in the Spark documentation (I may have missed them).

Validated so far:

Need to validate:

  • What are the limitations of the SQL queries? I found that we cannot run every type of SQL query that we normally would against a relational database; partitioning, for example.
  • Can execution of a particular SQL query be terminated conditionally?

Can anyone guide me on the limitations I need to consider while generating SQL queries? I know it is a very broad question, but any guidance that points me in the right direction would be very helpful.

The POC code:


"""
 Run the example
    `$ bin/spark-submit examples/src/main/python/sql/streaming/structured_kafka_SQL_query.py \
    host1:port1,host2:port2 subscribe topic1,topic2`
"""
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("""
        Usage: structured_kafka_SQL_query.py <bootstrap-servers> <subscribe-type> <topics>
        """, file=sys.stderr)
        sys.exit(-1)

    bootstrapServers = sys.argv[1]
    subscribeType = sys.argv[2]
    topics = sys.argv[3]

    spark = SparkSession\
        .builder\
        .appName("StructuredKafkaWordCount")\
        .getOrCreate()

    spark.sparkContext.setLogLevel('WARN')

    
    schema = StructType([
        StructField("p1", StringType(), True),
        StructField("p2", StringType(), True),
        StructField("p3", StringType(), True),
    ])


    lines = spark\
        .readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", bootstrapServers)\
        .option(subscribeType, topics)\
        .load()\
        .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))\
        .select(col("parsed_value.*"))


    query = "select count(*), p1, p2 from events group by p1, p2"

    query1 = "select count(*), p2, p3 from events group by p2, p3"

    query_list = [query, query1]  # these could be passed as arguments

    lines.createOrReplaceTempView("events")


    for q in query_list:
        spark.sql(q).writeStream\
            .outputMode('complete')\
            .format('console')\
            .start()


    spark.streams.awaitAnyTermination()

Please let me know if my question is still unclear; I can update it accordingly.



Solution 1:[1]

Answering one part of my own question:

Can execution of a particular SQL query be terminated conditionally?

Yes, Spark provides a streaming query management API to stop streaming queries:

StreamingQuery.stop()
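As a minimal sketch of how this could be wired up: poll the query's progress and call `stop()` once some condition holds. The stop policy below ("N consecutive empty micro-batches") and the names `should_stop`, `monitor`, and `max_empty_batches` are illustrative assumptions, not part of the Spark API; `isActive`, `lastProgress`, `awaitTermination(timeout)`, and `stop()` are real `StreamingQuery` members.

```python
# Hedged sketch: conditionally stopping a Structured Streaming query.
# The decision logic is pure Python so it can be tested without Spark.

def should_stop(num_input_rows, empty_so_far, max_empty_batches=3):
    """Return (stop?, updated empty-batch counter): stop after
    `max_empty_batches` consecutive micro-batches with no input rows.
    (Illustrative policy, not a Spark API.)"""
    empty = empty_so_far + 1 if num_input_rows == 0 else 0
    return empty >= max_empty_batches, empty


def monitor(query, max_empty_batches=3):
    """Poll a StreamingQuery and stop it once the policy fires."""
    empty = 0
    while query.isActive:
        progress = query.lastProgress  # dict, or None before the first batch
        if progress is not None:
            stop, empty = should_stop(progress["numInputRows"],
                                      empty, max_empty_batches)
            if stop:
                query.stop()  # StreamingQuery.stop()
                break
        query.awaitTermination(5)  # re-check every 5 seconds
```

In the POC above, `start()` returns a `StreamingQuery`, so the handles from the loop could be collected and passed to a monitor like this instead of (or alongside) `spark.streams.awaitAnyTermination()`.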

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Monu