Is there a way to pass additional/extra arguments to the foreachBatch function when streaming from Kafka?

Tags: apache-kafka, apache-spark, spark-streaming

I am trying to join a streaming DataFrame with a Hive table and insert the resulting DataFrame into another Kafka topic. Below is the code I implemented, and it works as required.

def write_stream_batches(kafka_df: DataFrame, table_config):
    table_config = state_config
    kafka_df.writeStream \
        .format('kafka') \
        .foreachBatch(join_kafka_streams_denorm) \
        .option('checkpointLocation', table_config['checkpoint_location']) \
        .start() \
        .awaitTermination()

def join_kafka_streams_denorm(kafka_df, batch_id):
    try:
        # configs are pulled in from module-level imports, since foreachBatch
        # only supplies the micro-batch DataFrame and the batch id
        table_config = state_config
        kafka_config = kafkaconfig

        # filter the Hive table down to the rows relevant to this micro-batch
        filters = ata_filter(kafka_df=kafka_df)
        main_df = spark.sql(f'select * from db.table where {filters}')

        # join, drop duplicated columns, and publish the result to the output topic
        joined_df = join_remove_duplicate_col(kafka_df=kafka_df, denorm=main_df, table_config=table_config)
        push_to_kafka(joined_df, kafka_config, table_config, 'state')
    except Exception as error:
        print(f'Join failed with the exception: {error}')
        traceback.print_exc()
        print('Stopping the application')
        sys.exit(1)

The method write_stream_batches receives the streaming DataFrame from Kafka. I am joining this topic's data with a Hive table, and my table configuration lives in a dictionary imported from a config.py file; the line below is that assignment.

table_config = state_config

The issue here is around supplying the checkpoint configuration: I import state_config inside write_stream_batches, and that part works fine.

However, if I try to pass table_config to foreachBatch as an argument alongside join_kafka_streams_denorm, it does not work, so I have to import it again inside join_kafka_streams_denorm and use the rest of the configuration there. Here is the attempt that gave me an error:

def write_stream_batches(kafka_df: DataFrame, table_config):
    table_config = state_config
    # attempt to pass table_config as an extra argument to foreachBatch -- this does not work
    kafka_df.writeStream \
        .format('kafka') \
        .foreachBatch(join_kafka_streams_denorm, table_config) \
        .option('checkpointLocation', table_config['checkpoint_location']) \
        .start() \
        .awaitTermination()

But that syntax is not correct, because table_config ends up being treated as the batch ID.

So can anyone tell me whether there is any way to pass extra arguments to foreachBatch while processing a streaming DataFrame?


You cannot pass arbitrary arguments to foreachBatch, but you can rework join_kafka_streams_denorm into a higher-order function that takes your configuration and returns the actual processing function:

def join_kafka_streams_denorm(table_config):
    def _do_work(kafka_df, batch_id):
        try:
            # table_config is captured from the enclosing function's argument,
            # so it no longer needs to be re-imported from config.py here
            kafka_config = kafkaconfig

            filters = ata_filter(kafka_df=kafka_df)
            main_df = spark.sql(f'select * from db.table where {filters}')

            joined_df = join_remove_duplicate_col(kafka_df=kafka_df, denorm=main_df, table_config=table_config)
            push_to_kafka(joined_df, kafka_config, table_config, 'state')
        except Exception as error:
            print(f'Join failed with the exception: {error}')
            traceback.print_exc()
            print('Stopping the application')
            sys.exit(1)
    return _do_work

You can then use it like this:

def write_stream_batches(kafka_df: DataFrame, table_config):
    table_config = state_config
    kafka_df.writeStream \
        .format('kafka') \
        .foreachBatch(join_kafka_streams_denorm(table_config)) \
        .option('checkpointLocation', table_config['checkpoint_location']) \
        .start() \
        .awaitTermination()
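An equivalent approach, if you would rather not nest functions, is to pre-bind the extra argument with functools.partial (or a lambda). This is only a minimal sketch, assuming join_kafka_streams_denorm is changed to accept the config as an explicit third parameter:

from functools import partial

# Assumed signature change: table_config becomes an explicit parameter.
def join_kafka_streams_denorm(kafka_df, batch_id, table_config):
    ...  # same body as above, but using the table_config argument directly

def write_stream_batches(kafka_df: DataFrame, table_config):
    kafka_df.writeStream \
        .format('kafka') \
        .foreachBatch(partial(join_kafka_streams_denorm, table_config=table_config)) \
        .option('checkpointLocation', table_config['checkpoint_location']) \
        .start() \
        .awaitTermination()

Either way the idea is the same: foreachBatch always invokes its callback with exactly (batch_df, batch_id), so any additional state has to be bound beforehand, either through a closure as in the answer above or through partial.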