Category "pyspark"

Spark writing extra rows when saving to CSV

I wrote a file to parquet containing 1,000,000 rows. When I read the parquet file back, the result is 1,000,000 rows. df = spark.read.parquet(parquet_path) df.
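
A minimal sketch of one way to compare the two formats; the paths are placeholders, and the quoting options address the common case where string fields containing newlines produce "extra rows" when the CSV is read back.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Read the parquet file back and confirm the row count (path is a placeholder).
    df = spark.read.parquet("s3://my-bucket/data.parquet")
    print(df.count())  # expected: 1000000

    # When writing to CSV, string fields with embedded newlines are a frequent
    # cause of a higher apparent row count unless quoting/escaping is set.
    (df.write
       .option("header", True)
       .option("quoteAll", True)
       .option("escape", '"')
       .mode("overwrite")
       .csv("s3://my-bucket/data_csv"))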

Py4JJavaError: An error occurred while calling o143.parquet

So I have parquet files in an S3 bucket and I want to load them using pyspark in Python, but I'm getting an error; here's what I have tried so far. I'm using Jupyter
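
A rough sketch of loading parquet from S3 with the s3a connector; the bucket, credentials, and hadoop-aws version below are placeholders and must match the cluster setup.

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("read-s3-parquet")
             # hadoop-aws must match the Hadoop version bundled with your Spark
             .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")
             .config("spark.hadoop.fs.s3a.access.key", "<ACCESS_KEY>")
             .config("spark.hadoop.fs.s3a.secret.key", "<SECRET_KEY>")
             .getOrCreate())

    df = spark.read.parquet("s3a://my-bucket/path/to/data/")
    df.show(5)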

Pyspark: join and union in for loop

I have some really simple logic that I would like to make work in pyspark. for data in df1: spark_data_row = spark.createDataFrame(data
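
Rather than calling createDataFrame row by row inside a Python loop, the usual pattern is to union the frames once and join the result; df_a, df_b, df_c and the join key below are illustrative.

    from functools import reduce
    from pyspark.sql import DataFrame

    # Assume df_a, df_b, df_c share the same schema.
    dfs = [df_a, df_b, df_c]
    combined = reduce(DataFrame.unionByName, dfs)

    # Join the unioned result back to another frame in one pass.
    result = combined.join(df1, on="id", how="left")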

How to close the spark instance

I want to stop my Spark instance once I complete my job running in a Jupyter notebook. I executed spark.stop() at the end, but when I open my terminal, I'
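
A minimal sketch: spark.stop() shuts down the SparkContext, but the notebook's JVM gateway and kernel can keep running until the kernel itself is stopped, which is likely what still shows up in the terminal.

    # Stop the SparkSession (and its SparkContext) at the end of the notebook.
    spark.stop()

    # Processes such as the py4j gateway may remain until the Jupyter kernel
    # is shut down or restarted.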

Pyspark create schema for maptype with different value types

I need to give the correct schema to an RDD I have, but am struggling with a MapType that has different value types. I guess the problem is that one specific key ha
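
MapType allows only a single value type, so a common workaround is to declare the values as strings and cast the heterogeneous values before building rows; the field names below are made up.

    from pyspark.sql.types import StructType, StructField, StringType, MapType

    schema = StructType([
        StructField("id", StringType(), True),
        # One value type is required, so mixed values are stored as strings.
        StructField("attributes", MapType(StringType(), StringType()), True),
    ])

    rows = [("a", {"height": "180", "name": "Sam"})]
    df = spark.createDataFrame(rows, schema)
    df.printSchema()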

Processing data from a kafka stream using Pyspark

What the console of the kafka consumer looks like:
["2017-12-31 16:06:01", 12472391, 1]
["2017-12-31 16:06:01", 12472097, 1]
["2017-12-31 16:05:59", 12471979,

How to apply a pandas geocode function to a Pyspark column

Table is like this:
id  ADDRESS
0   6101 SUMMITVIEW AVE STE 200 YAKIMA
1   527 CEDAR WAY SUITE 105 OAKMONT
2   1700 N ROSE AVE SUITE 460 OXNARD
3   1275 YORK AVE NEW YOR
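
One common route is wrapping the pandas logic in a pandas_udf; geocode_series below is a stand-in for whatever pandas-based geocoder is actually used.

    import pandas as pd
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType

    def geocode_series(addresses: pd.Series) -> pd.Series:
        # Placeholder for the real pandas-based geocoding call.
        return addresses.str.title()

    geocode_udf = F.pandas_udf(geocode_series, returnType=StringType())

    df = df.withColumn("GEOCODED", geocode_udf(F.col("ADDRESS")))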

Pyspark: Return next week's Saturday

I'm trying to return next week's Saturday date from the date-type column rel_d. Normally, in Python, I'd subtract the number of days till next Saturday and add it to the
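
A sketch using the built-in next_day function; whether the extra 7 days is needed depends on how "next week's Saturday" is defined.

    from pyspark.sql import functions as F

    # next_day returns the first Saturday strictly after rel_d.
    df = df.withColumn("next_sat", F.next_day(F.col("rel_d"), "Sat"))

    # Shift by a week if the Saturday of the following week is wanted instead.
    df = df.withColumn("next_week_sat", F.date_add(F.col("next_sat"), 7))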

How to handle a memory issue in Databricks with PySpark when writing data where a particular column contains very large values in each record

I have a set of records with 10 columns. There is a column 'x' which contains an array of float values, and the length of the array can be very large (e.g., the len
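
Two knobs that often help with very wide rows, sketched here with illustrative numbers and paths: cap the records per output file and spread the rows over more partitions before writing.

    # Limit how many (wide) records land in each output file.
    spark.conf.set("spark.sql.files.maxRecordsPerFile", 10000)

    # Spread the rows over more partitions before writing; tune the count.
    df = df.repartition(400)
    df.write.mode("overwrite").parquet("/mnt/output/wide_rows")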

How to manually checkpoint a delta table using PySpark?

I have a delta table, and am trying to append data to it and then checkpoint that table. By default I believe it checkpoints every 10 commits, but I would like
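
The checkpoint interval is exposed as a Delta table property, so one workable approach (sketched with an illustrative table path) is to lower the interval rather than checkpointing by hand.

    # delta.checkpointInterval defaults to 10 commits; setting it to 1 makes
    # every commit write a checkpoint.
    spark.sql("""
        ALTER TABLE delta.`/mnt/delta/my_table`
        SET TBLPROPERTIES ('delta.checkpointInterval' = '1')
    """)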

Spark partition size greater than the executor memory

I have four questions. Suppose in Spark I have 3 worker nodes. Each worker node has 3 executors and each executor has 3 cores. Each executor has 5 GB of memory. (T

How to divide two aggregate sum dataframes

I want to divide the sum of two columns in pyspark. For example, I have a dataset like below:
   A  B  C
1  1  2  3
2  1  2  3
3  1  2  3
What I want is t
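
A minimal sketch of dividing the two sums, both over the whole frame and per group of A:

    from pyspark.sql import functions as F

    # One overall ratio of the column sums.
    df.agg((F.sum("B") / F.sum("C")).alias("b_over_c")).show()

    # Or the same ratio computed within each value of A.
    df.groupBy("A").agg((F.sum("B") / F.sum("C")).alias("b_over_c")).show()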

Spark SQL: Parse date string from dd/mm/yyyy to yyyy/mm/dd

I want to use spark SQL or pyspark to reformat a date field from 'dd/mm/yyyy' to 'yyyy/mm/dd'. The field type is string: from pyspark.sql import SparkSession fr
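
A sketch of the usual two-step conversion, assuming the string column is called date_str:

    from pyspark.sql import functions as F

    # Parse dd/MM/yyyy into a date, then render it back as yyyy/MM/dd.
    df = df.withColumn(
        "date_fmt",
        F.date_format(F.to_date(F.col("date_str"), "dd/MM/yyyy"), "yyyy/MM/dd"),
    )

    # Equivalent Spark SQL:
    # SELECT date_format(to_date(date_str, 'dd/MM/yyyy'), 'yyyy/MM/dd') FROM t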

java.lang.NoClassDefFoundError: org/apache/log4j/spi/Filter in SparkSubmit

I've been trying to submit applications to a Kubernetes cluster. I have followed the tutorial in https://spark.apache.org/docs/latest/running-on-kubernetes.html such as

Pyspark Window function on entire data frame

Consider a pyspark data frame. I would like to summarize the entire data frame, per column, and append the result for every row. +-----+----------+-----------+
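
A sketch with an unpartitioned window, which spans the whole frame (Spark will warn that all data moves to a single partition); the column name "value" is illustrative.

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    # No partitionBy: the window covers every row of the DataFrame.
    w = Window.partitionBy()

    df = (df
          .withColumn("value_sum", F.sum("value").over(w))
          .withColumn("value_avg", F.avg("value").over(w)))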

Delta Table / Athena And Spark

I have my delta table, which can be read from Athena. When I try to get the data through a Spark query, I get the following error: Caused by: org.apache.sp

pyspark.sql.utils.AnalysisException: Failed to find data source: kafka

I am trying to read a stream from kafka using pyspark. I am using spark version 3.0.0-preview2 and spark-streaming-kafka-0-10_2.12. Before this I just started zoo
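
The kafka source is not bundled with pyspark, so the matching package has to be supplied at launch; the version below is an example and has to match the Spark build, and the broker/topic names are placeholders.

    # e.g. spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 app.py

    df = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "my_topic")
          .load())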

Spark SQL error from EMR notebook with AWS Glue table partition

I'm testing some pyspark code in an EMR notebook before I deploy it and keep running into this strange error with Spark SQL. I have all my tables and metadata i

Spatial queries with SparkSQL/Python in a Synapse Spark Pool using apache-sedona?

I would like to run spatial queries on large data sets; e.g. geopandas would be too slow. I found inspiration here: https://anant-sharma.medium.com/apache-sedon
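
A rough sketch of registering Sedona's SQL functions on an existing session; the Sedona and geotools jars must already be available on the pool, and the table/column names below are made up.

    from sedona.register import SedonaRegistrator

    # Registers the ST_* functions with the current SparkSession.
    SedonaRegistrator.registerAll(spark)

    spark.sql("""
        SELECT ST_Point(CAST(lon AS Decimal(24, 20)),
                        CAST(lat AS Decimal(24, 20))) AS geom
        FROM points_raw
    """).show(5)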

IllegalArgumentException: File must be dbfs or s3n: /

dbutils.fs.mount( source = f"wasbs://{blob.storage_account_container}@{blob.storage_account_name}.blob.core.windows.net/", mount_point = "/mnt/MLRExtract/"
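
A commonly reported cause of this error is the trailing slash on mount_point; below is a sketch of the mount call without it, with the storage key pulled from a placeholder secret scope (dbutils is only available on Databricks).

    dbutils.fs.mount(
        source=f"wasbs://{blob.storage_account_container}@{blob.storage_account_name}.blob.core.windows.net/",
        mount_point="/mnt/MLRExtract",  # no trailing slash
        extra_configs={
            f"fs.azure.account.key.{blob.storage_account_name}.blob.core.windows.net":
                dbutils.secrets.get(scope="my-scope", key="storage-account-key")
        },
    )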