How to save a PySpark dataframe from Hive into an Ignite cache?

I would like to save my PySpark dataframe into an Ignite cache following the approach from this blog, but an error occurs. The error message is:

Traceback (most recent call last):
  File "test_ignite.py", line 19, in <module>
    .option("config", configFile) \
  File "/home/hdoop/.local/lib/python3.6/site-packages/pyspark/sql/readwriter.py", line 738, in save
    self._jwrite.save()
  File "/home/hdoop/.local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1322, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/hdoop/.local/lib/python3.6/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/home/hdoop/.local/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o42.save.
: java.lang.ClassNotFoundException: 
Failed to find data source: ignite. Please find packages at
http://spark.apache.org/third-party-projects.html
       
        at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:443)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:670)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:720)
        at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:852)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: ignite.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:656)
        at scala.util.Try$.apply(Try.scala:213)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:656)
        at scala.util.Failure.orElse(Try.scala:224)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:656)
        ... 16 more

These are my steps:

  1. Executed the Python script: python3 test_ignite.py
  2. Created a SparkSession and connected to Hive.
  3. Fetched a PySpark dataframe from Hive.
  4. Saved it to the Ignite cache (this is where the error occurred).

My test_ignite.py code is:

import os
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyignite import Client

spark = SparkSession \
.builder \
.appName("consumer_hive") \
.config("hive.metastore.uris", "thrift://localhost:9083") \
.config("spark.driver.extraClassPath", "/usr/local/ignite/libs/*jar:/usr/local/ignite/libs/optional/ignite-spark/*jar:/usr/local/ignite/libs/ignite-spring/*jar:/usr/local/ignite/libs/ignite-indexing/*jar") \
.enableHiveSupport() \
.getOrCreate()

df = spark.sql("select * from hive_table")

configFile = os.environ['IGNITE_HOME'] + "config/default-config.xml"

df.write.format("ignite") \
            .option("table", "TickData") \
            .option("config", configFile) \
            .save()

I have found the same issue reported here, but the error persists even after adding spark.driver.extraClassPath. Do I need any additional configuration? I have never modified /ignite/config/default-config.xml:

<?xml version="1.0" encoding="UTF-8"?>
  
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!--
        Alter configuration below as needed.
    -->
    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"/>
</beans>

My Spark config:

spark-defaults.conf

spark.master spark://pc1-vm1:7077
spark.eventLog.enabled true
spark.eventLog.dir hdfs://pc1-vm1:9000/user/hdoop/log
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 4g
spark.executor.memory 6g
spark.executor.cores 2
spark.dynamicAllocation.maxExecutors 2
spark.ui.enabled true


spark.driver.extraClassPath /usr/local/ignite/libs/*jar:/usr/local/ignite/libs/optional/ignite-spark/*jar:/usr/local/ignite/libs/ignite-spring/*jar:/usr/local/ignite/libs/ignite-indexing/*jar

spark-env.sh

export PYSPARK_PYTHON=python3
export SCALA_HOME=/usr/lib/scala
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
export SPARK_MASTER_HOST=pc1-vm1
export SPARK_WORKER_MEMORY=32G
export SPARK_DAEMON_MEMORY=4G
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
export HADOOP_CLASSPATH=$(hadoop classpath)
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HBASE_CLASSPATH
export HIVE_HOME=/home/hadoop/hive 
export SPARK_CLASSPATH=$HIVE_HOME/lib/mysql-connector-java-8.0.22.jar:$SPARK_CLASSPATH

These are the versions I used:

- Hadoop v2.10.1
- Spark v2.3.4
- Hive v2.1.1
- Python v3.7.5
- openjdk-8-jre
- Ignite v2.7

Could anybody help me, please?



Solution 1:[1]

You need the Ignite JARs on the executor classpath as well, so spark.driver.extraClassPath should probably be spark.executor.extraClassPath. Better still, use the spark-submit --packages flag to pull in the ignite-spark library.
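As a rough sketch, a submit command along those lines might look like the following. The Maven coordinate and version below are assumptions based on the question's Spark 2.3 / Ignite 2.7 setup; check Maven Central for the artifact matching your Scala and Ignite versions.

```shell
# Hypothetical submit command: --packages resolves the Ignite Spark
# integration from Maven and places it on BOTH the driver and executor
# classpaths, so no extraClassPath settings are needed for it.
spark-submit \
  --master spark://pc1-vm1:7077 \
  --packages org.apache.ignite:ignite-spark:2.7.0 \
  test_ignite.py
```

Using --packages also pulls in the library's transitive dependencies, which a bare classpath entry does not.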

Alternatively, the docs mention that you can modify spark-env.sh to add the Ignite libraries to SPARK_CLASSPATH (which you have not done).
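A simplified sketch of that spark-env.sh addition, assuming the question's /usr/local/ignite layout rather than quoting the docs verbatim. Note that the JVM only expands classpath wildcards written as dir/* (matching every JAR in that directory); patterns like dir/*jar, as used in the question's extraClassPath, are not expanded.

```shell
# Assumed addition to spark-env.sh, following the Ignite-for-Spark
# installation docs. IGNITE_HOME must point at the Ignite installation.
# Classpath wildcards must be "dir/*" -- the JVM does not expand "dir/*jar".
export IGNITE_HOME=/usr/local/ignite
export SPARK_CLASSPATH="$IGNITE_HOME/libs/*:$IGNITE_HOME/libs/optional/ignite-spark/*:$IGNITE_HOME/libs/ignite-spring/*:$IGNITE_HOME/libs/ignite-indexing/*:$SPARK_CLASSPATH"
```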

https://ignite.apache.org/docs/latest/extensions-and-integrations/ignite-for-spark/installation

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 OneCricketeer