PySpark reversing StringIndexer in nested array

I'm using PySpark to do collaborative filtering with ALS. My original user and item IDs are strings, so I used StringIndexer to convert them to numeric indices (PySpark's ALS implementation requires numeric IDs).
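
For reference, my indexing step looks roughly like this (the DataFrame and column names here are illustrative, not my real ones):

from pyspark.ml.feature import StringIndexer

# one fitted StringIndexer per ID column; ratings_df, user_id and product_id are placeholder names
user_indexer = StringIndexer(inputCol='user_id', outputCol='userIdIndex').fit(ratings_df)
prod_indexer = StringIndexer(inputCol='product_id', outputCol='productIdIndex').fit(ratings_df)

indexed = prod_indexer.transform(user_indexer.transform(ratings_df))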

After I've fitted the model, I can get the top 3 recommendations for each user like so:

recs = (
    model
    .recommendForAllUsers(3)
)

The recs dataframe looks like so:

+-----------+--------------------+
|userIdIndex|     recommendations|
+-----------+--------------------+
|       1580|[[10096,3.6725707...|
|       4900|[[10096,3.0137873...|
|       5300|[[10096,2.7274625...|
|       6620|[[10096,2.4493625...|
|       7240|[[10096,2.4928937...|
+-----------+--------------------+
only showing top 5 rows

root
 |-- userIdIndex: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- productIdIndex: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)

I want to create a huge JSON dump from this dataframe, which I can do like so:

(
    recs
    .toJSON()
    .saveAsTextFile("name_i_must_hide.recs")
)

and a sample of these JSON records looks like this:

{
  "userIdIndex": 1580,
  "recommendations": [
    {
      "productIdIndex": 10096,
      "rating": 3.6725707
    },
    {
      "productIdIndex": 10141,
      "rating": 3.61542
    },
    {
      "productIdIndex": 11591,
      "rating": 3.536216
    }
  ]
}

The userIdIndex and productIdIndex keys are due to the StringIndexer transformation.

How can I get the original values of these columns back? I suspect I must use the IndexToString transformer, but I can't quite figure out how, since the data is nested in an array inside the recs DataFrame.

I tried to use a Pipeline (stages=[StringIndexer, ALS, IndexToString]), but it looks like this pipeline doesn't support these indexers.
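
Roughly what I attempted (a simplified sketch; the variable and column names are illustrative):

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml.recommendation import ALS

user_indexer = StringIndexer(inputCol='user_id', outputCol='userIdIndex')
prod_indexer = StringIndexer(inputCol='product_id', outputCol='productIdIndex')
als = ALS(userCol='userIdIndex', itemCol='productIdIndex', ratingCol='rating')
idx_to_user = IndexToString(inputCol='userIdIndex', outputCol='original_user_id')

pipeline = Pipeline(stages=[user_indexer, prod_indexer, als, idx_to_user])
model = pipeline.fit(ratings_df)
# the fitted pipeline transforms the input ratings, but this still gives me no way
# to apply IndexToString to the nested recommendForAllUsers() output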

Cheers!



Solution 1:[1]

The previously given answer has performance issues; at least in my case it took too long. You can use IndexToString instead. Here is a simple code snippet (assuming you have two fitted StringIndexer models, one for users and one for products):


from pyspark.ml.feature import IndexToString
from pyspark.sql import functions as F

# user_indexer and prod_indexer are the fitted StringIndexerModels from the indexing step
idx_to_user = IndexToString(inputCol='userIdIndex', outputCol='user_id').setLabels(user_indexer.labels)
idx_to_prod = IndexToString(inputCol='productIdIndex', outputCol='product_id').setLabels(prod_indexer.labels)

# map the user index back, explode the nested recommendations, then map the product index back
recoms = idx_to_user.transform(recs)
res = idx_to_prod.transform(recoms.select(F.col('user_id'), F.explode('recommendations')).select('user_id', 'col.productIdIndex', 'col.rating'))
result = res.select('user_id', 'product_id', 'rating')
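
The key step is exploding the nested recommendations array into one row per (user, product) pair, so that IndexToString can operate on a flat productIdIndex column; if you need the nested layout back afterwards, you could re-group the rows with F.collect_list.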


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 sajjad