在 PySpark 环境中创建缓存的最佳方式

Sar*_*uja 4 caching cloudant apache-spark pyspark

我正在使用 Spark Streaming 创建一个系统来丰富来自 cloudant 数据库的传入数据。例子 -

Incoming Message: {"id" : 123}
Outgoing Message: {"id" : 123, "data": "xxxxxxxxxxxxxxxxxxx"}
Run Code Online (Sandbox Code Playgroud)

我的驱动程序类的代码如下:

Incoming Message: {"id" : 123}
Outgoing Message: {"id" : 123, "data": "xxxxxxxxxxxxxxxxxxx"}
Run Code Online (Sandbox Code Playgroud)

Enrichment Job 的代码如下: class EnrichmentJob:

from Sample.Job import EnrichmentJob
from Sample.Job import FunctionJob
import pyspark
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession

from kafka import KafkaConsumer, KafkaProducer
import json

class SampleFramework():

    def __init__(self):
        pass

    @staticmethod
    def messageHandler(m):
        return json.loads(m.message)

    @staticmethod
    def processData(rdd):

        if (rdd.isEmpty()):
            print("RDD is Empty")
            return

        # Expand
        expanded_rdd = rdd.mapPartitions(EnrichmentJob.enrich)

        # Score
        scored_rdd = expanded_rdd.map(FunctionJob.function)

        # Publish RDD


    def run(self, ssc):

        self.ssc = ssc

        directKafkaStream = KafkaUtils.createDirectStream(self.ssc, QUEUENAME, \
                                                          {"metadata.broker.list": META, 
                                                          "bootstrap.servers": SERVER}, \
                                                          messageHandler= SampleFramework.messageHandler)

        directKafkaStream.foreachRDD(SampleFramework.processData)

        ssc.start()
        ssc.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

我的问题是 - 有没有办法维护[1]“所有工作人员都可以访问的主内存上的全局缓存”或[2]“每个工作人员上的本地缓存,以便它们保留在 foreachRDD 设置中”?

我已经探索过以下内容 -

  1. 广播变量 - 这里我们采用 [1] 方式。据我了解,它们是只读且不可变的。我已经查看了此参考文献,但它引用了取消持久化/持久化广播变量的示例。这是一个好的做法吗?

  2. 静态变量 - 这里我们采用 [2] 方式。所引用的类(在本例中为“Enricher”)以静态变量字典的形式维护缓存。但事实证明,ForEachRDD 函数为每个传入的 RDD 生成一个全新的进程,这会删除之前启动的静态变量。这是上面编码的代码。

我现在有两种可能的解决方案 -

  1. 在文件系统上维护离线缓存。
  2. 在我的驱动程序节点上完成此丰富任务的整个计算。这将导致整个数据最终到达驱动程序并在那里维护。缓存对象将作为映射函数的参数发送到丰富作业。

显然,第一个看起来比第二个更好,但我想得出结论,在承诺之前,这两个是唯一的方法。任何指示将不胜感激!

zer*_*323 5

有没有办法维护[1]“所有工作人员都可以访问的主内存上的全局缓存”

不。不存在可供所有工作人员访问的“主存储器”。每个worker运行在一个单独的进程中,并通过套接字与外部世界进行通信。更不用说非本地模式下不同物理节点之间的分离。

有一些技术可以用来通过内存映射数据实现工作范围缓存(使用 SQLite 是最简单的一种),但需要一些额外的努力来实现正确的方法(避免冲突等)。

或者 [2]“每个工作线程上的本地缓存,以便它们保留在 foreachRDD 设置中”?

您可以使用范围仅限于各个工作进程的标准缓存技术。根据配置(静态与动态资源分配) ,spark.python.worker.reuse它可能会也可能不会在多个任务和批次之间保留。

考虑以下简化示例:

  • map_param.py

    from pyspark import AccumulatorParam
    from collections import Counter
    
    class CounterParam(AccumulatorParam):
        def zero(self, v: Counter) -> Counter:
            return Counter()
    
        def addInPlace(self, acc1: Counter, acc2: Counter) -> Counter:
            acc1.update(acc2)
            return acc1
    
    Run Code Online (Sandbox Code Playgroud)
  • my_utils.py

    from pyspark import Accumulator
    from typing import Hashable
    from collections import Counter
    
    # Dummy cache. In production I would use functools.lru_cache 
    # but it is a bit more painful to show with accumulator
    cached = {} 
    
    def f_cached(x: Hashable, counter: Accumulator) -> Hashable:
        if cached.get(x) is None:
            cached[x] = True
            counter.add(Counter([x]))
        return x
    
    
    def f_uncached(x: Hashable, counter: Accumulator) -> Hashable:
        counter.add(Counter([x]))
        return x
    
    Run Code Online (Sandbox Code Playgroud)
  • main.py

    from pyspark.streaming import StreamingContext
    from pyspark import SparkContext
    
    from counter_param import CounterParam
    import my_utils
    
    from collections import Counter
    
    def main():
        sc = SparkContext("local[1]")
        ssc = StreamingContext(sc, 5)
    
        cnt_cached = sc.accumulator(Counter(), CounterParam())
        cnt_uncached = sc.accumulator(Counter(), CounterParam())
    
        stream = ssc.queueStream([
            # Use single partition to show cache in work
            sc.parallelize(data, 1) for data in
            [[1, 2, 3], [1, 2, 5], [1, 3, 5]]
        ])
    
        stream.foreachRDD(lambda rdd: rdd.foreach(
            lambda x: my_utils.f_cached(x, cnt_cached)))
        stream.foreachRDD(lambda rdd: rdd.foreach(
            lambda x: my_utils.f_uncached(x, cnt_uncached)))
    
        ssc.start()
        ssc.awaitTerminationOrTimeout(15)
        ssc.stop(stopGraceFully=True)
    
        print("Counter cached {0}".format(cnt_cached.value))
        print("Counter uncached {0}".format(cnt_uncached.value))
    
    if __name__ == "__main__":
        main()
    
    Run Code Online (Sandbox Code Playgroud)

运行示例:

bin/spark-submit main.py
Run Code Online (Sandbox Code Playgroud)
Counter cached Counter({1: 1, 2: 1, 3: 1, 5: 1})
Counter uncached Counter({1: 3, 2: 2, 3: 2, 5: 2})
Run Code Online (Sandbox Code Playgroud)

正如你所看到的,我们得到了预期的结果:

  • 对于“缓存”对象,每个工作进程(分区)的每个唯一键仅更新一次累加器。
  • 对于未缓存的对象,每次 key 出现时累加器都会更新。