Ada*_*ury 5 apache-spark spark-streaming
我有两个 kafka 流,其中包含两个并行操作的结果,我需要一种方法来组合两个流,以便我可以在单个 spark 变换中处理结果。这可能吗?(下图)
Stream 1 {id:1,result1:True}
Stream 2 {id:1,result2:False}
JOIN(Stream 1, Stream 2, On "id") -> Output Stream {id:1,result1:True,result2:False}
Run Code Online (Sandbox Code Playgroud)
当前无效的代码:
kvs1 = KafkaUtils.createStream(sparkstreamingcontext, ZOOKEEPER, NAME+"_stream", {"test_join_1": 1})
kvs2 = KafkaUtils.createStream(sparkstreamingcontext, ZOOKEEPER, NAME+"_stream", {"test_join_2": 1})
messages_RDDstream1 = kvs1.map(lambda x: x[1])
messages_RDDstream2 = kvs2.map(lambda x: x[1])
messages_RDDstream_Final = messages_RDDstream1.join(messages_RDDstream2)
Run Code Online (Sandbox Code Playgroud)
当我将两个示例 json 传递给具有相同 ID 字段的每个 Kafka 队列时,我的最终 RDD 流中没有返回任何内容。我想我错过了将我的 Kafka JSON 字符串消息转换为元组的阶段?
我还尝试了以下方法:
kvs1.map(lambda (key, value): json.loads(value))
Run Code Online (Sandbox Code Playgroud)
和
kvs1.map(lambda x: json.loads(x))
Run Code Online (Sandbox Code Playgroud)
无济于事
干杯
亚当
简单查找 Spark 的文档就会给你答案。
您可以使用该join
操作。
加入(otherStream,[numTasks]):
当调用两个 (K, V) 和 (K, W) 对 DStream 时,返回一个新的 (K, (V, W)) 对 DStream,其中包含每个键的所有元素对。
例如 :val streamJoined = stream1.join(stream2)
归档时间: |
|
查看次数: |
2989 次 |
最近记录: |