小编Dhi*_*raj的帖子

在本地计算机上运行Spark Streaming时出现"Connection Refused"错误

我知道有很多线程已经出现'spark streaming connection refused'问题.但其中大部分是在Linux或至少指向HDFS.我在Windows的本地笔记本电脑上运行它.

我正在运行一个非常简单的基本Spark流式独立应用程序,只是为了看看流的工作原理.这里没有做任何复杂的事情: -

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf

object MyStream 
{
    def main(args:Array[String]) 
    {
        val sc = new StreamingContext(new SparkConf(),Seconds(10))
        val mystreamRDD = sc.socketTextStream("localhost",7777)
        mystreamRDD.print()
        sc.start()
        sc.awaitTermination()
    }
}
Run Code Online (Sandbox Code Playgroud)

我收到以下错误: -

2015-07-25 18:13:07 INFO  ReceiverSupervisorImpl:59 - Starting receiver
2015-07-25 18:13:07 INFO  ReceiverSupervisorImpl:59 - Called receiver onStart
2015-07-25 18:13:07 INFO  SocketReceiver:59 - Connecting to localhost:7777
2015-07-25 18:13:07 INFO  ReceiverTracker:59 - Registered receiver for      stream 0 from 192.168.19.1:11300
2015-07-25 18:13:08 WARN  ReceiverSupervisorImpl:92 - Restarting receiver     with delay 2000 …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark spark-streaming

8
推荐指数
1
解决办法
7167
查看次数

RDD,分区和节点之间的关系

我一直在阅读关于RDD以及分区如何影响各种转换,以及一些转换如何影响分区本身.虽然我理解这一点,但我无法将它与大局相关联,因为它如何适应我们拥有多个节点的集群.

分区和节点之间是否存在一对一的对应关系?我的意思是,理想情况下每个节点是否有一个分区?如果没有,Spark如何确定特定RDD必须驻留在同一节点上的分区数?

更具体地说,我可以想到以下其中一项: -

1)同一节点上给定RDD的所有分区2)同一RDD的所有分区都可以驻留在不同的节点上(但拆分的基础是什么?)3)同一节点的分区分散在集群中,部分它们位于同一节点上,其中一些节点位于不同的节点上(同样,这种分布的基础是什么?)

有人可以解释一下,或者至少指出一些具体的链接来回答这个问题吗?

apache-spark rdd

7
推荐指数
1
解决办法
3506
查看次数

如何从任务中打印累加器变量(似乎"工作"而不调用值方法)?

我知道累加器变量从任务的角度来看是"只写",当它们在工作节点中执行时.我正在对此进行一些测试,我意识到我能够在任务中打印累加器值.

这里我在驱动程序中初始化累加器: -

scala> val accum  = sc.accumulator(123)
accum: org.apache.spark.Accumulator[Int] = 123
Run Code Online (Sandbox Code Playgroud)

然后我继续定义一个函数'foo': -

scala> def foo(pair:(String,String)) = { println(accum); pair }
foo: (pair: (String, String))(String, String)
Run Code Online (Sandbox Code Playgroud)

在这个函数中,我只是打印累加器,然后返回收到的同一对.

现在我有一个名为myrdd的RDD,其类型如下: -

scala> myrdd
res13: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[9] at map at <console>:21
Run Code Online (Sandbox Code Playgroud)

我现在在这个RDD上调用地图转换: -

myrdd.map(foo).collect
Run Code Online (Sandbox Code Playgroud)

"收集"行动正在应用于强制评估.所以这里实际发生的是,在执行期间,为RDD的每一行打印零(0).由于这个RDD有4个元素,它打印0次4次.由于动作'collect'在那里,它也会打印出最后的所有元素,但这并不是真正的重点.所以我有两个问题: -

  1. 从逻辑上讲,打印相当于阅读,因为只有当你可以阅读时,才能打印.那为什么允许这样做?如果我们试图在函数中"返回"累加器,为什么不会抛出异常会发生什么?
  2. 当我们在驱动程序中将其作为123启动时,为什么打印0作为累加器的值?

经过一些实验,我发现如果我改变函数定义来访问累加器对象的实际值属性(accum.value),然后按照已经描述的那样触发RDD动作,它确实抛出异常: -

scala> def foo(pair:(String,String)) = { println(accum.value); pair }
Run Code Online (Sandbox Code Playgroud)

RDD评估期间引起的例外: -

无法读取任务中的累加器值

所以我之前做的是尝试打印累加器对象本身.但问题仍然是为什么它打印0?因为在驱动程序级别,如果我发出我在函数定义中使用的相同命令,我确实得到值123: -

scala> println(accum)
123
Run Code Online (Sandbox Code Playgroud)

我没有必要说println(accum.value)才能正常工作.那么为什么只有当我在任务使用的函数中发出此命令时,它是否会打印0?

scala apache-spark rdd

