标签: spark-streaming

如何从迭代器创建Spark RDD?

为了说清楚,我不是从数组/列表中寻找RDD

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7); // sample
JavaRDD<Integer> rdd = new JavaSparkContext().parallelize(list);
Run Code Online (Sandbox Code Playgroud)

如何在没有在内存中完全缓冲的情况下从java迭代器创建spark RDD ?

Iterator<Integer> iterator = Arrays.asList(1, 2, 3, 4).iterator(); //sample iterator for illustration
JavaRDD<Integer> rdd = new JavaSparkContext().what("?", iterator); //the Question
Run Code Online (Sandbox Code Playgroud)

附加问题:

是否要求源可重新读取(或能够多次读取)以提供RDD的弹性?换句话说,由于迭代器基本上只读一次,是否有可能从迭代器创建弹性分布式数据集(RDD)?

apache-spark spark-streaming

15
推荐指数
1
解决办法
7167
查看次数

Spark Streaming mapWithState似乎定期重建完整状态

我正在开发一个Scala(2.11)/ Spark(1.6.1)流式项目,mapWithState()用于跟踪以前批次中看到的数据.

状态分布在多个节点上的20个分区中,使用StateSpec.function(trackStateFunc _).numPartitions(20).在这种状态下,我们只有几个键(~100)映射到Sets最多约160,000个条目,这些条目在整个应用程序中增长.整个状态最多3GB,可以由群集中的每个节点处理.在每个批次中,一些数据被添加到一个状态,但直到过程结束时才被删除,即约15分钟.

在遵循应用程序UI时,与其他批次相比,每10个批次的处理时间非常长.看图像:

尖峰显示更高的处理时间.

黄色字段代表高处理时间.

在此输入图像描述

更详细的作业视图显示,在这些批次中发生在某一点,恰好是"跳过"所有20个分区.或者这就是UI所说的.

在此输入图像描述

我的理解skipped是每个状态分区是一个可能的任务,没有被执行,因为它不需要重新计算.但是,我不明白为什么skips每个工作的数量变化以及为什么最后的工作需要如此多的处理.无论状态大小如何,都会出现更高的处理时间,它只会影响持续时间.

这是mapWithState()功能中的错误还是这个预期的行为?底层数据结构是否需要某种重新洗牌,Set状态是否需要复制数据?或者它更可能是我的应用程序中的缺陷?

scala apache-spark spark-streaming

15
推荐指数
1
解决办法
3140
查看次数

如何保存Spark消耗给ZK或Kafka的最新偏移量,并在重启后可以回读

Kafka 0.8.2用来从AdExchange接收数据然后我Spark Streaming 1.4.1用来存储数据MongoDB.

我的问题是当我重新启动我的Spark StreamingJob时,例如更新新版本,修复bug,添加新功能.它将继续阅读最新offsetkafka重启作业期间在当时那么我将数据丢失的AdX推卡夫卡.

我尝试类似的东西,auto.offset.reset -> smallest但它会从0 - >收到最后数据是巨大的,并在数据库中重复.

我也尝试设置具体的group.idconsumer.idSpark却是相同的.

如何保存最新的offset消耗,火花zookeeperkafka然后可以从读回最新的offset

apache-kafka apache-spark spark-streaming kafka-consumer-api

14
推荐指数
1
解决办法
1万
查看次数

如何在数据源用完时停止火花流

我有一个火花流工作,每5秒从Kafka读取一次,对传入的数据进行一些转换,然后写入文件系统.

这实际上并不需要是一个流媒体工作,实际上,我只想每天运行一次以将消息排放到文件系统上.我不知道如何阻止这份工作.

如果我将超时传递给streamingContext.awaitTermination,它不会停止进程,它只会导致进程在流上进行迭代时产生错误(请参阅下面的错误)

完成我想要做的事情的最佳方法是什么

这适用于Python上的Spark 1.6

编辑:

感谢@marios解决方案是这样的:

