我想将DStream转换为数组,列表等,然后我可以将其转换为json并在端点上提供它.我正在使用apache spark,注入twitter数据.如何在Dstream上执行此操作statuses
?除了,我似乎无法获得任何工作print()
.
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import TutorialHelper._
object Tutorial {
def main(args: Array[String]) {
// Location of the Spark directory
val sparkHome = "/opt/spark"
// URL of the Spark cluster
val sparkUrl = "local[8]"
// Location of the required JAR files
val jarFile = "target/scala-2.10/tutorial_2.10-0.1-SNAPSHOT.jar"
// HDFS directory for checkpointing
val checkpointDir = "/tmp"
// Configure Twitter credentials using twitter.txt
TutorialHelper.configureTwitterCredentials()
val ssc = new StreamingContext(sparkUrl, "Tutorial", Seconds(1), sparkHome, …
Run Code Online (Sandbox Code Playgroud) 我面临以下错误:
我编写了一个基于 Spark 流 ( Dstream )的应用程序来提取来自 PubSub 的消息。不幸的是,我在执行这项工作时遇到了错误。实际上,我正在使用由 4 个节点组成的集群来执行 spark 作业。
在没有任何特定错误的情况下运行作业 10 分钟后,我永久收到以下错误:
错误 org.apache.spark.streaming.CheckpointWriter:
无法将检查点任务提交给线程池执行程序 java.util.concurrent.RejectedExecutionException:任务 org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@68395dc9
从 java.util.concurrent被拒绝.ThreadPoolExecutor@1a1acc25
[正在运行,池大小 = 1,活动线程 = 1,排队任务 = 1000,已完成任务 = 412]
我正在使用Apache Spark Streaming 1.6.1编写一个Java应用程序,它连接两个Key/Value数据流并将输出写入HDFS.这两个数据流包含K/V字符串,并使用textFileStream()定期在HDFS中从Spark中摄取.
两个数据流不同步,这意味着在时间t0在stream1中的一些键可以在时间t1出现在stream2中,反之亦然.因此,我的目标是加入两个流并计算"剩余"密钥,这应该考虑在下一个批处理间隔中的连接操作.
为了更好地澄清这一点,请查看以下算法:
variables:
stream1 = <String, String> input stream at time t1
stream2 = <String, String> input stream at time t1
left_keys_s1 = <String, String> records of stream1 that didn't appear in the join at time t0
left_keys_s2 = <String, String> records of stream2 that didn't appear in the join at time t0
operations at time t1:
out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2)
write out_stream to HDFS
left_keys_s1 = left_keys_s1 + records of …
Run Code Online (Sandbox Code Playgroud) 我正在尝试从dstream中的json创建一个数据框,但是下面的代码似乎无法正确显示该数据框-
import sys
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
def getSqlContextInstance(sparkContext):
if ('sqlContextSingletonInstance' not in globals()):
globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
return globals()['sqlContextSingletonInstance']
if __name__ == "__main__":
if len(sys.argv) != 3:
raise IOError("Invalid usage; the correct format is:\nquadrant_count.py <hostname> <port>")
# Initialize a SparkContext with a name
spc = SparkContext(appName="jsonread")
sqlContext = SQLContext(spc)
# Create a StreamingContext with a batch interval of 2 seconds
stc = StreamingContext(spc, 2)
# Checkpointing feature
stc.checkpoint("checkpoint")
# Creating …
Run Code Online (Sandbox Code Playgroud) 在下面的代码中,似乎函数fn1和fn2以顺序方式应用于inRDD,正如我在Spark Web UI的Stages部分中看到的那样.
DstreamRDD1.foreachRDD(new VoidFunction<JavaRDD<String>>()
{
public void call(JavaRDD<String> inRDD)
{
inRDD.foreach(fn1)
inRDD.foreach(fn2)
}
}
Run Code Online (Sandbox Code Playgroud)
流媒体作业以这种方式运行时有何不同.以下函数是否在输入Dstream上并行运行?
DStreamRDD1.foreachRDD(fn1)
DStreamRDD2.foreachRDD(fn2)
Run Code Online (Sandbox Code Playgroud) 我正在通过 PySpark 探索 Spark Streaming,当我尝试将transform
函数与take
.
我可以成功地使用sortBy
过DStream
孔transform
和pprint
结果。
author_counts_sorted_dstream = author_counts_dstream.transform\
(lambda foo:foo\
.sortBy(lambda x:x[0].lower())\
.sortBy(lambda x:x[1],ascending=False))
author_counts_sorted_dstream.pprint()
Run Code Online (Sandbox Code Playgroud)
但是如果我使用take
以下相同的模式并尝试pprint
它:
top_five = author_counts_sorted_dstream.transform\
(lambda rdd:rdd.take(5))
top_five.pprint()
Run Code Online (Sandbox Code Playgroud)
工作失败
Run Code Online (Sandbox Code Playgroud)Py4JJavaError: An error occurred while calling o25.awaitTermination. : org.apache.spark.SparkException: An exception was raised by Python: Traceback (most recent call last): File "/usr/local/spark/python/pyspark/streaming/util.py", line 67, in call return r._jrdd AttributeError: 'list' object has no attribute '_jrdd'
您可以在此处查看笔记本中的完整代码和输出。
我究竟做错了什么?
我正在围绕Apache Spark Streaming编写一些自包含的集成测试.我想测试我的代码可以在我的模拟测试数据中摄取各种边缘情况.当我使用常规RDD(不是流式传输)时这样做.我可以使用我的内联数据并在其上调用"parallelize"将其转换为spark RDD.但是,我找不到这种创建目的地的方法.理想情况下,我想偶尔调用一些"推"功能,让魔法神奇地出现在我的dstream中.ATM我正在使用Apache Kafka这样做:我创建了一个临时队列,然后我写信给它.但这似乎有点矫枉过正.我宁愿直接从我的测试数据创建测试dstream,而不必使用Kafka作为调解器.
apache-spark ×7
dstream ×7
pyspark ×2
hadoop-yarn ×1
java ×1
json ×1
python ×1
rdd ×1
scala ×1
testing ×1