标签: spark-streaming

Spark Streaming:无法计算拆分,未找到块

我试图使用Spark Streaming与Kafka(版本1.1.0)但由于此错误,Spark作业不断崩溃:

14/11/21 12:39:23 ERROR TaskSetManager: Task 3967.0:0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute …
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-streaming

13
推荐指数
1
解决办法
5951
查看次数

spark ssc.textFileStream不从目录中流式传输任何文件

我正在尝试使用eclipse(使用maven conf)和2个worker来执行下面的代码,每个都有2个核心或者也尝试过spark-submit.

public class StreamingWorkCount implements Serializable {

    public static void main(String[] args) {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
        JavaStreamingContext jssc = new JavaStreamingContext(
                "spark://192.168.1.19:7077", "JavaWordCount",
                new Duration(1000));
        JavaDStream<String> trainingData = jssc.textFileStream(
                "/home/bdi-user/kaushal-drive/spark/data/training").cache();
        trainingData.foreach(new Function<JavaRDD<String>, Void>() {

            public Void call(JavaRDD<String> rdd) throws Exception {
                List<String> output = rdd.collect();
                System.out.println("Sentences Collected from files " + output);
                return null;
            }
        });

        trainingData.print();
        jssc.start();
        jssc.awaitTermination();
    }
}
Run Code Online (Sandbox Code Playgroud)

并记录该代码

15/01/22 21:57:13 INFO FileInputDStream: New files at time 1421944033000 ms:

15/01/22 21:57:13 INFO JobScheduler: Added jobs for time 1421944033000 …
Run Code Online (Sandbox Code Playgroud)

filesystems data-stream apache-spark spark-streaming

13
推荐指数
2
解决办法
1万
查看次数

Apache Spark Streaming,如何处理下游依赖性故障

我试图了解如何使Spark Streaming应用程序更容错(特别是在尝试写入下游依赖项时),并且我不知道在尝试将结果写入外部源时处理失败的最佳方法是什么,像Cassandra,DynamoDB等.

例如,我有一个Spark Streaming作业从Stream(Kafka,Flume等等)中提取数据......我还没有最终确定使用哪种技术,将类似项目聚合在一起,然后将结果写入外部存储.(即Cassandra,DynamoDB或任何正在接收我的DStream计算结果的东西).

我试图弄清楚如何处理外部依赖关系无法写入的情况.也许集群发生故障,可能存在权限问题等,但我的工作无法将结果写入外部依赖项.有没有办法暂停Spark Streaming以便接收器不会继续批量处理数据?我应该只是睡觉当前批次并让接收器继续存储批次吗?如果问题是暂时的(几秒钟),继续批处理可能是可以接受的,但如果依赖性下降几分钟或1小时以上会发生什么?

我有一个想法是有一个监视进程,在后台监视依赖项的健康状况,如果它发现它"不健康",它将停止工作.然后,当所有依赖项都运行正常时,我可以重新启动作业并处理未写入外部源的所有数据.

我的另一个想法是以某种方式在DStream forEachRdd方法中发出信号,表示存在问题.我可以在DStream中抛出某种异常,它会向驱动程序发出它应该停止的信号吗?

如果有人有任何关于如何处理外部容错的经验,或者可以指向我的好文章/视频,那将是很好的.

谢谢

apache-spark spark-streaming

13
推荐指数
1
解决办法
872
查看次数

Parquet元数据文件是否需要回滚?

当Parquet文件data在其date列上写入分区时,我们得到一个目录结构,如:

/data
    _common_metadata
    _metadata
    _SUCCESS
    /date=1
        part-r-xxx.gzip
        part-r-xxx.gzip
    /date=2
        part-r-xxx.gzip
        part-r-xxx.gzip
Run Code Online (Sandbox Code Playgroud)

如果在date=2没有Parquet实用程序(通过shell或文件浏览器等)的情况下删除分区,那么当只有分区时,是否需要回滚任何元数据文件date=1

或者可以随意删除分区并在以后重写它们(或不重写)?

apache-spark parquet spark-streaming

13
推荐指数
1
解决办法
439
查看次数

针对DStream的Spark流检查点

在Spark Streaming中,可以(并且必须使用有状态操作)将StreamingContext检查点设置为(AND)的可靠数据存储(S3,HDFS,...):

  • 元数据
  • DStream 血统

如上所述这里,设置输出数据存储需要调用yourSparkStreamingCtx.checkpoint(datastoreURL)

另一方面,可以DataStream通过调用checkpoint(timeInterval)它们来为每个设置谱系检查点间隔.实际上,建议将谱系检查点间隔设置为DataStream滑动间隔的5到10倍:

dstream.checkpoint(checkpointInterval).通常,DStream的5-10个滑动间隔的检查点间隔是一个很好的设置.

我的问题是:

当流上下文设置为执行检查点并且没有ds.checkpoint(interval)被调用时,是否为所有数据流启用了谱系检查点,默认值checkpointInterval等于batchInterval?或者,相反,只有元数据检查点启用了什么?

apache-spark spark-streaming checkpointing

13
推荐指数
1
解决办法
8221
查看次数

使用模式将带有Spark的AVRO消息转换为DataFrame

有没有使用模式转换方式从消息?用户记录的模式文件:

