How do I solve this problem? I am getting the following error:
java.nio.channels.ClosedChannelException
Here is the code:
public void run() {
    try {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(512);
        int i1 = socketChannel.read(buffer);
        if (buffer.limit() == 0 || i1 == -1) {
            Socket s = null;
            try {
                s = socketChannel.socket();
                s.close();
                key.cancel();
            } catch (IOException ie) {
                if (UnitDataServer.isLog) {
                    log.error("Error closing socket " + s + ": " + ie);
                }
            }
        } else {
            buffer.flip();
            if (UnitDataServer.isLog) {
                log.info(" Recvd Message from Unit : " + buffer.array());
            } …

Spark 1.6.2 (Apache), Kafka 2.1.1 (CDH 5.7.2)
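import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
val conf = new SparkConf().setAppName("Test").setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(15))
val kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)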
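I am trying to use Spark Streaming to consume messages from Kafka. After the program has been running for a while, I get the output below, and from then on it keeps logging INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException: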
16/10/08 18:28:00 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
16/10/08 18:28:00 INFO JobScheduler: Added jobs for time 1475922480000 ms
16/10/08 18:28:15 INFO JobScheduler: Added jobs for time 1475922495000 ms
16/10/08 18:28:30 INFO JobScheduler: Added jobs for time 1475922510000 ms
16/10/08 18:28:45 INFO JobScheduler: Added jobs for time …
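
Both reports involve java.nio.channels.ClosedChannelException, which the JDK raises when an I/O operation is attempted on a channel that has already been closed. The following is a minimal sketch of two situations relevant to the first snippet: read() returning -1 once the peer has closed the connection, and read() throwing ClosedChannelException after the local side has closed the channel through socket().close(). The class name ClosedChannelDemo, the method names, and the loopback setup are illustrative assumptions and do not come from either question.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class; not part of either question's code base.
public class ClosedChannelDemo {

    /** The peer closes its end; returns whatever the client's blocking read() reports. */
    public static int readAfterPeerClose() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 lets the OS pick a free port on the loopback interface.
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                // Accept the connection and close it at once without sending data.
                server.accept().close();
                ByteBuffer buffer = ByteBuffer.allocate(512);
                // End-of-stream is signalled by the return value, not by an exception.
                return client.read(buffer);
            }
        }
    }

    /** The local side closes first; returns true if the later read() throws ClosedChannelException. */
    public static boolean readAfterLocalClose() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            SocketChannel client = SocketChannel.open(server.getLocalAddress());
            // Closing the Socket view of a SocketChannel closes the channel itself.
            client.socket().close();
            try {
                client.read(ByteBuffer.allocate(512));
                return false;
            } catch (ClosedChannelException e) {
                return true;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("read() after peer close: " + readAfterPeerClose());
        System.out.println("ClosedChannelException after local close: " + readAfterLocalClose());
    }
}
```

In a selector loop like the one in the first question, this suggests checking whether some other code path touches the channel again after s.close() has run, for example because the same key is processed a second time. Closing a channel also cancels its selection keys, so the explicit key.cancel() after the close is redundant but harmless.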