Apache Spark:执行程序之间的网络错误

mjj*_*son 22 scala apache-spark

我在Scala 2.11.2上运行Apache Spark 1.3.1,当在具有足够大数据的HPC集群上运行时,我收到很多错误,比如帖子底部的错误(每秒重复多次,直到工作随着时间的推移被杀死).根据错误,执行程序尝试从其他节点获取随机数据,但无法执行此操作.

同样的程序可以使用(a)较少量的数据,或者(b)在仅本地模式下执行,因此它与通过网络发送的数据有关(并且不会被非常小的触发)数据量).

在发生这种情况时执行的代码如下:

val partitioned_data = data  // data was read as sc.textFile(inputFile)
  .zipWithIndex.map(x => (x._2, x._1))
  .partitionBy(partitioner)  // A custom partitioner
  .map(_._2)

// Force previous lazy operations to be evaluated. Presumably adds some
// overhead, but hopefully the minimum possible...
// Suggested on Spark user list: http://apache-spark-user-list.1001560.n3.nabble.com/Forcing-RDD-computation-with-something-else-than-count-td707.html
sc.runJob(partitioned_data, (iter: Iterator[_]) => {})
Run Code Online (Sandbox Code Playgroud)

这是一个错误的指示,还是有什么我做错了?

这是一个执行程序的stderr日志的小片段(完整日志在这里):

15/04/21 14:59:28 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1601401593000, chunkIndex=0}, buffer=FileSegmentManagedBuffer{file=/tmp/spark-0f8d0598-b137-4d14-993a-568b2ab3709a/spark-12d5ff0a-2793-4b76-8a0b-d977a5924925/spark-7ad9382d-05cf-49d4-9a52-d42e6ca7117d/blockmgr-b72d4068-d065-47e6-8a10-867f723000db/15/shuffle_0_1_0.data, offset=26501223, length=6227612}} to /10.0.0.5:41160; closing connection
java.io.IOException: Resource temporarily unavailable
    at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
    at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:415)
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:516)
    at org.apache.spark.network.buffer.LazyFileRegion.transferTo(LazyFileRegion.java:96)
    at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:89)
    at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:237)
    at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:233)
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:264)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:707)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:315)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:676)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1059)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:669)
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:718)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:706)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:741)
    at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:895)
    at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:240)
    at org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:147)
    at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:119)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:95)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:619)
15/04/21 14:59:28 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1601401593000, chunkIndex=1}, buffer=FileSegmentManagedBuffer{file=/tmp/spark-0f8d0598-b137-4d14-993a-568b2ab3709a/spark-12d5ff0a-2793-4b76-8a0b-d977a5924925/spark-7ad9382d-05cf-49d4-9a52-d42e6ca7117d/blockmgr-b72d4068-d065-47e6-8a10-867f723000db/27/shuffle_0_5_0.data, offset=3792987, length=2862285}} to /10.0.0.5:41160; closing connection
java.nio.channels.ClosedChannelException
15/04/21 14:59:28 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1601401593002, chunkIndex=0}, buffer=FileSegmentManagedBuffer{file=/tmp/spark-0f8d0598-b137-4d14-993a-568b2ab3709a/spark-12d5ff0a-2793-4b76-8a0b-d977a5924925/spark-7ad9382d-05cf-49d4-9a52-d42e6ca7117d/blockmgr-b72d4068-d065-47e6-8a10-867f723000db/15/shuffle_0_1_0.data, offset=0, length=10993212}} to /10.0.0.6:42426; closing connection
java.io.IOException: Resource temporarily unavailable
    at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
    at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:415)
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:516)
    at org.apache.spark.network.buffer.LazyFileRegion.transferTo(LazyFileRegion.java:96)
    at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:89)
    at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:237)
    at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:233)
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:264)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:707)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:315)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:676)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1059)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:669)
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:718)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:706)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:741)
    at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:895)
    at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:240)
    at org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:147)
    at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:119)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:95)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:619)
15/04/21 14:59:28 WARN TransportChannelHandler: Exception in connection from node5.someuniversity.edu/10.0.0.5:60089
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcher.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:233)
    at sun.nio.ch.IOUtil.read(IOUtil.java:206)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:236)
    at io.netty.buffer.PooledHeapByteBuf.setBytes(PooledHeapByteBuf.java:234)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:619)
15/04/21 14:59:28 ERROR TransportResponseHandler: Still have 2 requests outstanding when connection from node5.someuniversity.edu/10.0.0.5:60089 is closed
15/04/21 14:59:28 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 2 outstanding blocks after 5000 ms
Run Code Online (Sandbox Code Playgroud)

mjj*_*son 18

这似乎是与Spark 1.2中Netty添加的网络系统(块传输服务)相关的错误.添加到我的SparkConf修复了错误,所以现在一切都很完美..set("spark.shuffle.blockTransferService", "nio")

在spark用户邮件列表上找到了一个发现类似错误的人的帖子,他们建议尝试nio而不是Netty.

SPARK-5085类似,从改变Nettynio固定他们的问题; 但是,他们也可以通过更改一些网络设置来解决问题.(我自己也没有尝试过,因为我不确定我是否拥有在群集上执行此操作的权限.)

  • 切换到nio没有解决spark 1.5.1中的问题,有什么想法吗? (8认同)
  • 我使用spark 2.0.0并将其切换到NIO仍然没有解决问题.. (3认同)
  • 我正在使用spark 1.6.0并且切换到NIO仍然没有解决问题.我认为这个问题值得一试,因为我在日志中看到了大量的这些错误. (3认同)
  • 我使用spark 2.0.0并将其切换到NIO仍然没有解决问题. (2认同)
  • 在 spark 2.2.0 中仍然存在这个问题 (2认同)
  • 根据有关设置spark.shuffle.blockTransferService的文档:*用于在执行程序之间传输shuffle和缓存的块的实现。有两种实现方式:netty和nio。基于Netty的块传输旨在更简单但同样有效,并且是从1.2开始的默认选项,而nio块传输在Spark 1.5.0中已弃用,在Spark 1.6.0中将被删除。*因此,我不认为改用nio可以解决问题 (2认同)