标签: spark-streaming

如何在Apache Spark应用程序中优化shuffle溢出

我正在运行带有2名工作人员的Spark流应用程序.应用程序具有联接和联合操作.

所有批次都已成功完成,但注意到随机溢出指标与输入数据大小或输出数据大小不一致(溢出内存超过20次).

请在下图中找到火花阶段的详细信息: 在此输入图像描述

经过对此的研究,发现了

当没有足够的内存用于随机数据时,会发生随机溢出.

Shuffle spill (memory) - 溢出时内存中数据的反序列化形式的大小

shuffle spill (disk) - 溢出后磁盘上数据序列化形式的大小

由于反序列化数据比序列化数据占用更多空间.所以,Shuffle溢出(内存)更多.

注意到这个溢出内存大小非常大,输入数据很大.

我的疑问是:

这种溢出是否会对性能产生很大影响?

如何优化内存和磁盘的溢出?

是否有可以减少/控制这种巨大溢出的Spark Properties?

apache-spark spark-streaming apache-spark-1.4

39
推荐指数
2
解决办法
2万
查看次数

build.sbt:如何添加spark依赖项

你好我想下载spark-core,spark-streaming,twitter4j,和spark-streaming-twitter下面的build.sbt文件:

name := "hello"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.4.1"

libraryDependencies ++= Seq(
  "org.twitter4j" % "twitter4j-core" % "3.0.3",
  "org.twitter4j" % "twitter4j-stream" % "3.0.3"
)

libraryDependencies += "org.apache.spark" % "spark-streaming-twitter_2.10" % "0.9.0-incubating"
Run Code Online (Sandbox Code Playgroud)

我只是在libraryDependencies网上这个,所以我不确定使用哪个版本等.

有人可以向我解释我应该如何解决这个.sbt文件.我花了几个小时试图搞清楚,但没有一个建议工作.我scala通过自制软件安装,我在版本上2.11.8

我的所有错误都是关于:

Modules were resolved with conflicting cross-version suffixes.
Run Code Online (Sandbox Code Playgroud)

scala sbt apache-spark spark-streaming

35
推荐指数
2
解决办法
3万
查看次数

Spark Streaming - 读写Kafka主题

我正在使用Spark Streaming处理两个Kafka队列之间的数据,但我似乎找不到从Spark写Kafka的好方法.我试过这个:

input.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String => {
        val props = new HashMap[String, Object]()

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        println(x)
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
      }
    }
  )
)
Run Code Online (Sandbox Code Playgroud)

并且它按预期工作,但是为每个消息实例化一个新的KafkaProducer在真实环境中显然是不可行的,我正在尝试解决它.

我想为每个进程保留一个实例的引用,并在需要发送消息时访问它.如何从Spark Streaming写入Kafka?

scala apache-kafka apache-spark spark-streaming spark-streaming-kafka

33
推荐指数
5
解决办法
4万
查看次数

使用,提交和最大堆内存的差异

我正在监视OutOfMemoryException的spark执行器JVM.我使用Jconsole连接到执行程序JVM.以下是Jconsole的快照: 在此输入图像描述

在图像中使用的内存显示为3.8G,提交的内存为8.6G,最大内存也是8.6G.任何人都可以解释使用和提交的内存或任何解释它的链接之间的区别.

java jvm memory-management apache-spark spark-streaming

32
推荐指数
2
解决办法
2万
查看次数

如何知道在YARN客户端模式下使用spark-shell进行ClosedChannelExceptions的原因是什么?

我一直试图spark-shell在YARN client模式下运行,但是我遇到了很多ClosedChannelException错误.我正在使用spark 2.0.0 build for Hadoop 2.6.

以下是例外情况:

$ spark-2.0.0-bin-hadoop2.6/bin/spark-shell --master yarn --deploy-mode client
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/09/13 14:12:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/09/13 14:12:38 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
16/09/13 14:12:55 ERROR cluster.YarnClientSchedulerBackend: Yarn application has already exited with state FINISHED!
16/09/13 14:12:55 ERROR …
Run Code Online (Sandbox Code Playgroud)

hadoop hadoop-yarn apache-spark spark-streaming

31
推荐指数
1
解决办法
2万
查看次数

Spark使用python:如何解析Stage x包含一个非常大的任务(xxx KB).建议的最大任务大小为100 KB

我刚刚创建了python列表range(1,100000).

使用SparkContext完成以下步骤:

a = sc.parallelize([i for i in range(1, 100000)])
b = sc.parallelize([i for i in range(1, 100000)])

c = a.zip(b)

>>> [(1, 1), (2, 2), -----]

sum  = sc.accumulator(0)

c.foreach(lambda (x, y): life.add((y-x)))
Run Code Online (Sandbox Code Playgroud)

其中发出如下警告:

ARN TaskSetManager:阶段3包含一个非常大的任务(4644 KB).建议的最大任务大小为100 KB.

如何解决此警告?有没有办法处理大小?而且,它会影响大数据的时间复杂度吗?

apache-spark spark-streaming

30
推荐指数
3
解决办法
3万
查看次数

如何更新火花流中的广播变量?

我相信,我有一个相对常见的火花流用例:

我有一个对象流,我想根据一些参考数据进行过滤

最初,我认为使用广播变量实现这是一件非常简单的事情:

