小编Sar*_*uja的帖子

在 PySpark 环境中创建缓存的最佳方式

我正在使用 Spark Streaming 创建一个系统来丰富来自 cloudant 数据库的传入数据。例子 -

Incoming Message: {"id" : 123}
Outgoing Message: {"id" : 123, "data": "xxxxxxxxxxxxxxxxxxx"}
Run Code Online (Sandbox Code Playgroud)

我的驱动程序类的代码如下:

Incoming Message: {"id" : 123}
Outgoing Message: {"id" : 123, "data": "xxxxxxxxxxxxxxxxxxx"}
Run Code Online (Sandbox Code Playgroud)

Enrichment Job 的代码如下: class EnrichmentJob:

from Sample.Job import EnrichmentJob
from Sample.Job import FunctionJob
import pyspark
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession

from kafka import KafkaConsumer, KafkaProducer
import json

class SampleFramework():

    def __init__(self):
        pass

    @staticmethod
    def messageHandler(m): …
Run Code Online (Sandbox Code Playgroud)

caching cloudant apache-spark pyspark

4
推荐指数
1
解决办法
2184
查看次数

标签 统计

apache-spark ×1

caching ×1

cloudant ×1

pyspark ×1