Export JSON string to CSV from Teradata database using PySpark

I am trying to read data from a Teradata database into a DataFrame and write it to my local system using the DataFrame.write method.

However, I am not able to read a JSON string column from the Teradata table. What could be the possible solution?

Code:

    from pyspark import SparkConf
    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F
    import time

    appName = "Pyspark Teradata"
    master = "local"
    conf = SparkConf()  # create the configuration
    conf.set("spark.repl.local.jars", "<path_to_terajdbc4.jar>")
    conf.set("spark.executor.extraClassPath", "<path_to_terajdbc4.jar>")
    conf.set("spark.driver.extraClassPath", "<path_to_terajdbc4.jar>")

    # Create Spark session
    spark = SparkSession.builder \
        .config(conf=conf) \
        .appName(appName) \
        .master(master) \
        .getOrCreate()

    driver = 'com.teradata.jdbc.TeraDriver'

    # Define the function to load data from Teradata

    a = time.time()
    def load_data(driver, jdbc_url, sql, user, password):
        return spark.read \
            .format('jdbc') \
            .option('multiLine', 'true') \
            .option('url', jdbc_url) \
            .option('encoding', 'UTF-8') \
            .option('dbtable', '({sql}) as src'.format(sql=sql)) \
            .option('user', user) \
            .option('password', password) \
            .option('driver', driver) \
            .option('STRICT_NAMES', 'OFF') \
            .load()

    sql = "Select * from AdventureWorksDW.my_table_2"
    url = "jdbc:teradata://192.168.xx.xx"
    user = "dbc"
    password = "dbc"

    df_td = load_data(driver, url, sql, user, password)
    df_td.show()

    b = time.time()
    print("Time taken", b - a)

The error that I am getting:

    py4j.protocol.Py4JJavaError: An error occurred while calling o50.load.
    : java.sql.SQLException: Unsupported type OTHER
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getCatalystType(JdbcUtils.scala:247)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$getSchema$1(JdbcUtils.scala:312)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getSchema(JdbcUtils.scala:312)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:63)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:354)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:326)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:308)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:308)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:226)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Unknown Source)
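For context, `Unsupported type OTHER` typically means the Teradata JDBC driver reports the JSON column as `java.sql.Types.OTHER`, which Spark's JDBC reader cannot map to a Catalyst type. A common workaround is to CAST the JSON column to VARCHAR inside the pushed-down query, so the driver reports a type Spark understands. A minimal sketch, where the helper, the `json_col` column, and the VARCHAR width are all my assumptions, not part of the original post:

```python
# Sketch: build a SELECT that casts JSON columns to VARCHAR so the JDBC
# driver reports a character type instead of java.sql.Types.OTHER.
def cast_json_columns(table, json_cols, other_cols=(), width=32000):
    """Return a SELECT statement casting each JSON column to VARCHAR(width)."""
    select_list = list(other_cols) + [
        "CAST({c} AS VARCHAR({w})) AS {c}".format(c=c, w=width)
        for c in json_cols
    ]
    return "SELECT {cols} FROM {table}".format(
        cols=", ".join(select_list), table=table
    )

# Hypothetical column names; pass the result to load_data() above in place
# of the plain SELECT *.
sql = cast_json_columns("AdventureWorksDW.my_table_2",
                        json_cols=["json_col"], other_cols=["id"])
# -> SELECT id, CAST(json_col AS VARCHAR(32000)) AS json_col FROM AdventureWorksDW.my_table_2
```

Whether a VARCHAR of that width is enough depends on the size of the stored JSON documents; for larger documents a CLOB cast may be needed, with its own driver mapping caveats.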

Source Data in Teradata:

(screenshot of the source table, showing a JSON-type column)

How can I output JSON-type data to a CSV file using PySpark?
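Once the JSON column comes through as a plain string, the remaining concern when writing CSV is that JSON text contains commas and double quotes, so the writer's quoting must be RFC 4180 style. The stdlib `csv` module illustrates the round-trip (the row contents here are made up for the demo); in Spark the analogous write would be `df_td.write.option("header", "true").option("quote", '"').option("escape", '"').csv("<output_path>")`:

```python
import csv
import io
import json

# Stand-in for a row read from Teradata: a JSON string alongside other fields.
row = {"id": "1", "payload": json.dumps({"name": "Ann", "tags": ["a", "b"]})}

# Write it as CSV; QUOTE_MINIMAL wraps fields containing delimiters in quotes
# and doubles any embedded quotes, so the JSON survives intact.
buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=["id", "payload"],
                        quoting=csv.QUOTE_MINIMAL)
writer.writeheader()
writer.writerow(row)
csv_text = buf.getvalue()

# Round-trip: parse the CSV back and recover the original JSON document.
back = next(csv.DictReader(io.StringIO(csv_text)))
assert json.loads(back["payload"]) == {"name": "Ann", "tags": ["a", "b"]}
```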



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
