I tried a simple word-count example in both cluster mode and local mode. It works fine in local mode but throws a ClassCastException in cluster mode. Here is the code snippet...
package com.example
import com.typesafe.config.ConfigFactory
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by Rahul Shukla on 20/2/16.
*/
object SampleFile extends App {
  val config = ConfigFactory.load()
  val conf = new SparkConf()
    .setAppName("WordCount")
    //.setMaster("spark://ULTP:7077")
    .setMaster("local")
    .setSparkHome(config.getString("example.spark.home"))
    .set("spark.cleaner.ttl", "30s")
    .set("spark.app.id", "KnowledgeBase")
    .set("spark.driver.allowMultipleContexts", "true")
  val sc = new SparkContext(conf)
  sc.textFile("build.sbt")
    .flatMap(row => row.split(" "))
    .map((_, 1))
    .reduceByKey(_ + _)
    .foreach(println)
}
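This particular ClassCastException (`List$SerializationProxy` being assigned to an RDD's `dependencies_` field) usually means the executors deserialize the task's RDD lineage with classes that do not match the driver's: either a Scala version mismatch (the prebuilt Spark 1.6 binaries target Scala 2.10, while this app is built for 2.11.7), or the application jar never reached the executors (e.g. running from an IDE against a standalone master). A hedged sketch of the second fix, shipping the jar explicitly; the jar path and master URL here are placeholders:

```java
import org.apache.spark.SparkConf;

// Sketch only: ship the application assembly jar to the executors so the
// RDD closure classes on both sides match (alternatively, deploy with
// spark-submit, which does this for you).
public class ClusterConf {
    public static SparkConf conf() {
        return new SparkConf()
                .setAppName("WordCount")
                .setMaster("spark://ULTP:7077") // placeholder master URL
                .setJars(new String[] {"target/scala-2.11/wordcount-assembly.jar"}); // placeholder path
    }
}
```

Whichever route you take, the Scala binary version of the cluster and of the application build must agree.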
Environment: Spark 1.6 built against Scala 2.11.7
Exception:
java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
…

I am getting the error below with Kafka 0.10.1.0 and Spark 2.0.2:
private val spark = SparkSession.builder()
  .master("local[*]")
  .appName(job.name)
  .config("spark.cassandra.connection.host", "localhost")
  .config("spark.cassandra.connection.port", "9042")
  .config("spark.streaming.receiver.maxRate", 10000)
  .config("spark.streaming.kafka.maxRatePerPartition", 10000)
  .config("spark.streaming.kafka.consumer.cache.maxCapacity", 1)
  .config("spark.streaming.kafka.consumer.cache.initialCapacity", 1)
  .getOrCreate()

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> config.getString("kafka.hosts"),
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> job.name,
  "auto.offset.reset" -> config.getString("kafka.offset"),
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
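`KafkaConsumer` is documented as not thread-safe, and the spark-streaming-kafka-0-10 consumer cache keys one consumer per group/partition; this exception typically appears when two threads (e.g. concurrent output operations, windowing over the same partition, or speculative tasks) reach the same cached consumer at once. The underlying rule is one consumer instance per thread. A minimal, self-contained sketch of that pattern, where `FakeConsumer` is a stand-in for `org.apache.kafka.clients.consumer.KafkaConsumer`:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class PerThreadConsumer {
    static final AtomicInteger created = new AtomicInteger();

    // Stand-in for KafkaConsumer, which must never be shared across threads.
    static class FakeConsumer {
        final int id = created.incrementAndGet();
    }

    // Each thread lazily receives its own instance.
    static final ThreadLocal<FakeConsumer> CONSUMER =
            ThreadLocal.withInitial(FakeConsumer::new);

    public static void main(String[] args) {
        Runnable task = () -> System.out.println(
                Thread.currentThread().getName() + " uses consumer " + CONSUMER.get().id);
        Thread t1 = new Thread(task, "t1");
        Thread t2 = new Thread(task, "t2");
        t1.start(); t2.start();
        try {
            t1.join(); t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Two threads -> two distinct instances, never one shared consumer.
        System.out.println("consumers created: " + created.get()); // prints: consumers created: 2
    }
}
```

In the Spark setting you normally do not manage consumers yourself; the practical takeaway is to avoid having several jobs consume the same stream's partitions concurrently (and to keep speculation off for these stages).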
Exception:
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1557)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1177)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at …

How do I write a function that returns the difference between the sum of the values of the odd-height nodes and the sum of the values of the even-height nodes of a binary tree, given that the root node has height 1?
Input:
1
2 3
4 5 6 7
8 9 10 11 12 13 14 15
Output: -74
Explanation:
[ (1 + 4 + 5 + 6 + 7 ) - (2 + 3 + 8 + 9 + 10 + 11 + 12 + 13 + 14 + 15) = -74 ]
Code:
public static int diff(Node n) {
    if (n == null)
        return 0;
    return Sum(n) - Sum(n.left) - Sum(n.right);
}
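If `Sum` is a plain subtree sum, the `diff` above collapses to just `n`'s own value (the children's sums cancel), so it cannot yield -74. One standard recursion that does work: subtracting the child results flips the sign at every level, so nodes at odd height (root = 1) are added and nodes at even height subtracted. A hedged sketch, assuming `Node` has `int val` and `left`/`right` fields:

```java
class Node {
    int val;
    Node left, right;
    Node(int v) { val = v; }
}

public class HeightDiff {
    // diff(n) = n.val - diff(n.left) - diff(n.right):
    // each recursion level negates the contributions one level down,
    // giving (odd-height sum) - (even-height sum).
    static int diff(Node n) {
        if (n == null) return 0;
        return n.val - diff(n.left) - diff(n.right);
    }

    public static void main(String[] args) {
        // Build the complete tree 1..15 from the question
        // (children of node i are 2i and 2i+1).
        Node[] nodes = new Node[16];
        for (int i = 1; i <= 15; i++) nodes[i] = new Node(i);
        for (int i = 1; i <= 7; i++) {
            nodes[i].left = nodes[2 * i];
            nodes[i].right = nodes[2 * i + 1];
        }
        System.out.println(diff(nodes[1])); // prints -74
    }
}
```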
public static int Sum(Node root) { …

Something strange is happening to me in Scala. I am trying to use the third-party library method
org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream
and I get a "No ClassTag" error. I reproduce the scenario below, where Util plays the role of the third-party library. Why does this happen?
object Util {
  def fun1[K: ClassTag, M: ClassTag, KD: ClassTag, MD: ClassTag]: Unit = {
    println("In function version 2")
  }
}

class ClassTagIssue[K, M, KD, MD] {
  def build: Unit = {
    Util.fun1[K, M, KD, MD]
  }
}

object ClassTagIssue {
  def main(args: Array[String]) {
    new ClassTagIssue[String, String, String, String]().build
  }
}
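Inside `ClassTagIssue[K, M, KD, MD]` the type parameters are unadorned, so no `ClassTag` evidence is in scope when `Util.fun1` asks for it; the usual fix is to add context bounds on the class itself (`class ClassTagIssue[K: ClassTag, M: ClassTag, KD: ClassTag, MD: ClassTag]`) so the evidence is captured where the concrete types are known (at `new ClassTagIssue[String, ...]`) and propagated inward. The same propagation principle, illustrated in Java with explicit `Class` tokens (all names here hypothetical):

```java
public class ClassTagAnalogue {
    // Like Util.fun1, this requires runtime evidence for K.
    static <K> String fun1(Class<K> kEvidence) {
        return "evidence for " + kEvidence.getSimpleName();
    }

    // Like `class ClassTagIssue[K: ClassTag]`: the wrapper demands the
    // evidence from its instantiator instead of trying to conjure it
    // for an unknown K.
    static class Wrapper<K> {
        private final Class<K> kEvidence;
        Wrapper(Class<K> kEvidence) { this.kEvidence = kEvidence; }
        String build() { return fun1(kEvidence); }
    }

    public static void main(String[] args) {
        System.out.println(new Wrapper<>(String.class).build()); // prints: evidence for String
    }
}
```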
When I run this code I get the following exception:
Error:(23, 14) No ClassTag available for K
Util.fun1[K, M, KD, MD]
^
Error:(23, 14) not enough arguments for method fun1: (implicit evidence$1: scala.reflect.ClassTag[K], …

My stream gives the error below after running for some time. I cannot find out what is responsible for creating the .sst files.
Environment:
Kafka version 0.10.0-cp1
Scala 2.11.8
org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store agg
at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:424)
at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:414)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:165)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:330)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:247)
at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:446)
at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:434)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:422)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:340)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: org.rocksdb.RocksDBException: IO error: /tmp/kafka-streams/pos/0_0/rocksdb/agg/000008.sst: No such file or directory
at org.rocksdb.RocksDB.flush(Native Method)
at org.rocksdb.RocksDB.flush(RocksDB.java:1329)
at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:422)
... 9 more
[2016-06-24 11:13:54,910] ERROR Failed to commit StreamTask #0_0 in thread [StreamThread-1]: (org.apache.kafka.streams.processor.internals.StreamThread:452)
org.apache.kafka.streams.errors.ProcessorStateException: Error while batch writing to store agg
at org.apache.kafka.streams.state.internals.RocksDBStore.putAllInternal(RocksDBStore.java:324)
at …
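The .sst files are RocksDB's on-disk table files for the state store ("agg"): Kafka Streams itself creates them under `state.dir`, which defaults to `/tmp/kafka-streams` (hence the `/tmp/kafka-streams/pos/0_0/rocksdb/agg/...` path in the trace). OS tmp cleaners can delete those files while the application is running, producing exactly this flush failure. A hedged fix is to move the state directory off `/tmp`; the path below is a placeholder:

```java
import java.util.Properties;

public class StateDirConfig {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "pos");               // matches the path in the trace
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        // "state.dir" (StreamsConfig.STATE_DIR_CONFIG): keep RocksDB state
        // somewhere tmp cleaners will not touch.
        props.put("state.dir", "/var/lib/kafka-streams");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("state.dir"));
    }
}
```

Pass these properties when constructing the `KafkaStreams` instance; the directory must exist and be writable by the application user.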