How to ingest Delta Lake metadata into the Amundsen data discovery engine?
I have set up Amundsen and the UI works fine. I am trying to run the sample Delta Lake loader given in the examples in their repository.
"""
This is a example script for extracting Delta Lake Metadata Results
"""
from pyhocon import ConfigFactory
from pyspark.sql import SparkSession
from databuilder.extractor.delta_lake_metadata_extractor import DeltaLakeMetadataExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.models.table_metadata import DESCRIPTION_NODE_LABEL
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
# NEO4J cluster endpoints
NEO4J_ENDPOINT = 'bolt://localhost:7687/'
neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
neo4j_password = 'test'
cluster_key = 'my_delta_environment'
database = 'delta'
# Or set to an empty list to process all schemas
schema_list = ['schema1', 'schema2']
# Or set to an empty list if there are no schemas you want to exclude
exclude_list = ['bad_schema']
def create_delta_lake_job_config():
    tmp_folder = '/var/tmp/amundsen/table_metadata'
    node_files_folder = f'{tmp_folder}/nodes/'
    relationship_files_folder = f'{tmp_folder}/relationships/'
    job_config = ConfigFactory.from_dict({
        f'extractor.delta_lake_table_metadata.{DeltaLakeMetadataExtractor.CLUSTER_KEY}': cluster_key,
        f'extractor.delta_lake_table_metadata.{DeltaLakeMetadataExtractor.DATABASE_KEY}': database,
        f'extractor.delta_lake_table_metadata.{DeltaLakeMetadataExtractor.SCHEMA_LIST_KEY}': schema_list,
        f'extractor.delta_lake_table_metadata.{DeltaLakeMetadataExtractor.EXCLUDE_LIST_SCHEMAS_KEY}': exclude_list,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_CREATE_ONLY_NODES}': [DESCRIPTION_NODE_LABEL],
        f'publisher.neo4j.job_publish_tag': 'some_unique_tag',  # TODO: a unique tag must be added
    })
    return job_config
if __name__ == "__main__":
    # This assumes you are running on a Spark cluster (for example, a Databricks
    # cluster) that is configured with a Hive metastore that has pointers to all
    # of your Delta tables. Because of this, this code CANNOT run as a normal
    # Python operator on Airflow.
    spark = SparkSession.builder.appName("Amundsen Delta Lake Metadata Extraction").getOrCreate()
    job_config = create_delta_lake_job_config()
    dExtractor = DeltaLakeMetadataExtractor()
    dExtractor.set_spark(spark)
    job = DefaultJob(conf=job_config,
                     task=DefaultTask(extractor=dExtractor, loader=FsNeo4jCSVLoader()),
                     publisher=Neo4jCsvPublisher())
    job.launch()
```
The above is the example code given in the Amundsen repository.
What values should `cluster_key` and `database` be set to in order to connect to my Delta Lake?
When I run the code I get no errors, but no data is ingested into Amundsen.
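Since the extractor can only surface tables that the SparkSession itself can see in the Hive metastore, one thing I considered is checking what the catalog actually exposes. Below is a minimal sanity-check sketch; `schema1` is just the placeholder name from the config above, and `enableHiveSupport()` is my assumption for running outside Databricks, where the metastore is not wired up automatically:

```python
from pyspark.sql import SparkSession

# enableHiveSupport() makes the session talk to the Hive metastore;
# on Databricks this is already configured for you.
spark = (SparkSession.builder
         .appName("Amundsen Delta Lake sanity check")
         .enableHiveSupport()
         .getOrCreate())

# List every database/schema the metastore exposes.
print([db.name for db in spark.catalog.listDatabases()])

# List the tables in one of the schemas passed to the extractor
# ('schema1' is the placeholder name from the config above).
for table in spark.catalog.listTables('schema1'):
    print(table.name, table.tableType)
```

If both lists come back empty, that would explain why the job finishes without errors but publishes nothing to Neo4j.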
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow