Streaming JSON data from Kafka into PySpark: aggregation not working on the JSON data
I am trying to read JSON data from Kafka into Spark using Python and then perform some aggregation operations on the data, but I am running into a problem. Here is my code; first I just send the input data to the console:
```python
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 pyspark-shell'
)

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, from_json

spark = SparkSession \
    .builder \
    .appName("Total-spending-for-top-users") \
    .getOrCreate()

# Read the Kafka topic as a streaming DataFrame
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "saurabmarjara2") \
    .option("startingOffsets", "earliest") \
    .load()

# Schema of the JSON payload carried in the Kafka message value
jsonschema = StructType([StructField("order_id", IntegerType()),
                         StructField("customer_id", IntegerType()),
                         StructField("taxful_total_price", IntegerType())])

# Parse the binary "value" column as JSON using the schema above
mta_stream = df.select(from_json(col("value").cast("string"), jsonschema)
                       .alias("parsed_mta_values"))
mta_data = mta_stream.select("parsed_mta_values.*")

qry = mta_data.writeStream.outputMode("append").format("console").start()
qry.awaitTermination()
```
This works correctly. The output is:
| order_id | customer_id | taxful_total_price |
|---|---|---|
| 937665 | 7 | 10 |
| 937666 | 3 | 4 |
| 937667 | 6 | 3 |
| 937668 | 4 | 4 |
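For context, the message values on the topic look roughly like the following. This is a reconstruction from the console output above, so the exact payload formatting is an assumption:

```python
import json

# Hypothetical reconstruction of two message values on the topic,
# based on the console output above (exact payloads are an assumption)
records = [
    {"order_id": 937665, "customer_id": 7, "taxful_total_price": 10},
    {"order_id": 937666, "customer_id": 3, "taxful_total_price": 4},
]
for r in records:
    print(json.dumps(r))
```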
But I want to aggregate the data by grouping on the customer_id field and summing taxful_total_price, which will give me the total spending for every customer.
Here are the changes I made to the code:

```python
df2 = mta_data.groupBy("customer_id") \
    .agg(sum("taxful_total_price").alias("total_spending"))

qry = df2.writeStream.outputMode("append").format("console").start()
qry.awaitTermination()
```
This is the error I am getting:

```
File "sparkconsumer.py", line 31, in
    df2 = mta_data.groupBy("customer_id").agg(sum("taxful_total_price").alias("total_spending"))
TypeError: unsupported operand type(s) for +: 'int' and 'str'
```
I have specified in jsonschema that each of the columns is an integer type. I think the problem is with this line:

```python
mta_data = mta_stream.select("parsed_mta_values.*")
```
I tried qualifying the column names instead:

```python
df2 = mta_data.groupBy("parsed_mta_values.customer_id") \
    .agg(sum("parsed_mta_values.taxful_total_price").alias("total_spending"))
```
But this gives the same error as above.
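One thing I noticed while debugging: the TypeError can be reproduced in plain Python without Spark at all, by calling the built-in sum on the column-name string (the built-in iterates the characters and tries 0 + 't'). This is only a minimal sketch of where the message itself can come from, not a claim about what my Spark code does:

```python
# Plain-Python reproduction of the same TypeError: the BUILT-IN sum
# iterates the string's characters and attempts 0 + 't'
try:
    sum("taxful_total_price")
except TypeError as e:
    print(e)  # unsupported operand type(s) for +: 'int' and 'str'
```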
Please help!
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow