SQL in Spark Structured Streaming
I am exploring Structured Streaming with a small POC. Below is the code I have written so far. However, I would like to validate a few points that I could not find answers to in the Spark documentation (I may have missed them).
Validated so far:
- Can a SQL query be processed dynamically or conditionally? Yes, I could pass the SQL query as an argument and start the execution.
- Can SQL queries run in parallel? Yes, as per "How does Structured Streaming execute separate streaming queries (in parallel or sequentially)?"
Still to validate:
- What are the limitations of SQL queries? I found that not every type of SQL query that works against a relational database can be run on a stream, for example, partition.
- Can the execution of a particular SQL query be terminated conditionally?
Can anyone guide me on the limitations I need to consider when generating SQL queries? I know it is a very broad question, but any guidance that points me in the right direction would be very helpful.
The POC code.
"""
Run the example
`$ bin/spark-submit examples/src/main/python/sql/streaming/structured_kafka_SQL_query.py \
host1:port1,host2:port2 subscribe topic1,topic2`
"""
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql import window as w
if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("""
        Usage: structured_kafka_SQL_query.py <bootstrap-servers> <subscribe-type> <topics>
        """, file=sys.stderr)
        sys.exit(-1)
    bootstrapServers = sys.argv[1]
    subscribeType = sys.argv[2]
    topics = sys.argv[3]
    spark = SparkSession\
        .builder\
        .appName("StructuredKafkaWordCount")\
        .getOrCreate()
    spark.sparkContext.setLogLevel('WARN')
    # Schema of the JSON payload carried in the Kafka message value
    schema = StructType([
        StructField("p1", StringType(), True),
        StructField("p2", StringType(), True),
        StructField("p3", StringType(), True)
    ])
    # Read from Kafka and flatten the parsed JSON into columns p1, p2, p3
    lines = spark\
        .readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", bootstrapServers)\
        .option(subscribeType, topics)\
        .load()\
        .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))\
        .select(col("parsed_value.*"))
    query = "select count(*),p1,p2 from events group by p1,p2"
    query1 = "select count(*),p2,p3 from events group by p2,p3"
    query_list = [query, query1]  # it can be passed as an argument.
    # Expose the stream to SQL under the name "events"
    lines.createOrReplaceTempView("events")
    # Start one streaming query per SQL statement
    for q in query_list:
        spark.sql(q).writeStream\
            .outputMode('complete')\
            .format('console')\
            .start()
    # Block until any one of the started queries terminates
    spark.streams.awaitAnyTermination()
Please let me know if my question is still unclear; I can update it accordingly.
Solution 1:[1]
Answering one part of my own question.
Can the execution of a particular SQL query be terminated conditionally?
Yes, Spark provides a streaming query management API to stop streaming queries:
StreamingQuery.stop()
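As a minimal sketch of what "conditionally" could look like with that API: the helper below walks a list of running queries and stops those that match a caller-supplied condition. The function name `stop_matching_queries`, the predicate `should_stop`, and the query name `by_p1_p2` are hypothetical, introduced only for illustration. The sketch relies only on the `isActive`, `name`, and `stop()` members of `StreamingQuery`, and assumes each query was given a name with `queryName(...)` when it was started.

```python
def stop_matching_queries(active_queries, should_stop):
    """Stop each active query for which should_stop(query) returns True.

    active_queries: iterable of StreamingQuery objects, e.g. spark.streams.active
    should_stop:    hypothetical predicate supplied by the caller
    Returns the names of the queries that were stopped.
    """
    stopped = []
    for query in active_queries:
        # Skip queries that have already terminated
        if query.isActive and should_stop(query):
            query.stop()
            stopped.append(query.name)
    return stopped


# Usage with the POC (assumes the query was started with a name):
#   spark.sql(q).writeStream.queryName("by_p1_p2") \
#       .outputMode("complete").format("console").start()
#   stop_matching_queries(spark.streams.active,
#                         lambda q: q.name == "by_p1_p2")
```

Note that `spark.streams.awaitAnyTermination()` in the POC blocks the driver, so a check like this would have to run somewhere else, for example in a polling loop that calls `awaitAnyTermination` with a timeout and re-evaluates the condition between calls.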
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Monu |