public void startSparkEngine {
    Broadcast<ReferenceData> refdataBroadcast
      = sparkContext.broadcast(getRefData());

    final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> {
        final ReferenceData refData = refdataBroadcast.getValue();
        return obj.getField().equals(refData.getField());
    }

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
            // Final processing of filtered objects
        });
        return null;
    });
}
Run Code Online (Sandbox Code Playgroud)

但是,尽管很少,我的参考数据会定期更改

我的印象是我可以在驱动程序上修改和重新广播我的变量,它会传播给每个工作者,但是Broadcast对象不是也不Serializable需要final.

我有什么替代品?我能想到的三个解决方案是:

  1. 将参考数据查找移动到一个forEachPartitionforEachRdd左右,使其完全驻留在工作者上.但是,参考数据存在于REST API中,因此我还需要以某种方式存储计时器/计数器以停止对流中的每个元素访问远程数据库.

  2. 每次refdata更改时,使用新的广播变量重新启动Spark上下文.

  3. 将参考数据转换为RDD,然后join以我现在流式传输的方式将流转换为流Pair<MyObject, RefData>,尽管这会将参考数据与每个对象一起发送.

java scala broadcast apache-spark spark-streaming

29
推荐指数
4
解决办法
2万
查看次数

如何停止火花流媒体工作?

我有一个连续运行的Spark Streaming作业.我如何优雅地停止工作?我已经阅读了在作业监视中附加关闭钩子并将SIGTERM发送到作业的通常建议.

sys.ShutdownHookThread {
  logger.info("Gracefully stopping Application...")
  ssc.stop(stopSparkContext = true, stopGracefully = true)
  logger.info("Application stopped gracefully")
}
Run Code Online (Sandbox Code Playgroud)

它似乎工作,但看起来不是最简单的方法来阻止这项工作.我在这里错过了什么吗?

从代码的角度来看,它可能有意义,但您如何在群集环境中使用它?如果我们启动一个火花流工作(我们在集群中的所有节点上分配作业),我们将不得不跟踪作业的PID和运行它的节点.最后,当我们必须停止进程时,我们需要跟踪作业运行的节点以及该进程的PID.我只是希望流媒体作业有一种更简单的工作控制方式.

apache-spark spark-streaming

28
推荐指数
1
解决办法
1万
查看次数

将Spring与Spark结合使用

我正在开发一个Spark应用程序,我习惯将Spring作为依赖注入框架.现在我遇到了问题,处理部分使用了Spring的@Autowired功能,但它被Spark序列化和反序列化.

所以下面的代码让我陷入困境:

Processor processor = ...; // This is a Spring constructed object
                           // and makes all the trouble
JavaRDD<Txn> rdd = ...; // some data for Spark
rdd.foreachPartition(processor);
Run Code Online (Sandbox Code Playgroud)

处理器看起来像这样:

public class Processor implements VoidFunction<Iterator<Txn>>, Serializeable {
    private static final long serialVersionUID = 1L;

    @Autowired // This will not work if the object is deserialized
    private transient DatabaseConnection db;

    @Override
    public void call(Iterator<Txn> txns) {
        ... // do some fance stuff
        db.store(txns);
    }
}
Run Code Online (Sandbox Code Playgroud)

所以我的问题是:是否甚至可以将Spring与Spark结合使用?如果没有,那么做这样的事情最优雅的方式是什么?任何帮助表示赞赏!

java spring apache-spark spark-streaming

25
推荐指数
1
解决办法
3万
查看次数

Spark Streaming:为什么内部处理成本如此之高以处理几MB的用户状态?

根据我们的实验,我们发现当状态变为超过一百万个对象时,有状态的Spark Streaming内部处理成本会花费大量时间.因此延迟会受到影响,因为我们必须增加批处理间隔以避免不稳定的行为(处理时间>批处理间隔).

它与我们的应用程序的细节无关,因为它可以通过下面的代码重现.

什么是Spark内部处理/基础架构成本,它需要花费大量时间来处理用户状态?除了简单地增加批处理间隔之外,还有减少处理时间的选项吗?

我们计划广泛使用状态:在每个节点上至少100MB左右,以便将所有数据保存在内存中,并且每小时只丢弃一次.

增加批处理间隔会有所帮助,但我们希望保持批处理间隔最小.

原因可能不是国家占用的空间,而是大型对象图,因为当我们将列表更改为大型基元时,问题就消失了.

只是一个猜测:它可能与org.apache.spark.util.SizeEstimatorSpark内部使用有关,因为它会在不时进行分析时显示出来.

在此输入图像描述

以下是在现代iCore7上重现上图的简单演示:

  • 不到15 MB的州
  • 根本没有流输入
  • 最快(虚拟)'updateStateByKey'功能
  • 批间隔1秒
  • 检查点(必须由Spark提供)到本地磁盘
  • 在本地和YARN进行了测试

码:

package spark;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.SizeEstimator;
import scala.Tuple2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SlowSparkStreamingUpdateStateDemo {

    // Very simple state model
    static class State implements Serializable {
        final List<String> data;
        State(List<String> data) {
            this.data = data;
        }
    }

    public static void main(String[] args) {
        SparkConf conf = …
Run Code Online (Sandbox Code Playgroud)

java performance apache-spark spark-streaming

24
推荐指数
1
解决办法
879
查看次数