ssc.start()
ssc.awaitTermination(10)
ssc.stop()
Run Code Online (Sandbox Code Playgroud)

在停止之前运行脚本十秒钟.

简化代码:

conf = SparkConf().setAppName("Vehicle Data Consolidator").set('spark.files.overwrite','true')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 5)
stream = KafkaUtils.createStream(
    ssc,
    kafkaParams["zookeeper.connect"],
    "vehicle-data-importer",
    topicPartitions,
    kafkaParams)

stream.saveAsTextFiles('stream-output/kafka-vehicle-data')

ssc.start()
ssc.awaitTermination(10)
Run Code Online (Sandbox Code Playgroud)

错误:

16/01/29 15:05:44 INFO BlockManagerInfo: Added input-0-1454097944200 in memory on localhost:58960 (size: 3.0 MB, free: 48.1 MB)
16/01/29 15:05:44 WARN BlockManager: Block input-0-1454097944200 replicated to only 0 peer(s) instead of 1 peers
16/01/29 15:05:44 INFO BlockGenerator: Pushed block input-0-1454097944200
16/01/29 15:05:45 ERROR JobScheduler: Error …
Run Code Online (Sandbox Code Playgroud)

python apache-kafka apache-spark spark-streaming pyspark

14
推荐指数
1
解决办法
7894
查看次数

如何修复来自apache-spark的对等消息的连接重置?

我经常不断得到以下异常,我想知道为什么会发生这种情况?经过研究,我发现我能做到,.set("spark.submit.deployMode", "nio");但这也不起作用,我使用的是火花2.0.0

WARN TransportChannelHandler: Exception in connection from /172.31.3.245:46014
    java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:898)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-streaming

14
推荐指数
1
解决办法
1万
查看次数

使用Spark Streaming时限制Kafka批量大小

是否可以限制Kafka消费者为Spark Streaming返回的批次大小?

我问,因为我得到的第一批有数亿条记录,处理和检查它们需要很长时间.

apache-kafka apache-spark spark-streaming kafka-consumer-api

14
推荐指数
2
解决办法
1万
查看次数

Parquet元数据文件是否需要回滚?

当Parquet文件data在其date列上写入分区时,我们得到一个目录结构,如:

/data
    _common_metadata
    _metadata
    _SUCCESS
    /date=1
        part-r-xxx.gzip
        part-r-xxx.gzip
    /date=2
        part-r-xxx.gzip
        part-r-xxx.gzip
Run Code Online (Sandbox Code Playgroud)

如果在date=2没有Parquet实用程序(通过shell或文件浏览器等)的情况下删除分区,那么当只有分区时,是否需要回滚任何元数据文件date=1

或者可以随意删除分区并在以后重写它们(或不重写)?

apache-spark parquet spark-streaming

13
推荐指数
1
解决办法
439
查看次数

针对DStream的Spark流检查点

在Spark Streaming中,可以(并且必须使用有状态操作)将StreamingContext检查点设置为(AND)的可靠数据存储(S3,HDFS,...):

  • 元数据
  • DStream 血统

如上所述这里,设置输出数据存储需要调用yourSparkStreamingCtx.checkpoint(datastoreURL)

另一方面,可以DataStream通过调用checkpoint(timeInterval)它们来为每个设置谱系检查点间隔.实际上,建议将谱系检查点间隔设置为DataStream滑动间隔的5到10倍:

dstream.checkpoint(checkpointInterval).通常,DStream的5-10个滑动间隔的检查点间隔是一个很好的设置.

我的问题是:

当流上下文设置为执行检查点并且没有ds.checkpoint(interval)被调用时,是否为所有数据流启用了谱系检查点,默认值checkpointInterval等于batchInterval?或者,相反,只有元数据检查点启用了什么?

apache-spark spark-streaming checkpointing

13
推荐指数
1
解决办法
8221
查看次数

AbstractMethodError创建Kafka流

