标签: dstream

对于DStream中的每个RDD,如何将其转换为数组或其他典型的Java数据类型?

我想将DStream转换为数组,列表等,然后我可以将其转换为json并在端点上提供它.我正在使用apache spark,注入twitter数据.如何在Dstream上执行此操作statuses?除了,我似乎无法获得任何工作print().

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import TutorialHelper._
object Tutorial {
  def main(args: Array[String]) {

    // Location of the Spark directory 
    val sparkHome = "/opt/spark"

    // URL of the Spark cluster
    val sparkUrl = "local[8]"

    // Location of the required JAR files 
    val jarFile = "target/scala-2.10/tutorial_2.10-0.1-SNAPSHOT.jar"

    // HDFS directory for checkpointing
    val checkpointDir = "/tmp" 

    // Configure Twitter credentials using twitter.txt
    TutorialHelper.configureTwitterCredentials()

    val ssc = new StreamingContext(sparkUrl, "Tutorial", Seconds(1), sparkHome, …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark spark-streaming dstream

6
推荐指数
2
解决办法
2万
查看次数

集群中作业运行期间的 Spark 流错误(纱线资源管理器)

我面临以下错误:
我编写了一个基于 Spark 流 ( Dstream )的应用程序来提取来自 PubSub 的消息。不幸的是,我在执行这项工作时遇到了错误。实际上,我正在使用由 4 个节点组成的集群来执行 spark 作业。

在没有任何特定错误的情况下运行作业 10 分钟后,我永久收到以下错误:

错误 org.apache.spark.streaming.CheckpointWriter:
无法将检查点任务提交给线程池执行程序 java.util.concurrent.RejectedExecutionException:任务 org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@68395dc9
从 java.util.concurrent被拒绝.ThreadPoolExecutor@1a1acc25
[正在运行,池大小 = 1,活动线程 = 1,排队任务 = 1000,已完成任务 = 412]

hadoop-yarn apache-spark spark-streaming dstream

6
推荐指数
0
解决办法
251
查看次数

如何在Spark Streaming中跨多个批处理间隔传输数据流

我正在使用Apache Spark Streaming 1.6.1编写一个Java应用程序,它连接两个Key/Value数据流并将输出写入HDFS.这两个数据流包含K/V字符串,并使用textFileStream()定期在HDFS中从Spark中摄取.

两个数据流不同步,这意味着在时间t0在stream1中的一些键可以在时间t1出现在stream2中,反之亦然.因此,我的目标是加入两个流并计算"剩余"密钥,这应该考虑在下一个批处理间隔中的连接操作.

为了更好地澄清这一点,请查看以下算法:

variables:
stream1 = <String, String> input stream at time t1
stream2 = <String, String> input stream at time t1
left_keys_s1 = <String, String> records of stream1 that didn't appear in the join at time t0
left_keys_s2 = <String, String> records of stream2 that didn't appear in the join at time t0

operations at time t1:
out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2)
write out_stream to HDFS
left_keys_s1 = left_keys_s1 + records of …
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-streaming dstream

5
推荐指数
1
解决办法
1159
查看次数

无法使用pyspark从json dstream创建数据框

我正在尝试从dstream中的json创建一个数据框,但是下面的代码似乎无法正确显示该数据框-

import sys
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
def getSqlContextInstance(sparkContext):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

if __name__ == "__main__":
    if len(sys.argv) != 3:
        raise IOError("Invalid usage; the correct format is:\nquadrant_count.py <hostname> <port>")

# Initialize a SparkContext with a name
spc = SparkContext(appName="jsonread")
sqlContext = SQLContext(spc)
# Create a StreamingContext with a batch interval of 2 seconds
stc = StreamingContext(spc, 2)
# Checkpointing feature
stc.checkpoint("checkpoint")
# Creating …
Run Code Online (Sandbox Code Playgroud)

python json apache-spark dstream pyspark

5
推荐指数
0
解决办法
901
查看次数

Spark DStream的foreachDD函数中RDD的并发转换

在下面的代码中,似乎函数fn1和fn2以顺序方式应用于inRDD,正如我在Spark Web UI的Stages部分中看到的那样.

 DstreamRDD1.foreachRDD(new VoidFunction<JavaRDD<String>>()
 { 
     public void call(JavaRDD<String> inRDD)
        {
          inRDD.foreach(fn1)
          inRDD.foreach(fn2)
        }
 }
Run Code Online (Sandbox Code Playgroud)

流媒体作业以这种方式运行时有何不同.以下函数是否在输入Dstream上并行运行?

DStreamRDD1.foreachRDD(fn1)
DStreamRDD2.foreachRDD(fn2)
Run Code Online (Sandbox Code Playgroud)

java apache-spark spark-streaming rdd dstream

5
推荐指数
1
解决办法
228
查看次数

pyspark 中的转换 DStream 在调用 pprint 时出错

我正在通过 PySpark 探索 Spark Streaming,当我尝试将transform函数与take.

我可以成功地使用sortByDStreamtransformpprint结果。

author_counts_sorted_dstream = author_counts_dstream.transform\
  (lambda foo:foo\
   .sortBy(lambda x:x[0].lower())\
   .sortBy(lambda x:x[1],ascending=False))
author_counts_sorted_dstream.pprint()
Run Code Online (Sandbox Code Playgroud)

但是如果我使用take以下相同的模式并尝试pprint它:

top_five = author_counts_sorted_dstream.transform\
  (lambda rdd:rdd.take(5))
top_five.pprint()
Run Code Online (Sandbox Code Playgroud)

工作失败

Py4JJavaError: An error occurred while calling o25.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/streaming/util.py", line 67, in call
    return r._jrdd
AttributeError: 'list' object has no attribute '_jrdd'
Run Code Online (Sandbox Code Playgroud)

您可以在此处查看笔记本中的完整代码和输出。

我究竟做错了什么?

apache-spark spark-streaming dstream pyspark

4
推荐指数
1
解决办法
3261
查看次数

以编程方式在apache spark中创建dstream

我正在围绕Apache Spark Streaming编写一些自包含的集成测试.我想测试我的代码可以在我的模拟测试数据中摄取各种边缘情况.当我使用常规RDD(不是流式传输)时这样做.我可以使用我的内联数据并在其上调用"parallelize"将其转换为spark RDD.但是,我找不到这种创建目的地的方法.理想情况下,我想偶尔调用一些"推"功能,让魔法神奇地出现在我的dstream中.ATM我正在使用Apache Kafka这样做:我创建了一个临时队列,然后我写信给它.但这似乎有点矫枉过正.我宁愿直接从我的测试数据创建测试dstream,而不必使用Kafka作为调解器.

testing apache-spark dstream

0
推荐指数
1
解决办法
2127
查看次数