为了说清楚,我不是从数组/列表中寻找RDD
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7); // sample
JavaRDD<Integer> rdd = new JavaSparkContext().parallelize(list);
Run Code Online (Sandbox Code Playgroud)
如何在没有在内存中完全缓冲的情况下从java迭代器创建spark RDD ?
Iterator<Integer> iterator = Arrays.asList(1, 2, 3, 4).iterator(); //sample iterator for illustration
JavaRDD<Integer> rdd = new JavaSparkContext().what("?", iterator); //the Question
Run Code Online (Sandbox Code Playgroud)
附加问题:
是否要求源可重新读取(或能够多次读取)以提供RDD的弹性?换句话说,由于迭代器基本上只读一次,是否有可能从迭代器创建弹性分布式数据集(RDD)?
我正在开发一个Scala(2.11)/ Spark(1.6.1)流式项目,mapWithState()用于跟踪以前批次中看到的数据.
状态分布在多个节点上的20个分区中,使用StateSpec.function(trackStateFunc _).numPartitions(20).在这种状态下,我们只有几个键(~100)映射到Sets最多约160,000个条目,这些条目在整个应用程序中增长.整个状态最多3GB,可以由群集中的每个节点处理.在每个批次中,一些数据被添加到一个状态,但直到过程结束时才被删除,即约15分钟.
在遵循应用程序UI时,与其他批次相比,每10个批次的处理时间非常长.看图像:
黄色字段代表高处理时间.
更详细的作业视图显示,在这些批次中发生在某一点,恰好是"跳过"所有20个分区.或者这就是UI所说的.
我的理解skipped是每个状态分区是一个可能的任务,没有被执行,因为它不需要重新计算.但是,我不明白为什么skips每个工作的数量变化以及为什么最后的工作需要如此多的处理.无论状态大小如何,都会出现更高的处理时间,它只会影响持续时间.
这是mapWithState()功能中的错误还是这个预期的行为?底层数据结构是否需要某种重新洗牌,Set状态是否需要复制数据?或者它更可能是我的应用程序中的缺陷?
我Kafka 0.8.2用来从AdExchange接收数据然后我Spark Streaming 1.4.1用来存储数据MongoDB.
我的问题是当我重新启动我的Spark StreamingJob时,例如更新新版本,修复bug,添加新功能.它将继续阅读最新offset的kafka重启作业期间在当时那么我将数据丢失的AdX推卡夫卡.
我尝试类似的东西,auto.offset.reset -> smallest但它会从0 - >收到最后数据是巨大的,并在数据库中重复.
我也尝试设置具体的group.id和consumer.id以Spark却是相同的.
如何保存最新的offset消耗,火花zookeeper或kafka然后可以从读回最新的offset?
apache-kafka apache-spark spark-streaming kafka-consumer-api
我有一个火花流工作,每5秒从Kafka读取一次,对传入的数据进行一些转换,然后写入文件系统.
这实际上并不需要是一个流媒体工作,实际上,我只想每天运行一次以将消息排放到文件系统上.我不知道如何阻止这份工作.
如果我将超时传递给streamingContext.awaitTermination,它不会停止进程,它只会导致进程在流上进行迭代时产生错误(请参阅下面的错误)
完成我想要做的事情的最佳方法是什么
这适用于Python上的Spark 1.6
编辑:
感谢@marios解决方案是这样的:
ssc.start()
ssc.awaitTermination(10)
ssc.stop()
Run Code Online (Sandbox Code Playgroud)
在停止之前运行脚本十秒钟.
简化代码:
conf = SparkConf().setAppName("Vehicle Data Consolidator").set('spark.files.overwrite','true')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 5)
stream = KafkaUtils.createStream(
ssc,
kafkaParams["zookeeper.connect"],
"vehicle-data-importer",
topicPartitions,
kafkaParams)
stream.saveAsTextFiles('stream-output/kafka-vehicle-data')
ssc.start()
ssc.awaitTermination(10)
Run Code Online (Sandbox Code Playgroud)
错误:
16/01/29 15:05:44 INFO BlockManagerInfo: Added input-0-1454097944200 in memory on localhost:58960 (size: 3.0 MB, free: 48.1 MB)
16/01/29 15:05:44 WARN BlockManager: Block input-0-1454097944200 replicated to only 0 peer(s) instead of 1 peers
16/01/29 15:05:44 INFO BlockGenerator: Pushed block input-0-1454097944200
16/01/29 15:05:45 ERROR JobScheduler: Error …Run Code Online (Sandbox Code Playgroud) 我经常不断得到以下异常,我想知道为什么会发生这种情况?经过研究,我发现我能做到,.set("spark.submit.deployMode", "nio");但这也不起作用,我使用的是火花2.0.0
WARN TransportChannelHandler: Exception in connection from /172.31.3.245:46014
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:898)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
Run Code Online (Sandbox Code Playgroud) 是否可以限制Kafka消费者为Spark Streaming返回的批次大小?
我问,因为我得到的第一批有数亿条记录,处理和检查它们需要很长时间.
apache-kafka apache-spark spark-streaming kafka-consumer-api
当Parquet文件data在其date列上写入分区时,我们得到一个目录结构,如:
/data
_common_metadata
_metadata
_SUCCESS
/date=1
part-r-xxx.gzip
part-r-xxx.gzip
/date=2
part-r-xxx.gzip
part-r-xxx.gzip
Run Code Online (Sandbox Code Playgroud)
如果在date=2没有Parquet实用程序(通过shell或文件浏览器等)的情况下删除分区,那么当只有分区时,是否需要回滚任何元数据文件date=1?
或者可以随意删除分区并在以后重写它们(或不重写)?
在Spark Streaming中,可以(并且必须使用有状态操作)将StreamingContext检查点设置为(AND)的可靠数据存储(S3,HDFS,...):
DStream 血统如上所述这里,设置输出数据存储需要调用yourSparkStreamingCtx.checkpoint(datastoreURL)
另一方面,可以DataStream通过调用checkpoint(timeInterval)它们来为每个设置谱系检查点间隔.实际上,建议将谱系检查点间隔设置为DataStream滑动间隔的5到10倍:
dstream.checkpoint(checkpointInterval).通常,DStream的5-10个滑动间隔的检查点间隔是一个很好的设置.
我的问题是:
当流上下文设置为执行检查点并且没有ds.checkpoint(interval)被调用时,是否为所有数据流启用了谱系检查点,默认值checkpointInterval等于batchInterval?或者,相反,只有元数据检查点启用了什么?
我正在尝试使用createDirectStream方法打开Kafka(尝试版本0.11.0.2和1.0.1)流并获取此AbstractMethodError错误:
Exception in thread "main" java.lang.AbstractMethodError
at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
at org.apache.spark.streaming.kafka010.KafkaUtils$.initializeLogIfNecessary(KafkaUtils.scala:39)
at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
at org.apache.spark.streaming.kafka010.KafkaUtils$.log(KafkaUtils.scala:39)
at org.apache.spark.internal.Logging$class.logWarning(Logging.scala:66)
at org.apache.spark.streaming.kafka010.KafkaUtils$.logWarning(KafkaUtils.scala:39)
at org.apache.spark.streaming.kafka010.KafkaUtils$.fixKafkaParams(KafkaUtils.scala:201)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:63)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:147)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:124)
Run Code Online (Sandbox Code Playgroud)
这就是我所说的:
val preferredHosts = LocationStrategies.PreferConsistent
val kafkaParams = Map(
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[IntegerDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"auto.offset.reset" -> "earliest"
)
val aCreatedStream = createDirectStream[String, String](ssc, preferredHosts,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
Run Code Online (Sandbox Code Playgroud)
我有Kafka在9092运行,我能够创建生产者和消费者,并在他们之间传递消息,所以不知道为什么它不能使用Scala代码.任何想法都赞赏.
获得以下异常,当我尝试使用scalatest在SBT窗口上执行我的火花流代码的单元测试时.
sbt testOnly <<ClassName>>
*
*
*
*
*
*2018-06-18 02:39:00 ERROR执行程序:91 - 阶段3.0(TID 11)中的任务1.0中的异常java.lang.NoSuchMethodError:net.jpountz.lz4.LZ4BlockInputStream.(Ljava/io/InputStream; Z)V位于org.apache.spark.serializer.SerializerManager.wrapStream的org.apache.spark.io.LZ4CompressionCodec.compressedInputStream(CompressionCodec.scala:122)org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:163)at org.apache.spark.serializer.SerializerManager.wrapStream (SerializerManager.scala:124)在org.apache.spark.shuffle.BlockStoreShuffleReader $$ anonfun $ 2.适用(BlockStoreShuffleReader.scala:50)在org.apache.spark.shuffle.BlockStoreShuffleReader $$ anonfun $ 2.适用(BlockStoreShuffleReader.scala :50)在org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:417)在org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)在scala.collection.Iterator $$匿名在scala上的scala.collection.Iterator $$ anon $ 12.hasNext(Iterator.scala:441)$ 12.nextCur(Iterator.scala:435).collection.Iterator $$匿名$ 11.hasNext(Iterator.scala:409)在org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)在org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala: 37)在scala.collection.Iterator $$不久$ 11.hasNext(Iterator.scala:409)在org.apache.spark.sql.catalyst.expressions.GeneratedClass $ GeneratedIteratorForCodegenStage1.sort_addToSorter $(来源不明)在org.apache.spark .sql.catalyst.expressions.GeneratedClass $ GeneratedIteratorForCodegenStage1.processNext(来源不明)在org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)在org.apache.spark.sql.execution.WholeStageCodegenExec $ $ anonfun 10 $ $$匿名$ 1.hasNext(WholeStageCodegenExec.scala:614):在org.apache.spark.sql.execution.streaming在org.apache.spark.sql.execution.GroupedIterator $.适用(29 GroupedIterator.scala) .FlatMapGroupsWithStateExec $ StateStoreUpdater.updateStateForKeysWithData(FlatMapGroupsWithStateExec.scala:176)**
尝试了几件事来排除net.jpountz.lz4 jar(来自其他帖子的建议)但输出中的同样错误.
目前使用spark 2.3,scalatest 3.0.5,Scala 2.11版本.我在升级到spark 2.3和scalatest 3.0.5后才看到这个问题
有什么建议 ?
apache-spark ×10
spark-streaming ×10
apache-kafka ×4
scala ×3
parquet ×1
pyspark ×1
python ×1
sbt ×1
scalatest ×1