'DF.topandas() - Failed to locate the winutils binary in the hadoop binary path

I am running a huge text file using PyCharm and PySpark.

This is what I am trying to do:

spark_home = os.environ.get('SPARK_HOME', None)
os.environ["SPARK_HOME"] = "C:\spark-2.3.0-bin-hadoop2.7"
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = SparkConf()
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate() 
import pandas as pd
ip = spark.read.format("csv").option("inferSchema","true").option("header","true").load(r"some other file.csv")
kw = pd.read_csv(r"some file.csv",encoding='ISO-8859-1',index_col=False,error_bad_lines=False)
for i in range(len(kw)):
    rx = '(?i)'+kw.Keywords[i]
    ip = ip.where(~ip['Content'].rlike(rx))
op = ip.toPandas()
op.to_csv(r'something.csv',encoding='utf-8')

However, PyCharm is throwing me this error:

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2018-06-08 11:31:52 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
Traceback (most recent call last):
  File "C:/Users/mainak.paul/PycharmProjects/Concept_Building_SIP/ThemeSparkUncoveredGames.py", line 17, in <module>
    op = ip.toPandas()
  File "C:\Python27\lib\site-packages\pyspark\sql\dataframe.py", line 1966, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "C:\Python27\lib\site-packages\pyspark\sql\dataframe.py", line 466, in collect
    port = self._jdf.collectToPython()
  File "C:\Python27\lib\site-packages\py4j\java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Python27\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Python27\lib\site-packages\py4j\protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o30.collectToPython.
: java.lang.IllegalArgumentException

I am just not getting why .toPandas() is not working. Spark version is 2.3. Has something changed in this version that I am unaware of? I ran this code in a different machine with spark 2.2, and it ran fine.

I even changed the export line to something like this

op = ip.where(ip['Content'].rlike(rx)).toPandas()

Still getting the same error. What am I doing wrong? Is there some other way of exporting pyspark.sql.dataframe.DataFrame to a .csv without compromising on performance?

EDITED I also tried using:

ip.write.csv('file.csv')

Now i am getting the following error:

Traceback (most recent call last):
  File "somefile.csv", line 21, in <module>
    ip.write.csv('somefile.csv')
  File "C:\Python27\lib\site-packages\pyspark\sql\readwriter.py", line 883, in csv
    self._jwrite.csv(path)
  File "C:\Python27\lib\site-packages\py4j\java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Python27\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Python27\lib\site-packages\py4j\protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o102.csv.

Adding the stacktrace:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/06/11 16:53:14 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable C:\spark-2.3.0-bin-hadoop2.7\bin\bin\winutils.exe in the Hadoop binaries.
    at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:379)
    at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:394)
    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:387)
    at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:80)
    at org.apache.hadoop.security.SecurityUtil.getAuthenticationMethod(SecurityUtil.java:611)
    at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:273)
    at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:261)
    at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:791)
    at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:761)
    at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:634)
    at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2430)
    at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2430)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2430)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:295)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:488)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:236)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.base/java.lang.Thread.run(Thread.java:844)
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.hadoop.security.authentication.util.KerberosUtil (file:/C:/opt/spark/spark-2.2.0-bin-hadoop2.7/jars/hadoop-auth-2.7.3.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.hadoop.security.authentication.util.KerberosUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
18/06/11 16:53:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Traceback (most recent call last):
  File "C:/Users/mainak.paul/PycharmProjects/Concept_Building_SIP/ThemeSparkUncoveredGames.py", line 22, in <module>
    op = ip.toPandas().collect()
  File "C:\Python27\lib\site-packages\pyspark\sql\dataframe.py", line 1937, in toPandas
    if self.sql_ctx.getConf("spark.sql.execution.pandas.respectSessionTimeZone").lower() \
  File "C:\Python27\lib\site-packages\pyspark\sql\context.py", line 142, in getConf
    return self.sparkSession.conf.get(key, defaultValue)
  File "C:\Python27\lib\site-packages\pyspark\sql\conf.py", line 46, in get
    return self._jconf.get(key)
  File "C:\Python27\lib\site-packages\py4j\java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Python27\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Python27\lib\site-packages\py4j\protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o86.get.
: java.util.NoSuchElementException: spark.sql.execution.pandas.respectSessionTimeZone
    at org.apache.spark.sql.internal.SQLConf$$anonfun$getConfString$2.apply(SQLConf.scala:1089)
    at org.apache.spark.sql.internal.SQLConf$$anonfun$getConfString$2.apply(SQLConf.scala:1089)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.internal.SQLConf.getConfString(SQLConf.scala:1089)
    at org.apache.spark.sql.RuntimeConfig.get(RuntimeConfig.scala:74)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.base/java.lang.Thread.run(Thread.java:844)


Process finished with exit code 1


Solution 1:[1]

You need to change your code as follows:

spark_home = os.environ.get('SPARK_HOME', None)
os.environ["SPARK_HOME"] = "C:\spark-2.3.0-bin-hadoop2.7"
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = SparkConf()
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate() 
import pandas as pd
ip = spark.read.format("csv").option("inferSchema","true").option("header","true").load(r"some other file.csv")
kw = pd.read_csv(r"some file.csv",encoding='ISO-8859-1',index_col=False,error_bad_lines=False)
for i in range(len(kw)):
    rx = '(?i)'+kw.Keywords[i]
    ip = ip.where(~ip['Content'].rlike(rx))
op = ip.toPandas().collect()
op.to_csv(r'something.csv',encoding='utf-8')

toPandas() needs to be followed by a collect() action in PySpark for the DataFrame to materialize. This however should not be done for large datasets, as toPandas().collect() causes the data to move to driver, which might crash in case the dataset is to big to fit into driver memory.

As for this line : ip.write.csv('file.csv') I belive it should be changed to ip.write.csv('file:///home/your-user-name/file.csv') to save the file on the local linux filesystem,

ip.option("header", "true").csv("file:///C:/out.csv") to save the file on the local windows filesystem (if you are running Spark and Hadoop on Windows)

or ip.write.csv('hdfs:///user/your-user/file.csv') to save the file to HDFS

Do tell me if this solution worked out for you.

UPDATE

https://github.com/steveloughran/winutils/tree/master/hadoop-2.7.1/binfollow this link and download the winutils.exe file. Create a folder named hadoop on your C drive and another folder called bin inside the hadoop folder. Place the winutils.exe you downloaded earlier into this directory. Then you need to edit the system variables and add the variable HADOOP_HOME to the list. Once this is done you wont get the error for winutils/hadoop from spark.

This is how the HADOOP_HOME variable is to be placed. Just type "Edit the system environment variables" in your windows search

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1