我正在尝试使用createDirectStream方法打开Kafka(尝试版本0.11.0.2和1.0.1)流并获取此AbstractMethodError错误:

Exception in thread "main" java.lang.AbstractMethodError
    at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.initializeLogIfNecessary(KafkaUtils.scala:39)
    at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.log(KafkaUtils.scala:39)
    at org.apache.spark.internal.Logging$class.logWarning(Logging.scala:66)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.logWarning(KafkaUtils.scala:39)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.fixKafkaParams(KafkaUtils.scala:201)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:63)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:147)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:124)
Run Code Online (Sandbox Code Playgroud)

这就是我所说的:

val preferredHosts = LocationStrategies.PreferConsistent
    val kafkaParams = Map(
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[IntegerDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest"
    )

    val aCreatedStream = createDirectStream[String, String](ssc, preferredHosts,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
Run Code Online (Sandbox Code Playgroud)

我有Kafka在9092运行,我能够创建生产者和消费者,并在他们之间传递消息,所以不知道为什么它不能使用Scala代码.任何想法都赞赏.

scala apache-kafka apache-spark spark-streaming

13
推荐指数
2
解决办法
7800
查看次数

SBT测试错误:java.lang.NoSuchMethodError:net.jpountz.lz4.LZ4BlockInputStream

获得以下异常,当我尝试使用scalatest在SBT窗口上执行我的火花流代码的单元测试时.

sbt testOnly <<ClassName>>

*
*
*
*
*
*

2018-06-18 02:39:00 ERROR执行程序:91 - 阶段3.0(TID 11)中的任务1.0中的异常java.lang.NoSuchMethodError:net.jpountz.lz4.LZ4BlockInputStream.(Ljava/io/InputStream; Z)V位于org.apache.spark.serializer.SerializerManager.wrapStream的org.apache.spark.io.LZ4CompressionCodec.compressedInputStream(CompressionCodec.scala:122)org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:163)at org.apache.spark.serializer.SerializerManager.wrapStream (SerializerManager.scala:124)在org.apache.spark.shuffle.BlockStoreShuffleReader $$ anonfun $ 2.适用(BlockStoreShuffleReader.scala:50)在org.apache.spark.shuffle.BlockStoreShuffleReader $$ anonfun $ 2.适用(BlockStoreShuffleReader.scala :50)在org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:417)在org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)在scala.collection.Iterator $$匿名在scala上的scala.collection.Iterator $$ anon $ 12.hasNext(Iterator.scala:441)$ 12.nextCur(Iterator.scala:435).collection.Iterator $$匿名$ 11.hasNext(Iterator.scala:409)在org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)在org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala: 37)在scala.collection.Iterator $$不久$ 11.hasNext(Iterator.scala:409)在org.apache.spark.sql.catalyst.expressions.GeneratedClass $ GeneratedIteratorForCodegenStage1.sort_addToSorter $(来源不明)在org.apache.spark .sql.catalyst.expressions.GeneratedClass $ GeneratedIteratorForCodegenStage1.processNext(来源不明)在org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)在org.apache.spark.sql.execution.WholeStageCodegenExec $ $ anonfun 10 $ $$匿名$ 1.hasNext(WholeStageCodegenExec.scala:614):在org.apache.spark.sql.execution.streaming在org.apache.spark.sql.execution.GroupedIterator $.适用(29 GroupedIterator.scala) .FlatMapGroupsWithStateExec $ StateStoreUpdater.updateStateForKeysWithData(FlatMapGroupsWithStateExec.scala:176)**

尝试了几件事来排除net.jpountz.lz4 jar(来自其他帖子的建议)但输出中的同样错误.

目前使用spark 2.3,scalatest 3.0.5,Scala 2.11版本.我在升级到spark 2.3和scalatest 3.0.5后才看到这个问题

有什么建议 ?

scala sbt scalatest apache-spark spark-streaming

13
推荐指数
1
解决办法
6106
查看次数