Streaming JSON data from Kafka into PySpark: aggregation not working on the JSON data

I am trying to read JSON data from Kafka into Spark using Python and then perform some aggregation operations on it, but I am running into a problem. Here is my code. First, I just send the input data to the console:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 pyspark-shell'
from pyspark.sql.functions import from_json
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col

spark = SparkSession\
    .builder\
    .appName("Total-spending-for-top-users")\
    .getOrCreate()

df = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "saurabmarjara2")\
    .option("startingOffsets", "earliest")\
    .load()


jsonschema = StructType([StructField("order_id", IntegerType()),
                StructField("customer_id", IntegerType()),
                StructField("taxful_total_price", IntegerType())])

mta_stream = df.select(from_json(col("value").cast("string"), jsonschema) \
                        .alias("parsed_mta_values"))

mta_data = mta_stream.select("parsed_mta_values.*")

qry = mta_data.writeStream.outputMode("append").format("console").start()
qry.awaitTermination()

This works correctly. The output is:

order_id  customer_id  taxful_total_price
937665    7            10
937666    3            4
937667    6            3
937668    4            4

But I want to perform aggregation on the data by grouping by the customer_id field and summing the taxful_total_price. This will give me the total spending for every customer.
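To make the intent concrete, here is a plain-Python sketch of the aggregation I am after, applied to the sample rows from the console output above (this is just to illustrate the expected result, not the Spark code):

```python
# Sample rows from the console output above: (order_id, customer_id, taxful_total_price)
rows = [
    (937665, 7, 10),
    (937666, 3, 4),
    (937667, 6, 3),
    (937668, 4, 4),
]

# Group by customer_id and sum taxful_total_price
totals = {}
for _, customer_id, price in rows:
    totals[customer_id] = totals.get(customer_id, 0) + price

print(totals)  # {7: 10, 3: 4, 6: 3, 4: 4}
```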

Here are the changes I made to the code:

df2 = mta_data.groupBy("customer_id")\
    .agg(sum("taxful_total_price").alias("total_spending"))

qry = df2.writeStream.outputMode("append").format("console").start()
qry.awaitTermination()

This is the error I am getting:

File "sparkconsumer.py", line 31, in <module>
    df2 = mta_data.groupBy("customer_id").agg(sum("taxful_total_price").alias("total_spending"))
TypeError: unsupported operand type(s) for +: 'int' and 'str'

I have specified that each of these columns is an integer type in jsonschema. I think the problem is with this line:

mta_stream.select("parsed_mta_values.*")
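One thing I noticed while debugging: Python's built-in sum raises exactly this error when given a string, because it iterates over the characters and tries to add them to the integer 0. So I wonder whether the sum in my code is resolving to the builtin rather than pyspark.sql.functions.sum, which I never imported explicitly:

```python
# Built-in sum iterates the string's characters and computes 0 + 't' + ...
try:
    sum("taxful_total_price")
except TypeError as e:
    print(e)  # unsupported operand type(s) for +: 'int' and 'str'
```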

I tried this:

df2 = mta_data.groupBy("parsed_mta_values.customer_id")\
    .agg(sum("parsed_mta_values.taxful_total_price").alias("total_spending"))

But this gives the same error as above.

Please help!



Source: Stack Overflow, licensed under CC BY-SA 3.0.