How to save a PySpark dataframe from Hive into an Ignite cache?
I would like to save my PySpark dataframe into an Ignite cache, following the approach from this blog, but an error occurs. The error message is:
```
Traceback (most recent call last):
  File "test_ignite.py", line 19, in <module>
    .option("config", configFile) \
  File "/home/hdoop/.local/lib/python3.6/site-packages/pyspark/sql/readwriter.py", line 738, in save
    self._jwrite.save()
  File "/home/hdoop/.local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1322, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/hdoop/.local/lib/python3.6/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/home/hdoop/.local/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o42.save.
: java.lang.ClassNotFoundException:
Failed to find data source: ignite. Please find packages at
http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:443)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:670)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:720)
    at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:852)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: ignite.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:656)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:656)
    at scala.util.Failure.orElse(Try.scala:224)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:656)
    ... 16 more
```
These are my steps:
- Executed the Python script: `python3 test_ignite.py`
- Created a SparkSession and connected to Hive.
- Fetched a PySpark dataframe from Hive.
- Saved it to the Ignite cache (this is where the error occurred).
My test_ignite.py code is:
```python
import os

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyignite import Client

spark = SparkSession \
    .builder \
    .appName("consumer_hive") \
    .config("hive.metastore.uris", "thrift://localhost:9083") \
    .config("spark.driver.extraClassPath", "/usr/local/ignite/libs/*.jar:/usr/local/ignite/libs/optional/ignite-spark/*.jar:/usr/local/ignite/libs/ignite-spring/*.jar:/usr/local/ignite/libs/ignite-indexing/*.jar") \
    .enableHiveSupport() \
    .getOrCreate()

df = spark.sql("select * from hive_table")

configFile = os.environ['IGNITE_HOME'] + "config/default-config.xml"

df.write.format("ignite") \
    .option("table", "TickData") \
    .option("config", configFile) \
    .save()
```
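One fragile spot in this script, independent of the ClassNotFoundException: the config path is built by plain string concatenation, so if IGNITE_HOME does not end with a trailing slash the result points at a nonexistent file. `os.path.join` is the safer pattern (the paths below are illustrative):

```python
import os

# Hypothetical IGNITE_HOME value without a trailing slash.
ignite_home = "/usr/local/ignite"

# Plain concatenation glues the segments together with no separator:
broken = ignite_home + "config/default-config.xml"
print(broken)  # /usr/local/igniteconfig/default-config.xml

# os.path.join inserts the separator where needed:
fixed = os.path.join(ignite_home, "config", "default-config.xml")
print(fixed)  # /usr/local/ignite/config/default-config.xml
```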
I found the same issue reported here, but the error persists after adding spark.driver.extraClassPath. Do I need to set any other config? I ask because I have never modified /ignite/config/default-config.xml:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!--
        Alter configuration below as needed.
    -->
    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"/>
</beans>
```
My Spark config:

spark-defaults.conf:

```
spark.master                          spark://pc1-vm1:7077
spark.eventLog.enabled                true
spark.eventLog.dir                    hdfs://pc1-vm1:9000/user/hdoop/log
spark.serializer                      org.apache.spark.serializer.KryoSerializer
spark.driver.memory                   4g
spark.executor.memory                 6g
spark.executor.cores                  2
spark.dynamicAllocation.maxExecutors  2
spark.ui.enabled                      true
spark.driver.extraClassPath           /usr/local/ignite/libs/*.jar:/usr/local/ignite/libs/optional/ignite-spark/*.jar:/usr/local/ignite/libs/ignite-spring/*.jar:/usr/local/ignite/libs/ignite-indexing/*.jar
```
spark-env.sh:

```
export PYSPARK_PYTHON=python3
export SCALA_HOME=/usr/lib/scala
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
export SPARK_MASTER_HOST=pc1-vm1
export SPARK_WORKER_MEMORY=32G
export SPARK_DAEMON_MEMORY=4G
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
export HADOOP_CLASSPATH=$(hadoop classpath)
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HBASE_CLASSPATH
export HIVE_HOME=/home/hadoop/hive
export SPARK_CLASSPATH=$HIVE_HOME/lib/mysql-connector-java-8.0.22.jar:$SPARK_CLASSPATH
```
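One more thing worth double-checking: the JVM expands a classpath wildcard only when the entry ends in a bare `*` (it does not understand `*.jar`), so a hand-built entry like the ones above can silently match nothing. A quick Python sketch of how to see which jars a glob pattern picks up, run here against a throwaway directory rather than the real /usr/local/ignite/libs:

```python
import glob
import os
import tempfile

# Build a throwaway directory with fake jar files to illustrate the pattern.
tmp = tempfile.mkdtemp()
for name in ("ignite-core.jar", "ignite-spark.jar", "README.txt"):
    open(os.path.join(tmp, name), "w").close()

# glob-style "*.jar" matches only the jar files; note that the JVM's own
# classpath wildcard is the bare "dir/*", which it expands to the .jar files.
jars = sorted(os.path.basename(p) for p in glob.glob(os.path.join(tmp, "*.jar")))
print(jars)  # ['ignite-core.jar', 'ignite-spark.jar']
```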
These are the versions I used:
- Hadoop v2.10.1
- Spark v2.3.4
- Hive v2.1.1
- Python v3.7.5
- openjdk-8-jre
- Ignite v2.7
Could anybody help me, please?
Solution 1:[1]
You need the Ignite JARs on the executor classpath as well, so spark.driver.extraClassPath should probably be spark.executor.extraClassPath. Better still, use the spark-submit --packages flag to pull in the ignite-spark library.
Alternatively, the docs mention that you can modify spark-env.sh to add the Ignite libraries to SPARK_CLASSPATH (which you have not done):
https://ignite.apache.org/docs/latest/extensions-and-integrations/ignite-for-spark/installation
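The --packages route might look like the sketch below. The Maven coordinates are an assumption (org.apache.ignite:ignite-spark:2.7.0 for Ignite 2.7, built against Scala 2.11, which Spark 2.3.x uses); verify the exact artifact and version for your installation:

```shell
# Let Spark fetch the Ignite integration from Maven Central and ship it to
# both the driver and the executors, instead of hand-building the classpath.
# NOTE: the coordinates below are an assumption; check them for your setup.
spark-submit \
  --master spark://pc1-vm1:7077 \
  --packages org.apache.ignite:ignite-spark:2.7.0 \
  test_ignite.py
```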
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | OneCricketeer |
