Tags: python, apache-spark, databricks, delta-lake
I have set up Amundsen and the UI works fine. I am now trying to run the sample Delta Lake loader given in the examples in its repository.
"""
This is a example script for extracting Delta Lake Metadata Results
"""
from pyhocon import ConfigFactory
from pyspark.sql import SparkSession
from databuilder.extractor.delta_lake_metadata_extractor import DeltaLakeMetadataExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.models.table_metadata import DESCRIPTION_NODE_LABEL
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
# NEO4J cluster endpoints
NEO4J_ENDPOINT = 'bolt://localhost:7687/'
neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
neo4j_password = 'test'
cluster_key = 'my_delta_environment'
database = 'delta'
# Or set to empty to do all
schema_list = ['schema1', 'schema2']
# Or set to empty list if you don't have any schemas that you don't want to process
exclude_list = ['bad_schema']
def create_delta_lake_job_config():
tmp_folder = '/var/tmp/amundsen/table_metadata'
node_files_folder = f'{tmp_folder}/nodes/'
relationship_files_folder = f'{tmp_folder}/relationships/'
job_config = ConfigFactory.from_dict({
f'extractor.delta_lake_table_metadata.{DeltaLakeMetadataExtractor.CLUSTER_KEY}': cluster_key,
f'extractor.delta_lake_table_metadata.{DeltaLakeMetadataExtractor.DATABASE_KEY}': database,
f'extractor.delta_lake_table_metadata.{DeltaLakeMetadataExtractor.SCHEMA_LIST_KEY}': schema_list,
f'extractor.delta_lake_table_metadata.{DeltaLakeMetadataExtractor.EXCLUDE_LIST_SCHEMAS_KEY}': exclude_list,
f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_CREATE_ONLY_NODES}': [DESCRIPTION_NODE_LABEL],
f'publisher.neo4j.job_publish_tag': 'some_unique_tag' # TO-DO unique tag must be added
})
return job_config
if __name__ == "__main__":
# This assumes you are running on a spark cluster (for example databricks cluster)
# that is configured with a hive metastore that
# has pointers to all of your delta tables
# Because of this, this code CANNOT run as a normal python operator on airflow.
spark = SparkSession.builder.appName("Amundsen Delta Lake Metadata Extraction").getOrCreate()
job_config = create_delta_lake_job_config()
dExtractor = DeltaLakeMetadataExtractor()
dExtractor.set_spark(spark)
job = DefaultJob(conf=job_config,
task=DefaultTask(extractor=dExtractor, loader=FsNeo4jCSVLoader()),
publisher=Neo4jCsvPublisher())
job.launch()
The above is the sample code given in the Amundsen repository.
What should I provide for cluster_key and database so that it connects to my Delta Lake?
When I run the code I get no errors, but no data is ingested into Amundsen.
Answer:
Amundsen's Delta Lake metadata extractor uses the Hive metastore to look up the location of a Delta table from its name. What you build via the configuration is the name of the Delta table; that name is passed to Hive to fetch metadata about the table, in particular where it is located.
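Because the extractor can only discover tables that the Hive metastore already knows about, a quick sanity check is to confirm, from the same Spark session, that your schemas and Delta tables are actually registered. Below is a minimal sketch using the standard PySpark catalog API; the app name and the schema name "schema1" are placeholders for whatever you actually put in schema_list:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("metastore-sanity-check").getOrCreate()

# List the databases (schemas) the Hive metastore exposes to this session.
print([db.name for db in spark.catalog.listDatabases()])

# List the tables registered under one of the schemas you configured in schema_list.
# "schema1" is a placeholder; use a schema that really exists in your metastore.
for table in spark.catalog.listTables("schema1"):
    print(table.database, table.name, table.tableType)
```

If your schemas or tables do not show up here, the extractor will run without errors but find nothing to ingest, which matches the symptom described in the question.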
This [issue response] in the Amundsen repository points out that the cluster key is arbitrary: it is not part of the Hive table name used to look up the table's location. So, unless you want to use Amundsen's ability to scope metadata collection to a particular cluster, you can set it to whatever value you like.
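In practice the cluster key only ends up as a label inside the Amundsen table key, together with the database value. Assuming the usual Amundsen databuilder key format of `{database}://{cluster}.{schema}/{table}` (an assumption about databuilder internals, not something from the sample above), the configured values would combine roughly as sketched here; `schema` and `table` are hypothetical placeholders:

```python
# Illustration only: how the configured values typically combine into an Amundsen table key.
# The actual key is built inside databuilder's TableMetadata model, not in user code.
database = 'delta'
cluster_key = 'my_delta_environment'   # arbitrary label, not used for the Hive lookup
schema = 'schema1'                     # must match a real Hive schema
table = 'my_table'                     # must match a real Hive table

table_key = f'{database}://{cluster_key}.{schema}/{table}'
print(table_key)  # delta://my_delta_environment.schema1/my_table
```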
The other values are all part of the table name stored in Hive, as you can see in the relevant lines of Amundsen's Delta Lake metadata extractor.
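The following is a hedged sketch of the kind of Hive lookup involved, not the extractor's actual source: for each configured schema, the table names come from the metastore, and the extended description is where details such as the storage location of a Delta table are found.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("delta-table-lookup-sketch").getOrCreate()

schema = "schema1"  # one of the schemas from schema_list
for row in spark.sql(f"SHOW TABLES IN {schema}").collect():
    # DESCRIBE TABLE EXTENDED returns rows such as Provider and Location,
    # which is where a Delta table's storage path can be read from.
    details = spark.sql(f"DESCRIBE TABLE EXTENDED {schema}.{row.tableName}").collect()
    print([(r.col_name, r.data_type) for r in details
           if r.col_name in ("Provider", "Location")])
```

So cluster_key can stay as any descriptive label, while database and, above all, the entries in schema_list must correspond to what is actually registered in the Hive metastore; otherwise the job completes without ingesting anything.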