'Cannot solve pyspark error when using CountVectorizer for NLP project

So I am new to pyspark but I cannot figure out this error: I have checked if I have any null values as that seemed to be the most common answer I could find on here.


Py4JJavaError                             Traceback (most recent call last)
<ipython-input-189-8d6ce03d857b> in <module>
      1 from pyspark.ml.feature import CountVectorizer
      2 count = CountVectorizer(inputCol="tokens", outputCol="rawFeatures")
----> 3 model = count.fit(wordsData)
      4 featurizedData = model.transform(wordsData)
      5 featurizedData.show()

~\anaconda3\lib\site-packages\pyspark\ml\base.py in fit(self, dataset, params)
    159                 return self.copy(params)._fit(dataset)
    160             else:
--> 161                 return self._fit(dataset)
    162         else:
    163             raise TypeError("Params must be either a param map or a list/tuple of param maps, "

~\anaconda3\lib\site-packages\pyspark\ml\wrapper.py in _fit(self, dataset)
    333 
    334     def _fit(self, dataset):
--> 335         java_model = self._fit_java(dataset)
    336         model = self._create_model(java_model)
    337         return self._copyValues(model)

~\anaconda3\lib\site-packages\pyspark\ml\wrapper.py in _fit_java(self, dataset)
    330         """
    331         self._transfer_params_to_java()
--> 332         return self._java_obj.fit(dataset._jdf)
    333 
    334     def _fit(self, dataset):

~\anaconda3\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
   1319 
   1320         answer = self.gateway_client.send_command(command)
-> 1321         return_value = get_return_value(
   1322             answer, self.gateway_client, self.target_id, self.name)
   1323 

~\anaconda3\lib\site-packages\pyspark\sql\utils.py in deco(*a, **kw)
    109     def deco(*a, **kw):
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
    113             converted = convert_exception(e.java_exception)

~\anaconda3\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o1178.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 72.0 failed 1 times, most recent failure: Lost task 7.0 in stage 72.0 (TID 288) (DESKTOP-F23TMVF executor driver): org.apache.spark.SparkException: Failed to execute user defined function (Tokenizer$$Lambda$3083/0x0000000801372040: (string) => array<string>)
    at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:136)
    at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1253)
    at org.apache.spark.ml.feature.CountVectorizer.fit(CountVectorizer.scala:233)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function (Tokenizer$$Lambda$3083/0x0000000801372040: (string) => array<string>)
    at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:136)
    at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
Caused by: java.lang.NullPointerException 

Here is the code I am running:

import findspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
findspark.init(r"C:\\spark")

df_spark = spark.read.option('header',True).csv("comments_cleaned.csv")

from pyspark.ml.feature import  Tokenizer
tokenizer = Tokenizer(inputCol="no_stop", outputCol="tokens")
wordsData = tokenizer.transform(df_spark)
wordsData.show()

Here is a snap shot of the wordsData dataframe: enter image description here

from pyspark.ml.feature import CountVectorizer
count = CountVectorizer(inputCol="tokens", outputCol="rawFeatures")
model = count.fit(wordsData)
featurizedData = model.transform(wordsData)
featurizedData.show()

As you can tell from the error it fails at model = count.fit(wordsData). What am I doing wrong?



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source