{
  "fields": [
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" }
  ],
  "name": "user",
  "type": "record"
}
Run Code Online (Sandbox Code Playgroud)

来自SqlNetworkWordCount示例Kafka,Spark和Avro的代码片段- 第3部分,生成和使用Avro消息来读取消息.

object Injection {
  val parser = new Schema.Parser()
  val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
  val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}

...

messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._

  val df = rdd.map(message => Injection.injection.invert(message._2).get)
    .map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF()

  df.show() …
Run Code Online (Sandbox Code Playgroud)

scala avro apache-kafka apache-spark spark-streaming

13
推荐指数
1
解决办法
1万
查看次数

AbstractMethodError创建Kafka流

我正在尝试使用createDirectStream方法打开Kafka(尝试版本0.11.0.2和1.0.1)流并获取此AbstractMethodError错误:

Exception in thread "main" java.lang.AbstractMethodError
    at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.initializeLogIfNecessary(KafkaUtils.scala:39)
    at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.log(KafkaUtils.scala:39)
    at org.apache.spark.internal.Logging$class.logWarning(Logging.scala:66)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.logWarning(KafkaUtils.scala:39)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.fixKafkaParams(KafkaUtils.scala:201)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:63)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:147)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:124)
Run Code Online (Sandbox Code Playgroud)

这就是我所说的:

val preferredHosts = LocationStrategies.PreferConsistent
    val kafkaParams = Map(
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[IntegerDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest"
    )

    val aCreatedStream = createDirectStream[String, String](ssc, preferredHosts,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
Run Code Online (Sandbox Code Playgroud)

我有Kafka在9092运行,我能够创建生产者和消费者,并在他们之间传递消息,所以不知道为什么它不能使用Scala代码.任何想法都赞赏.

scala apache-kafka apache-spark spark-streaming

13
推荐指数
2
解决办法
7800
查看次数

Spark Streaming + Kafka:SparkException:无法找到Set的前导偏移

我正在尝试设置Spark Streaming以从Kafka队列获取消息.我收到以下错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o30.createDirectStream.
: org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([test-topic,0])
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
Run Code Online (Sandbox Code Playgroud)

这是我正在执行的代码(pyspark):

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

directKafkaStream = KafkaUtils.createDirectStream(ssc, ["test-topic"], {"metadata.broker.list": "host.domain:9092"})

ssc.start()
ssc.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

有几个类似的帖子有相同的错误.在所有情况下,原因都是空的kafka主题.我的"测试主题"中有消息.我可以把它们拿出去

kafka-console-consumer --zookeeper host.domain:2181 --topic test-topic --from-beginning --max-messages 100
Run Code Online (Sandbox Code Playgroud)

有谁知道可能是什么问题?

我正在使用:

  • Spark 1.5.2(apache)
  • 卡夫卡0.8.2.0 + kafka1.3.0(CDH 5.4.7)

apache-kafka apache-spark spark-streaming

12
推荐指数
2
解决办法
2万
查看次数

Kafka主题分区为Spark流媒体

我有一些用例,我想更清楚一点,关于Kafka主题分区 - >火花流资源利用率.

我使用spark独立模式,所以我只有"执行器总数"和"执行器内存".据我所知并根据文档,将并行性引入Spark流的方法是使用分区的Kafka主题 - >当我使用spark-kafka直接流集成时,RDD将具有与kafka相同数量的分区.

因此,如果我在主题中有1个分区,并且有1个执行程序核心,那么该核心将从Kafka顺序读取.

如果我有:

  • 主题中有2个分区,只有1个执行器核心?该核心将首先从一个分区读取,然后从第二个分区读取,因此分区主题没有任何好处吗?

  • 主题中有2个分区和2个核心?然后1个执行器核心从1个分区读取,第二个核心从第二个分区读取吗?

  • 1个kafka分区和2个执行器核心?

谢谢.

apache-kafka apache-spark spark-streaming

12
推荐指数
1
解决办法
7412
查看次数

由于内存泄漏导致Spark执行器崩溃

当运行使用来自kafka主题100个分区的数据的spark流媒体应用程序,并且每个执行程序运行10个执行程序,5个核心和20GB RAM时,执行程序将崩溃并显示以下日志:

ERROR ResourceLeakDetector:泄漏:ByteBuf.release()是垃圾收集之前,不叫.启用高级泄漏报告以找出泄漏发生的位置.

ERROR YarnClusterScheduler:在worker23.oct.com上丢失执行者18:奴隶丢失了

ERROR ApplicationMaster:收到的信号期限

此异常出现在spark JIRA中:

https://issues.apache.org/jira/browse/SPARK-17380

有人在升级到spark 2.0.2后写道,问题解决了.但是我们使用spark 2.1作为HDP 2.6的一部分.所以我猜这个bug在火花2.1中没有解决.

还有人遇到过这个bug,并在spark用户列表中写过但没有得到答案:

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Receiver-Resource-Leak-td27857.html

顺便说一句 - 流媒体应用程序没有调用cache()persist(),因此不涉及任何缓存.

有没有人遇到过崩溃的流媒体应用?

out-of-memory netty spark-streaming apache-spark-2.0

12
推荐指数
0
解决办法
731
查看次数