Posts by Rah*_*kla

Spark 1.6.0 throws ClassCastException in cluster mode but works fine in local mode

I tried a simple word-count example in both cluster mode and local mode. It works fine in local mode but throws a ClassCastException in cluster mode. Here is the code snippet:

package com.example

import com.typesafe.config.ConfigFactory
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Rahul Shukla on 20/2/16.
  */
object SampleFile extends App {
  val config = ConfigFactory.load()
  val conf = new SparkConf()
    .setAppName("WordCount")
    //.setMaster("spark://ULTP:7077")
    .setMaster("local")
    .setSparkHome(config.getString("example.spark.home"))
    .set("spark.cleaner.ttl", "30s")
    .set("spark.app.id", "KnowledgeBase")
    .set("spark.driver.allowMultipleContexts", "true")
  val sc = new SparkContext(conf)
  sc.textFile("build.sbt").flatMap(row => row.split(" ")).map((_, 1)).reduceByKey(_ + _).foreach(println)
}

Environment: Spark 1.6 built against Scala 2.11.7

Exception:

java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006) …
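A frequently reported cause of this particular ClassCastException is the driver object extending `App`: Scala's `DelayedInit` defers field initialization, so closures that capture the object's vals can deserialize inconsistently on executors (tracked as SPARK-4170). A hedged sketch of the usual workaround, moving the logic into an explicit `main` (config keys as in the original snippet; the cluster master URL is the one commented out above):

```scala
package com.example

import com.typesafe.config.ConfigFactory
import org.apache.spark.{SparkConf, SparkContext}

// Using an explicit main() instead of `extends App` avoids DelayedInit,
// so every val is initialized before any closure is serialized.
object SampleFile {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load()
    val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("spark://ULTP:7077") // cluster master from the original snippet
      .setSparkHome(config.getString("example.spark.home"))
    val sc = new SparkContext(conf)
    sc.textFile("build.sbt")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .foreach(println)
    sc.stop()
  }
}
```

This is a sketch only; it assumes the same `example.spark.home` config key and a reachable cluster master.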

classcastexception apache-spark

12 votes · 1 answer · 3167 views

Spark 2.0.2: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

I am getting the error below with Kafka 0.10.1.0 and Spark 2.0.2:

private val spark = SparkSession.builder()
  .master("local[*]")
  .appName(job.name)
  .config("spark.cassandra.connection.host", "localhost")
  .config("spark.cassandra.connection.port", "9042")
  .config("spark.streaming.receiver.maxRate", 10000)
  .config("spark.streaming.kafka.maxRatePerPartition", 10000)
  .config("spark.streaming.kafka.consumer.cache.maxCapacity", 1)
  .config("spark.streaming.kafka.consumer.cache.initialCapacity", 1)
  .getOrCreate()

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> config.getString("kafka.hosts"),
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> job.name,
  "auto.offset.reset" -> config.getString("kafka.offset"),
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

Exception:

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1557)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1177)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at …
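The `CachedKafkaConsumer` in the trace wraps a plain `KafkaConsumer`, which throws `ConcurrentModificationException` whenever two threads in the same executor JVM touch the same cached instance — typically triggered by speculative execution, or by several jobs reading the same topic-partition concurrently. A hedged sketch of the common mitigations (note: `spark.streaming.kafka.consumer.cache.enabled` only exists in later Spark releases, roughly 2.2+; on 2.0.2 disabling speculation is the usual first step):

```scala
import org.apache.spark.sql.SparkSession

object KafkaJobSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("kafka-job") // hypothetical name; the original uses job.name
      // Prevent two task attempts racing on one cached consumer:
      .config("spark.speculation", "false")
      // On Spark 2.2+ the per-executor consumer cache can be switched off:
      // .config("spark.streaming.kafka.consumer.cache.enabled", "false")
      .getOrCreate()
  }
}
```

Whichever route applies, the invariant to preserve is one Kafka consumer instance per topic-partition per thread.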

apache-kafka apache-spark spark-streaming

5 votes · 1 answer · 1394 views

Difference between the sum of odd-height nodes and even-height nodes in a binary tree

How do I write a function that returns the difference between the sum of the values of nodes at odd heights and the sum of the values of nodes at even heights, given that the root of the binary tree has height 1?

Input:

                                      1
                              2                3
                          4        5       6        7
                      8     9  10    11  12  13   14  15

Output: -74. Explanation:

[ (1 + 4 + 5 + 6 + 7 ) - (2 + 3 + 8 + 9 + 10 + 11 + 12 + 13 + 14 + 15) = -74 ]

Code:

public static int diff(Node n) {
    if (n == null)
        return 0;
    return Sum(n) - Sum(n.left) - Sum(n.right);

}
public static int Sum(Node root) { …
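If `Sum` is a plain subtree sum, the posted expression collapses: `Sum(n) - Sum(n.left) - Sum(n.right)` equals just `n.value`, so it cannot produce -74. The trick is a single recursion whose sign flips at every level, since the children of a node at odd height sit at even height and vice versa. A hedged sketch in Scala (the `Node` case class is assumed for illustration; the original `Node` class is not shown):

```scala
object TreeDiff {
  // Minimal node type, assumed for illustration.
  case class Node(value: Int, left: Node = null, right: Node = null)

  // diff(n) = (sum at odd heights below n, counting n as odd) minus
  // (sum at even heights). Subtracting the recursive results flips the
  // parity for the children, which is exactly the height alternation.
  def diff(n: Node): Int =
    if (n == null) 0
    else n.value - diff(n.left) - diff(n.right)

  def main(args: Array[String]): Unit = {
    // The complete tree 1..15 from the question.
    val tree = Node(1,
      Node(2, Node(4, Node(8), Node(9)), Node(5, Node(10), Node(11))),
      Node(3, Node(6, Node(12), Node(13)), Node(7, Node(14), Node(15))))
    println(diff(tree)) // prints -74
  }
}
```

For the sample tree this yields (1 + 4 + 5 + 6 + 7) - (2 + 3 + 8 + ... + 15) = 23 - 97 = -74, matching the expected output.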

java binary-tree

2 votes · 1 answer · 7721 views

No ClassTag available (Scala)

Something strange is happening to me in Scala. I am trying to use the third-party library

org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream

and I get a "No ClassTag available" error. I reproduce the scenario below, treating Util as the third-party library. Why does this happen?

import scala.reflect.ClassTag

object Util {
    def fun1[K: ClassTag, M: ClassTag, KD: ClassTag, MD: ClassTag]: Unit = {
        println("In function version 2")
    }
}

class ClassTagIssue[K, M, KD, MD] {
    def build: Unit = {
        Util.fun1[K, M, KD, MD]
    }
}

object ClassTagIssue {
    def main(args: Array[String]) {
        new ClassTagIssue[String, String, String, String]().build
    }
}

When I try to compile this code I get the following errors:

Error:(23, 14) No ClassTag available for K
    Util.fun1[K, M, KD, MD]
             ^
Error:(23, 14) not enough arguments for method fun1: (implicit evidence$1: scala.reflect.ClassTag[K], …
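`fun1` demands implicit `ClassTag` evidence for each type parameter, but inside `ClassTagIssue` the abstract types `K`, `M`, `KD`, `MD` carry no such evidence, so the compiler cannot manufacture it at the call site. The standard fix is to propagate the context bounds up to the class, which captures the `ClassTag` instances at construction time. A hedged sketch of the corrected code:

```scala
import scala.reflect.ClassTag

object Util {
  def fun1[K: ClassTag, M: ClassTag, KD: ClassTag, MD: ClassTag]: Unit =
    println("In function version 2")
}

// The same context bounds here make implicit ClassTags available inside
// the class body, so they can be forwarded to Util.fun1.
class ClassTagIssue[K: ClassTag, M: ClassTag, KD: ClassTag, MD: ClassTag] {
  def build: Unit = Util.fun1[K, M, KD, MD]
}

object ClassTagIssue {
  def main(args: Array[String]): Unit =
    new ClassTagIssue[String, String, String, String]().build
}
```

This is exactly why `KafkaUtils.createDirectStream` requires concrete (or context-bounded) types at the point where the stream is built.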

scala apache-kafka apache-spark spark-streaming

1 vote · 1 answer · 1157 views

Kafka Streams state directory IO error

After the stream has been running for some time it gives the following error. I cannot find out what is responsible for creating the .sst files.

Environment:

Kafka version 0.10.0-cp1

Scala 2.11.8

    org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store agg
        at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:424)
        at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:414)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:165)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:330)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:247)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:446)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:434)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:422)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:340)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: org.rocksdb.RocksDBException: IO error: /tmp/kafka-streams/pos/0_0/rocksdb/agg/000008.sst: No such file or directory
        at org.rocksdb.RocksDB.flush(Native Method)
        at org.rocksdb.RocksDB.flush(RocksDB.java:1329)
        at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:422)
        ... 9 more
[2016-06-24 11:13:54,910] ERROR Failed to commit StreamTask #0_0 in thread [StreamThread-1]:  (org.apache.kafka.streams.processor.internals.StreamThread:452)
org.apache.kafka.streams.errors.ProcessorStateException: Error while batch writing to store agg
        at org.apache.kafka.streams.state.internals.RocksDBStore.putAllInternal(RocksDBStore.java:324)
        at …
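The `.sst` files are RocksDB's sorted-string tables, created by the state store backing the `agg` aggregation. The trace shows the state directory defaulting to `/tmp/kafka-streams`, and on many distributions `/tmp` is periodically purged (tmpwatch / systemd-tmpfiles), which deletes RocksDB files out from under a long-running stream. A hedged sketch of the usual fix, pointing `state.dir` at a durable location (application id and path are assumptions for illustration):

```scala
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

object StreamsStateDirSketch {
  def streamsProps(): Properties = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pos")            // assumed app id, matching the /tmp/kafka-streams/pos path
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    // Keep RocksDB state out of /tmp so OS cleanup cannot remove .sst files:
    props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams")
    props
  }
}
```

The chosen directory must survive reboots and be writable by the streams process; any path outside the OS temp-cleaning policy works.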

apache-kafka rocksdb apache-kafka-streams

1 vote · 1 answer · 2764 views