7
推荐指数
1
解决办法
2875
查看次数

无法声明字符串类型累加器

我试图在Scala shell(驱动程序)中定义String类型的累加器变量,但我不断收到以下错误: -

scala> val myacc = sc.accumulator("Test")
<console>:21: error: could not find implicit value for parameter param: org.apache.spark.AccumulatorParam[String]
       val myacc = sc.accumulator("Test")
                                 ^
Run Code Online (Sandbox Code Playgroud)

对于Int或Double类型的累加器来说,这似乎没有问题.

谢谢

scala apache-spark rdd

6
推荐指数
1
解决办法
3106
查看次数

部署和供应 ADF 之间的区别

每当我在 ADF 中创建新管道并单击“部署”时,首先我都会看到“部署”消息,紧接着我会看到“供应”消息。有什么不同?

我们是否可以遇到它正在部署管道但不配置的情况?

谢谢

azure azure-data-factory

6
推荐指数
1
解决办法
2710
查看次数

TensorFlow仅限于神经网络吗?

TensorFlow是否仅设计用于实现神经网络?它可以用作通用的机器学习库-用于实施各种有监督的还是无监督的技术(朴素贝叶斯,决策树,k均值,SVM等)?无论我遇到的TensorFlow文献是什么,都通常是在谈论神经网络。TensorFlow可能基于图的体系结构使其适合用于神经网络。但是它也可以用作一般的机器学习框架吗?

tensorflow

6
推荐指数
1
解决办法
3872
查看次数

repartition()不影响RDD分区大小

我试图使用repartition()方法更改RDD的分区大小.RDD上的方法调用成功,但是当我使用RDD的partition.size属性显式检查分区大小时,我得到了它原来拥有的相同数量的分区: -

scala> rdd.partitions.size
res56: Int = 50

scala> rdd.repartition(10)
res57: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[19] at repartition at <console>:27
Run Code Online (Sandbox Code Playgroud)

在这个阶段,我执行一些像rdd.take(1)这样的动作只是为了强制评估,以防万一.然后我再次检查分区大小: -

scala> rdd.partitions.size
res58: Int = 50
Run Code Online (Sandbox Code Playgroud)

可以看出,它并没有改变.有人能解答原因吗?

apache-spark rdd

5
推荐指数
1
解决办法
5650
查看次数

为什么序列化持久化 RDD 比反序列化持久化 RDD 占用更少的内存

我读到当 RDD 通过某种序列化(无论是默认 Java 序列化还是 Kryo 序列化之类的东西)持久保存在内存中时,它在内存中占用的空间更少。我对序列化的理解是,它只是将内存中的Java对象转换为一系列位的一种方式,而反序列化实际上是将这些位作为对象带入内存的过程。所以我一直认为反序列化是带回来的将内存作为一系列位的对象。因此,将某些内容存储为反序列化数据的术语让我感到困惑。对我来说,存储(作为一系列位)是序列化,检索是反序列化。所以我真的无法想象将某些东西存储为反序列化数据意味着什么。正因为如此,我也无法理解为什么序列化表单在缓存中占用的空间更少。在我看来,反序列化不仅仅是倾倒对象并将它们取回,但我不知道是什么,并且非常想理解这一点。是不是因为序列化格式真的是某种压缩格式?和反序列化格式没有任何压缩?据我所知,在 Java 编程中,我从未遇到过将对象存储为反序列化格式的概念。

apache-spark rdd

5
推荐指数
1
解决办法
2246
查看次数

mapPartitions返回空数组

我有以下RDD有4个分区: -

val rdd=sc.parallelize(1 to 20,4)
Run Code Online (Sandbox Code Playgroud)

现在我尝试在这上面调用mapPartitions: -

scala> rdd.mapPartitions(x=> { println(x.size); x }).collect
5
5
5
5
res98: Array[Int] = Array()
Run Code Online (Sandbox Code Playgroud)

为什么它返回空数组?anonymoys函数只是返回它收到的相同迭代器,那么它是如何返回空数组的呢?有趣的是,如果我删除println语句,它确实返回非空数组: -

scala> rdd.mapPartitions(x=> { x }).collect
res101: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
Run Code Online (Sandbox Code Playgroud)

这个我不明白.为什么println(只是打印迭代器的大小)的存在会影响函数的最终结果?

apache-spark rdd

5
推荐指数
1
解决办法
1989
查看次数

DROP PARTITION 是否从 HIVE 中的外部表中删除数据?

HIVE 中的外部表按年、月和日进行分区。

那么以下查询是否会从此查询中引用的特定分区的外部表中删除数据?:-

ALTER TABLE MyTable DROP IF EXISTS PARTITION(year=2016,month=7,day=11);
Run Code Online (Sandbox Code Playgroud)

hive external-tables hive-partitions hiveddl

5
推荐指数
1
解决办法
2万
查看次数