我试图使用Spark Streaming与Kafka(版本1.1.0)但由于此错误,Spark作业不断崩溃:
14/11/21 12:39:23 ERROR TaskSetManager: Task 3967.0:0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用eclipse(使用maven conf)和2个worker来执行下面的代码,每个都有2个核心或者也尝试过spark-submit.
public class StreamingWorkCount implements Serializable {
public static void main(String[] args) {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
JavaStreamingContext jssc = new JavaStreamingContext(
"spark://192.168.1.19:7077", "JavaWordCount",
new Duration(1000));
JavaDStream<String> trainingData = jssc.textFileStream(
"/home/bdi-user/kaushal-drive/spark/data/training").cache();
trainingData.foreach(new Function<JavaRDD<String>, Void>() {
public Void call(JavaRDD<String> rdd) throws Exception {
List<String> output = rdd.collect();
System.out.println("Sentences Collected from files " + output);
return null;
}
});
trainingData.print();
jssc.start();
jssc.awaitTermination();
}
}
Run Code Online (Sandbox Code Playgroud)
并记录该代码
15/01/22 21:57:13 INFO FileInputDStream: New files at time 1421944033000 ms:
15/01/22 21:57:13 INFO JobScheduler: Added jobs for time 1421944033000 …Run Code Online (Sandbox Code Playgroud) 我试图了解如何使Spark Streaming应用程序更容错(特别是在尝试写入下游依赖项时),并且我不知道在尝试将结果写入外部源时处理失败的最佳方法是什么,像Cassandra,DynamoDB等.
例如,我有一个Spark Streaming作业从Stream(Kafka,Flume等等)中提取数据......我还没有最终确定使用哪种技术,将类似项目聚合在一起,然后将结果写入外部存储.(即Cassandra,DynamoDB或任何正在接收我的DStream计算结果的东西).
我试图弄清楚如何处理外部依赖关系无法写入的情况.也许集群发生故障,可能存在权限问题等,但我的工作无法将结果写入外部依赖项.有没有办法暂停Spark Streaming以便接收器不会继续批量处理数据?我应该只是睡觉当前批次并让接收器继续存储批次吗?如果问题是暂时的(几秒钟),继续批处理可能是可以接受的,但如果依赖性下降几分钟或1小时以上会发生什么?
我有一个想法是有一个监视进程,在后台监视依赖项的健康状况,如果它发现它"不健康",它将停止工作.然后,当所有依赖项都运行正常时,我可以重新启动作业并处理未写入外部源的所有数据.
我的另一个想法是以某种方式在DStream forEachRdd方法中发出信号,表示存在问题.我可以在DStream中抛出某种异常,它会向驱动程序发出它应该停止的信号吗?
如果有人有任何关于如何处理外部容错的经验,或者可以指向我的好文章/视频,那将是很好的.
谢谢
当Parquet文件data在其date列上写入分区时,我们得到一个目录结构,如:
/data
_common_metadata
_metadata
_SUCCESS
/date=1
part-r-xxx.gzip
part-r-xxx.gzip
/date=2
part-r-xxx.gzip
part-r-xxx.gzip
Run Code Online (Sandbox Code Playgroud)
如果在date=2没有Parquet实用程序(通过shell或文件浏览器等)的情况下删除分区,那么当只有分区时,是否需要回滚任何元数据文件date=1?
或者可以随意删除分区并在以后重写它们(或不重写)?
在Spark Streaming中,可以(并且必须使用有状态操作)将StreamingContext检查点设置为(AND)的可靠数据存储(S3,HDFS,...):
DStream 血统如上所述这里,设置输出数据存储需要调用yourSparkStreamingCtx.checkpoint(datastoreURL)
另一方面,可以DataStream通过调用checkpoint(timeInterval)它们来为每个设置谱系检查点间隔.实际上,建议将谱系检查点间隔设置为DataStream滑动间隔的5到10倍:
dstream.checkpoint(checkpointInterval).通常,DStream的5-10个滑动间隔的检查点间隔是一个很好的设置.
我的问题是:
当流上下文设置为执行检查点并且没有ds.checkpoint(interval)被调用时,是否为所有数据流启用了谱系检查点,默认值checkpointInterval等于batchInterval?或者,相反,只有元数据检查点启用了什么?
有没有使用模式转换方式的Avro从消息卡夫卡与火花到数据帧?用户记录的模式文件:
{
"fields": [
{ "name": "firstName", "type": "string" },
{ "name": "lastName", "type": "string" }
],
"name": "user",
"type": "record"
}
Run Code Online (Sandbox Code Playgroud)
来自SqlNetworkWordCount示例和Kafka,Spark和Avro的代码片段- 第3部分,生成和使用Avro消息来读取消息.
object Injection {
val parser = new Schema.Parser()
val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}
...
messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
val df = rdd.map(message => Injection.injection.invert(message._2).get)
.map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF()
df.show() …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用createDirectStream方法打开Kafka(尝试版本0.11.0.2和1.0.1)流并获取此AbstractMethodError错误:
Exception in thread "main" java.lang.AbstractMethodError
at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
at org.apache.spark.streaming.kafka010.KafkaUtils$.initializeLogIfNecessary(KafkaUtils.scala:39)
at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
at org.apache.spark.streaming.kafka010.KafkaUtils$.log(KafkaUtils.scala:39)
at org.apache.spark.internal.Logging$class.logWarning(Logging.scala:66)
at org.apache.spark.streaming.kafka010.KafkaUtils$.logWarning(KafkaUtils.scala:39)
at org.apache.spark.streaming.kafka010.KafkaUtils$.fixKafkaParams(KafkaUtils.scala:201)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:63)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:147)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:124)
Run Code Online (Sandbox Code Playgroud)
这就是我所说的:
val preferredHosts = LocationStrategies.PreferConsistent
val kafkaParams = Map(
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[IntegerDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"auto.offset.reset" -> "earliest"
)
val aCreatedStream = createDirectStream[String, String](ssc, preferredHosts,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
Run Code Online (Sandbox Code Playgroud)
我有Kafka在9092运行,我能够创建生产者和消费者,并在他们之间传递消息,所以不知道为什么它不能使用Scala代码.任何想法都赞赏.
我正在尝试设置Spark Streaming以从Kafka队列获取消息.我收到以下错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o30.createDirectStream.
: org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([test-topic,0])
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
Run Code Online (Sandbox Code Playgroud)
这是我正在执行的代码(pyspark):
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["test-topic"], {"metadata.broker.list": "host.domain:9092"})
ssc.start()
ssc.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
有几个类似的帖子有相同的错误.在所有情况下,原因都是空的kafka主题.我的"测试主题"中有消息.我可以把它们拿出去
kafka-console-consumer --zookeeper host.domain:2181 --topic test-topic --from-beginning --max-messages 100
Run Code Online (Sandbox Code Playgroud)
有谁知道可能是什么问题?
我正在使用:
我有一些用例,我想更清楚一点,关于Kafka主题分区 - >火花流资源利用率.
我使用spark独立模式,所以我只有"执行器总数"和"执行器内存".据我所知并根据文档,将并行性引入Spark流的方法是使用分区的Kafka主题 - >当我使用spark-kafka直接流集成时,RDD将具有与kafka相同数量的分区.
因此,如果我在主题中有1个分区,并且有1个执行程序核心,那么该核心将从Kafka顺序读取.
如果我有:
主题中有2个分区,只有1个执行器核心?该核心将首先从一个分区读取,然后从第二个分区读取,因此分区主题没有任何好处吗?
主题中有2个分区和2个核心?然后1个执行器核心从1个分区读取,第二个核心从第二个分区读取吗?
1个kafka分区和2个执行器核心?
谢谢.
当运行使用来自kafka主题100个分区的数据的spark流媒体应用程序,并且每个执行程序运行10个执行程序,5个核心和20GB RAM时,执行程序将崩溃并显示以下日志:
ERROR ResourceLeakDetector:泄漏:ByteBuf.release()是垃圾收集之前,不叫.启用高级泄漏报告以找出泄漏发生的位置.
ERROR YarnClusterScheduler:在worker23.oct.com上丢失执行者18:奴隶丢失了
ERROR ApplicationMaster:收到的信号期限
此异常出现在spark JIRA中:
https://issues.apache.org/jira/browse/SPARK-17380
有人在升级到spark 2.0.2后写道,问题解决了.但是我们使用spark 2.1作为HDP 2.6的一部分.所以我猜这个bug在火花2.1中没有解决.
还有人遇到过这个bug,并在spark用户列表中写过但没有得到答案:
顺便说一句 - 流媒体应用程序没有调用cache()或persist(),因此不涉及任何缓存.
有没有人遇到过崩溃的流媒体应用?
spark-streaming ×10
apache-spark ×9
apache-kafka ×4
scala ×2
avro ×1
data-stream ×1
filesystems ×1
netty ×1
parquet ×1