'Spark stream stops abruptly - "the specified path does not exist"
I am working with Spark Structured Streaming. My stream works fine, but after some time it just stops because of the issue below.
Any suggestions as to what the reason could be and how to resolve this issue?
java.io.FileNotFoundException: Operation failed: "The specified path does not exist.", 404, GET, https://XXXXXXXX.dfs.core.windows.net/output?upn=false&resource=filesystem&maxResults=5000&directory=XXXXXXXX&timeout=90&recursive=true, PathNotFound, "The specified path does not exist. RequestId:d1b7c77f-e01f-0027-7f09-4646f7000000 Time:2022-04-01T20:47:30.1791444Z"
at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.checkException(AzureBlobFileSystem.java:1290) at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.listKeysWithPrefix(AzureBlobFileSystem.java:530) at com.databricks.tahoe.store.EnhancedAzureBlobFileSystemUpgrade.listKeysWithPrefix(EnhancedFileSystem.scala:605) at com.databricks.tahoe.store.EnhancedDatabricksFileSystemV2.$anonfun$listKeysWithPrefix$1(EnhancedFileSystem.scala:374) at com.databricks.backend.daemon.data.client.DBFSV2.$anonfun$listKeysWithPrefix$1(DatabricksFileSystemV2.scala:247) at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:395) at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:484) at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:504) at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:266) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:261) at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:258) at com.databricks.backend.daemon.data.client.DatabricksFileSystemV2.withAttributionContext(DatabricksFileSystemV2.scala:510) at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:305) at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:297) at com.databricks.backend.daemon.data.client.DatabricksFileSystemV2.withAttributionTags(DatabricksFileSystemV2.scala:510) at com.databricks.logging.UsageLogging.recordOperationWithResultTags(UsageLogging.scala:479) at com.databricks.logging.UsageLogging.recordOperationWithResultTags$(UsageLogging.scala:404) at com.databricks.backend.daemon.data.client.DatabricksFileSystemV2.recordOperationWithResultTags(DatabricksFileSystemV2.scala:510) at 
com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:395) at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:367) at com.databricks.backend.daemon.data.client.DatabricksFileSystemV2.recordOperation(DatabricksFileSystemV2.scala:510) at com.databricks.backend.daemon.data.client.DBFSV2.listKeysWithPrefix(DatabricksFileSystemV2.scala:240) at com.databricks.tahoe.store.EnhancedDatabricksFileSystemV2.listKeysWithPrefix(EnhancedFileSystem.scala:374) at com.databricks.tahoe.store.AzureLogStore.listKeysWithPrefix(AzureLogStore.scala:54) at com.databricks.tahoe.store.DelegatingLogStore.listKeysWithPrefix(DelegatingLogStore.scala:251) at com.databricks.sql.fileNotification.autoIngest.FileEventBackfiller$.listFiles(FileEventWorkerThread.scala:967) at com.databricks.sql.fileNotification.autoIngest.FileEventBackfiller.runInternal(FileEventWorkerThread.scala:876) at com.databricks.sql.fileNotification.autoIngest.FileEventBackfiller.run(FileEventWorkerThread.scala:809) Caused by: Operation failed: "The specified path does not exist.", 404, GET, https://XXXXXXXXXX.dfs.core.windows.net/output?upn=false&resource=filesystem&maxResults=5000&directory=XXXXXXXX&timeout=90&recursive=true, PathNotFound, "The specified path does not exist. 
RequestId:02ae07cf-901f-0001-080e-46dd43000000 Time:2022-04-01T21:21:40.2136657Z" at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:241) at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsClient.listPath(AbfsClient.java:235) at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listFiles(AzureBlobFileSystemStore.java:1112) at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.access$200(AzureBlobFileSystemStore.java:143) at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore$1.fetchMoreResults(AzureBlobFileSystemStore.java:1052) at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore$1.(AzureBlobFileSystemStore.java:1033) at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listKeysWithPrefix(AzureBlobFileSystemStore.java:1029) at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.listKeysWithPrefix(AzureBlobFileSystem.java:527) ... 27 more
Below is my code:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.types import StructType, StringType
from pyspark.sql import functions as F
from delta.tables import *
# Streaming job: read JSON files with Auto Loader ("cloudFiles"), flatten the
# nested report structure, and append the result to a Delta table.
#
# Names expected to be defined before this point (not shown in this snippet):
#   spark, schema, landingFilePath, landingcheckPointFilePath,
#   checkPointPath, savePath

# Ask Spark to skip files that disappear between listing and reading.
# NOTE(review): the 404 in the stack trace above is raised while *listing* a
# directory (listKeysWithPrefix), so this setting presumably does not cover
# it -- confirm that the directory passed to load() still exists while the
# stream is running.
spark.sql("set spark.sql.files.ignoreMissingFiles=true")

# Build the source DataFrame. This is deliberately NOT wrapped in a
# try/except that only prints the error: if the definition fails, the real
# exception must surface here instead of being replaced later by an
# AttributeError from calling .writeStream on a placeholder string.
filteredRawDF = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", landingcheckPointFilePath)
    .option("cloudFiles.inferColumnTypes", "true")
    .load(landingFilePath)
    # Parse the JSON text in the 'body' column using the supplied schema.
    .select(from_json('body', schema).alias('temp'))
    # One row per element of report.data ...
    .select(explode("temp.report.data").alias("details"))
    # ... then one row per first-level breakdown entry ...
    .select(
        "details",
        explode("details.breakdown").alias("inner_breakdown"),
    )
    # ... then one row per second-level breakdown entry.
    .select(
        "details",
        "inner_breakdown",
        explode("inner_breakdown.breakdown").alias("outer_breakdown"),
    )
    .select(
        # The format string must be a single literal; it was previously
        # split across two lines, which is a syntax error.
        to_timestamp(
            col("details.name"), "yyyy-MM-dd'T'HH:mm:ss+SSSS"
        ).alias('datetime'),
        col("details.year"),
        col("details.day"),
        col("details.hour"),
        col("details.minute"),
        col("inner_breakdown.name").alias("hotelName"),
        col("outer_breakdown.name").alias("checkindate"),
        # First element of the counts array, cast to an integer.
        col("outer_breakdown.counts")[0].cast("int").alias("HdpHits"),
    )
)

# Start the streaming write: append to Delta at savePath, with a
# micro-batch triggered every 50 seconds.
query = (
    filteredRawDF.writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .outputMode("append")
    .option("checkpointLocation", checkPointPath)
    .trigger(processingTime='50 seconds')
    .start(savePath)
)
Thanks
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
