I'm using Spark Streaming to build a system that enriches incoming data with data from a Cloudant database. For example:
Incoming Message: {"id" : 123}
Outgoing Message: {"id" : 123, "data": "xxxxxxxxxxxxxxxxxxx"}
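The transformation in this example can be sketched as a plain Python function. This is a minimal sketch, not the code from my job. `enrich_message` and `lookup` are hypothetical names. An in-memory dict stands in for the Cloudant database, and a record whose id has no match passes through unchanged (an assumption).

```python
import json
from typing import Callable, Optional


def enrich_message(raw: str, lookup: Callable[[int], Optional[str]]) -> str:
    """Parse an incoming message, attach the looked-up "data" field, re-serialize.

    `lookup` is a hypothetical stand-in for the Cloudant query: it maps an id
    to the value that should go into "data", or None if no document exists.
    """
    msg = json.loads(raw)
    data = lookup(msg["id"])
    if data is not None:
        msg["data"] = data  # only enrich when a matching document was found
    return json.dumps(msg)


if __name__ == "__main__":
    # In-memory dict used in place of the Cloudant database for this demo.
    fake_db = {123: "xxxxxxxxxxxxxxxxxxx"}
    print(enrich_message('{"id" : 123}', fake_db.get))
```

Passing `fake_db.get` as the lookup keeps the function independent of any particular Cloudant client. A real lookup would issue the database request instead.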
My driver class looks like this:
from Sample.Job import EnrichmentJob
from Sample.Job import FunctionJob
import pyspark
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from kafka import KafkaConsumer, KafkaProducer
import json


class SampleFramework:
    def __init__(self):
        pass

    @staticmethod
    def messageHandler(m): …

The code for the Enrichment Job is as follows:

class EnrichmentJob:
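The `messageHandler` above is cut off. Here is a rough sketch of what it and the enrichment step could look like, under stated assumptions. I am assuming the handler is passed to `KafkaUtils.createDirectStream(..., messageHandler=...)` from the Spark 2.x `pyspark.streaming.kafka` API the driver imports. In that API the handler receives a `KafkaMessageAndMetadata`, whose `.message` holds the decoded value. `message_handler`, `enrich_partition` and `fetch_many` are hypothetical names, and `fetch_many` stands in for a bulk Cloudant lookup. The per-partition batching is a common pattern for enriching a stream from an external store; I have not confirmed it is what `EnrichmentJob` does.

```python
import json
from collections import namedtuple
from typing import Callable, Dict, Iterable, Iterator, List


def message_handler(m) -> dict:
    """Parse the JSON payload of one Kafka record.

    Assumes `m` exposes a decoded `.message`, as KafkaMessageAndMetadata
    does in pyspark.streaming.kafka.
    """
    return json.loads(m.message)


def enrich_partition(
    records: Iterable[dict],
    fetch_many: Callable[[List[int]], Dict[int, str]],
) -> Iterator[dict]:
    """Enrich all records of one partition using a single bulk lookup.

    `fetch_many` is a hypothetical callable standing in for a bulk Cloudant
    query: it takes a list of ids and returns {id: data} for the ids found.
    """
    batch = list(records)
    if not batch:
        return  # empty partition: skip the database round trip
    found = fetch_many([r["id"] for r in batch])
    for r in batch:
        if r["id"] in found:
            yield {**r, "data": found[r["id"]]}
        else:
            yield r  # no matching document: pass the record through unchanged


if __name__ == "__main__":
    # Local demo without Spark: a namedtuple mimics the Kafka record.
    FakeMsg = namedtuple("FakeMsg", ["key", "message"])
    parsed = [message_handler(FakeMsg(None, '{"id": 123}'))]
    db = {123: "xxxxxxxxxxxxxxxxxxx"}
    print(list(enrich_partition(parsed, lambda ids: {i: db[i] for i in ids if i in db})))
```

In the streaming job this would be wired up roughly as `stream = KafkaUtils.createDirectStream(ssc, [topic], kafka_params, messageHandler=message_handler)` followed by `stream.mapPartitions(lambda part: enrich_partition(part, fetch_many))`. Here `topic` and `kafka_params` are placeholders. Batching per partition means one database request per partition per micro-batch instead of one per message. Any Cloudant client used inside `fetch_many` should be created on the executor, not captured from the driver.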