Hac*_*ode 6 cassandra datastax apache-spark spark-streaming pyspark
我有以下火花工作:
from __future__ import print_function
import os
import sys
import time
from random import random
from operator import add
from pyspark.streaming import StreamingContext
from pyspark import SparkContext,SparkConf
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext, Row
from pyspark.streaming import StreamingContext
from pyspark_cassandra import streaming,CassandraSparkContext
if __name__ == "__main__":
conf = SparkConf().setAppName("PySpark Cassandra Test")
sc = CassandraSparkContext(conf=conf)
stream = StreamingContext(sc, 2)
rdd=sc.cassandraTable("keyspace2","users").collect()
#print rdd
stream.start()
stream.awaitTermination()
sc.stop()
Run Code Online (Sandbox Code Playgroud)
当我运行它时,它给我以下错误:
ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: \
No output operations registered, so nothing to execute
Run Code Online (Sandbox Code Playgroud)
我运行的shell脚本:
./bin/spark-submit --packages TargetHolding:pyspark-cassandra:0.2.4 example
s/src/main/python/test/reading-cassandra.py
Run Code Online (Sandbox Code Playgroud)
比较火花流与kafka,我从上面的代码中遗漏了这一行:
kafkaStream = KafkaUtils.createStream(stream, 'localhost:2181', "name", {'topic':1})
Run Code Online (Sandbox Code Playgroud)
在我实际使用的地方,createStream但对于cassandra,我在文档上看不到这样的东西.如何启动spark streaming和cassandra之间的流媒体?
版本:
Cassandra v2.1.12
Spark v1.4.1
Scala 2.10
Run Code Online (Sandbox Code Playgroud)
要从 Cassandra 表创建 DStream,您可以使用ConstantInputDStream提供从 Cassandra 表创建的 RDD 作为输入。这将导致 RDD 在每个 DStream 间隔上具体化。
请注意,大型表或大小持续增长的表会对流作业的性能产生负面影响。
另请参阅:使用 Spark Streaming 从 Cassandra 读取 示例。
| 归档时间: |
|
| 查看次数: |
1077 次 |
| 最近记录: |