Custom sort order on a Spark dataframe/dataset

I have a web service built around Spark that, based on a JSON request, builds a series of dataframe/dataset operations.

These operations involve multiple joins, filters, etc. that change the ordering of the values in the columns. The final dataset could contain millions of rows.

Preferably without converting it to an RDD, is there any way to apply a custom sort on some columns of the final dataset based on the order of elements passed in as lists?

The original dataframe is of the form

+----------+----------+
| Column 1 | Column 2 |
+----------+----------+
| Val 1    | val a    |
+----------+----------+
| Val 2    | val b    |
+----------+----------+
| val 3    | val c    |
+----------+----------+

After a series of transformations are performed, the dataframe ends up looking like this.

+----------+----------+----------+----------+
| Column 1 | Column 2 | Column 3 | Column 4 |
+----------+----------+----------+----------+
| Val 2    | val b    | val 999  | val 900  |
+----------+----------+----------+----------+
| Val 1    | val c    | val 100  | val 9$#@ |
+----------+----------+----------+----------+
| val 3    | val a    | val 2##  | val $#@8 |
+----------+----------+----------+----------+

I now need to apply a sort on multiple columns, where the sort order for each column is given by a list of values.

For example:
Col1 values order = [val 1, val 3, val 2]
Col3 values order = [100, 2##, 999]



Solution 1:[1]

Custom sorting works by creating a column to sort on; it does not need to remain a visible column in the dataframe. I can show it using PySpark.

Initial df:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [(1, 'a', 'A'),
     (2, 'a', 'B'),
     (3, 'a', 'C'),
     (4, 'b', 'A'),
     (5, 'b', 'B'),
     (6, 'b', 'C'),
     (7, 'c', 'A'),
     (8, 'c', 'B'),
     (9, 'c', 'C')],
    ['id', 'c1', 'c2']
)

Custom sorting on 1 column:

from itertools import chain

order = {'b': 1, 'a': 2, 'c': 3}

sort_col = F.create_map([F.lit(x) for x in chain(*order.items())])[F.col('c1')]
df = df.sort(sort_col)

df.show()
# +---+---+---+
# | id| c1| c2|
# +---+---+---+
# |  5|  b|  B|
# |  6|  b|  C|
# |  4|  b|  A|
# |  1|  a|  A|
# |  2|  a|  B|
# |  3|  a|  C|
# |  7|  c|  A|
# |  8|  c|  B|
# |  9|  c|  C|
# +---+---+---+
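One caveat worth noting: the map lookup returns null for values of c1 that are absent from order, and nulls sort first in ascending order, so unlisted values float to the top. A guard such as F.coalesce(sort_col, F.lit(len(order) + 1)) pushes them to the end instead (an assumption to verify on your Spark version, especially under ANSI mode, where map access for a missing key can raise an error). The same fallback logic, illustrated in plain Python:

```python
order = {'b': 1, 'a': 2, 'c': 3}
fallback = len(order) + 1  # rank assigned to values not listed in `order`

rows = ['a', 'z', 'b', 'c', 'b']  # 'z' has no entry in the order map
# dict.get with a default mirrors F.coalesce(map_lookup, F.lit(fallback))
ranked = sorted(rows, key=lambda v: order.get(v, fallback))
# ranked == ['b', 'b', 'a', 'c', 'z']: 'z' sinks to the end instead of rising to the top
```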

On 2 columns:

from itertools import chain

order1 = {'b': 1, 'a': 2, 'c': 3}
order2 = {'B': 1, 'C': 2, 'A': 3}

sort_col1 = F.create_map([F.lit(x) for x in chain(*order1.items())])[F.col('c1')]
sort_col2 = F.create_map([F.lit(x) for x in chain(*order2.items())])[F.col('c2')]
df = df.sort(sort_col1, sort_col2)

df.show()
# +---+---+---+
# | id| c1| c2|
# +---+---+---+
# |  5|  b|  B|
# |  6|  b|  C|
# |  4|  b|  A|
# |  2|  a|  B|
# |  3|  a|  C|
# |  1|  a|  A|
# |  8|  c|  B|
# |  9|  c|  C|
# |  7|  c|  A|
# +---+---+---+

Or as a function:

from itertools import chain
from pyspark.sql import functions as F

def cust_sort(col: str, order: dict):
    """Build a sort expression that ranks values of `col` by the given order dict."""
    return F.create_map([F.lit(x) for x in chain(*order.items())])[F.col(col)]

df = df.sort(
    cust_sort('c1', {'b': 1, 'a': 2, 'c': 3}),
    cust_sort('c2', {'B': 1, 'C': 2, 'A': 3})
)
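The question supplies the desired order as lists rather than rank dicts. A small helper (a sketch; order_dict and the column names below are illustrative) converts list positions into the dict that cust_sort expects:

```python
def order_dict(values):
    """Map each value to its 1-based position in the desired sort order."""
    return {v: i for i, v in enumerate(values, start=1)}

col1_order = order_dict(['val 1', 'val 3', 'val 2'])
col3_order = order_dict(['100', '2##', '999'])

# These dicts plug straight into cust_sort, e.g.:
# df = df.sort(cust_sort('Column 1', col1_order), cust_sort('Column 3', col3_order))
```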

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: ZygD