Cannot process Kinesis structured streams

I can read the Kinesis stream fine, but whenever I try to write the data the query fails with 'Error while Describe Streams' (the full traceback is further down).

My config is as follows:

{
  "conf": {
    "spark.jars.packages": "com.qubole.spark/spark-sql-kinesis_2.11/1.1.3-spark_2.4",
    "spark.pyspark.python": "python3",
    "spark.pyspark.virtualenv.enabled": "true",
    "spark.pyspark.virtualenv.type": "native",
    "spark.pyspark.virtualenv.bin.path": "/usr/bin/virtualenv"
  }
}
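If this configuration is generated from Python rather than edited by hand, json.dumps guarantees the body is valid JSON. This is a minimal sketch, assuming (it is not stated above) that the JSON is passed to a Livy session through a sparkmagic "%%configure -f" cell; the names session_conf and to_configure_body are hypothetical.

```python
import json

# Same settings as the config above, kept as a Python dict.
# Assumption: the serialized JSON is pasted after "%%configure -f" (sparkmagic/Livy).
session_conf = {
    "conf": {
        # Qubole Kinesis connector built for Scala 2.11 / Spark 2.4.
        "spark.jars.packages": "com.qubole.spark/spark-sql-kinesis_2.11/1.1.3-spark_2.4",
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type": "native",
        "spark.pyspark.virtualenv.bin.path": "/usr/bin/virtualenv",
    }
}


def to_configure_body(conf: dict) -> str:
    """Serialize the session config to pretty-printed JSON (hypothetical helper)."""
    return json.dumps(conf, indent=2)


if __name__ == "__main__":
    print(to_configure_body(session_conf))
```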

kinesisDF = spark \
  .readStream \
  .format("kinesis") \
  .option("streamName", streamName) \
  .option("initialPosition", "earliest") \
  .option("region", "us-east-1") \
  .option("stsAssumeRoleArn", roleArn) \
  .load()

The write command is below:

kinesisDF\
    .writeStream.format("console")\
    .start().awaitTermination()
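One way to see a little more than the one-line message is to wrap start()/awaitTermination() and print the extra fields PySpark attaches to the exception. In PySpark 2.4, StreamingQueryException carries desc (the message) and stackTrace (the JVM stack frames); the sketch below reads them with getattr so it also works for other exception types. The function name run_console_query is hypothetical, and df is assumed to be a streaming DataFrame such as kinesisDF.

```python
def run_console_query(df, timeout_seconds=None):
    """Start a console sink on df and print PySpark's exception details on failure.

    Hypothetical helper. Returns whatever awaitTermination returns
    (True/False when a timeout is given, None otherwise).
    """
    query = df.writeStream.format("console").start()
    try:
        # awaitTermination re-raises the query's exception if it failed.
        return query.awaitTermination(timeout_seconds)
    except Exception as exc:
        # PySpark's CapturedException subclasses expose desc and stackTrace.
        print("Query failed:", getattr(exc, "desc", exc))
        print("JVM stack trace:", getattr(exc, "stackTrace", "<not available>"))
        raise
    finally:
        # Stop the query so a failed or timed-out run does not keep running.
        query.stop()
```

Note that stackTrace only lists the frames of the top-level JVM exception; the underlying AWS error (if any) may only appear as a "Caused by" entry in the Spark driver log.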

The write command always fails with the following error:

An error was encountered:
'Error while Describe Streams\n=== Streaming Query ===\nIdentifier: [id = 4d315e21-2195-4502-a48c-6c313fc6f9af, runId = 4695b210-69eb-4ffc-a076-dab037656eed]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nKinesisSource[KinesisDataStream]'
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 103, in awaitTermination
    return self._jsq.awaitTermination()
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 75, in deco
    raise StreamingQueryException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.StreamingQueryException: 'Error while Describe Streams\n=== Streaming Query ===\nIdentifier: [id = 4d315e21-2195-4502-a48c-6c313fc6f9af, runId = 4695b210-69eb-4ffc-a076-dab037656eed]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nKinesisSource[KinesisDataStream]'
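Since the message only says "Error while Describe Streams", one way to narrow it down is to make the same Kinesis DescribeStream call outside Spark with boto3, using the same role and region as the reader options. This is a minimal sketch, assuming boto3 is installed and the notebook's own credentials may call sts:AssumeRole on roleArn; the function describe_stream_as_role, its session name, and the sts_client / kinesis_client_factory test hooks are hypothetical. An AWS error such as AccessDenied or ResourceNotFoundException is raised as a botocore ClientError that names the actual problem.

```python
def describe_stream_as_role(stream_name, role_arn, region="us-east-1",
                            session_name="kinesis-debug",
                            sts_client=None, kinesis_client_factory=None):
    """Assume role_arn, then call Kinesis DescribeStream with the temporary credentials.

    Hypothetical debugging helper. The two client hooks exist only so the
    function can be exercised without AWS; by default boto3 is used.
    Returns (StreamStatus, number of shards in the first DescribeStream page).
    """
    if sts_client is None or kinesis_client_factory is None:
        import boto3  # imported lazily so tests can run without boto3

        sts_client = sts_client or boto3.client("sts", region_name=region)
        kinesis_client_factory = kinesis_client_factory or (
            lambda **kw: boto3.client("kinesis", **kw)
        )

    # Same role the Spark reader is configured with (roleArn above).
    creds = sts_client.assume_role(
        RoleArn=role_arn, RoleSessionName=session_name
    )["Credentials"]

    kinesis = kinesis_client_factory(
        region_name=region,
        aws_access_key_id=creds["AccessKeyId"],
        aws_secret_access_key=creds["SecretAccessKey"],
        aws_session_token=creds["SessionToken"],
    )
    desc = kinesis.describe_stream(StreamName=stream_name)["StreamDescription"]
    # Shards can be paginated (HasMoreShards), so this count may be partial.
    return desc["StreamStatus"], len(desc["Shards"])
```

Usage, with the variables from the read code: describe_stream_as_role(streamName, roleArn, "us-east-1"). If this call succeeds but the Spark query still fails, the problem is more likely in how the connector is configured than in IAM permissions or the region.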

Any help trying to debug this would be greatly appreciated!

Thanks,

Harry



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
