当运行使用来自kafka主题100个分区的数据的spark流媒体应用程序,并且每个执行程序运行10个执行程序,5个核心和20GB RAM时,执行程序将崩溃并显示以下日志:
ERROR ResourceLeakDetector:泄漏:ByteBuf.release()是垃圾收集之前,不叫.启用高级泄漏报告以找出泄漏发生的位置.
ERROR YarnClusterScheduler:在worker23.oct.com上丢失执行者18:奴隶丢失了
ERROR ApplicationMaster:收到的信号期限
此异常出现在spark JIRA中:
https://issues.apache.org/jira/browse/SPARK-17380
有人在升级到spark 2.0.2后写道,问题解决了.但是我们使用spark 2.1作为HDP 2.6的一部分.所以我猜这个bug在火花2.1中没有解决.
还有人遇到过这个bug,并在spark用户列表中写过但没有得到答案:
顺便说一句 - 流媒体应用程序没有调用cache()或persist(),因此不涉及任何缓存.
有没有人遇到过崩溃的流媒体应用?
我尝试配置Airbnb AirFlow以使用CeleryExecutor,如下所示:
我executer将airflow.cfg中的更改SequentialExecutor为CeleryExecutor:
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = CeleryExecutor
Run Code Online (Sandbox Code Playgroud)
但是我收到以下错误:
airflow.configuration.AirflowConfigException: error: cannot use sqlite with the CeleryExecutor
Run Code Online (Sandbox Code Playgroud)
请注意,sql_alchemy_conn配置如下:
sql_alchemy_conn = sqlite:////root/airflow/airflow.db
Run Code Online (Sandbox Code Playgroud)
我查看了Airflow的GIT(https://github.com/airbnb/airflow/blob/master/airflow/configuration.py)
并发现以下代码抛出此异常:
def _validate(self):
if (
self.get("core", "executor") != 'SequentialExecutor' and
"sqlite" in self.get('core', 'sql_alchemy_conn')):
raise AirflowConfigException("error: cannot use sqlite with the {}".
format(self.get('core', 'executor')))
Run Code Online (Sandbox Code Playgroud)
从这种validate方法看来,sql_alchemy_conn不能包含sqlite.
你知道如何配置CeleryExecutor没有sqllite吗?请注意,我根据需要下载了rabitMQ以使用CeleryExecuter.
我们有 6 个具有 256GB RAM、24c/48T 的 kafka 代理,它们托管在 raid10 中配置的 20 个 1.8TB SAS 10K rpm 磁盘。
有两个火花流应用程序
有 21 个注入器实例以 6K 事件/秒的速率连续写入该主题。他们使用 librdkafka poroducer 来向 kafka 生成事件。
当流媒体应用程序醒来时,他们的第一份工作是阅读主题。一旦这样做,kafka 磁盘中的 %util 将在 30 秒到 60 秒内变为 90-100%,同时所有注入器实例都从它们的 kafka 生产者那里得到“队列已满”错误。这是生产者配置:
从该图中看不到,但是在 util% 高的时候,有一段时间写入为 0,我们假设在这些时间注入器的生产者的队列已满,因此抛出“队列已满”错误。
值得一提的是,我们在kafka机器中使用deadline IO调度器,它优先考虑读取操作。
关于如何释放写的压力,我们有几个想法:
在我们的火花流媒体工作中,我们从kafka中读取流媒体中的消息.
为此,我们使用KafkaUtils.createDirectStream返回的API JavaPairInputDStreamfrom.
消息是通过以下方式从kafka(来自三个主题 - test1,test2,test3)读取的:
private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);
JavaPairInputDStream<String, String> messages =
KafkaUtils.createDirectStream(
streamingContext,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
Run Code Online (Sandbox Code Playgroud)
我们希望以不同的方式处理来自每个主题的消息,为了实现这一点,我们需要知道每条消息的主题名称.
所以我们做了以下事情:
JavaDStream<String> lines = messages.map(new SplitToLinesFunction());
Run Code Online (Sandbox Code Playgroud)
这是执行SplitToLinesFunction:
public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
@Override
public String call(Tuple2<String, String> tuple2)
{
System.out.println(tuple2._1);
return tuple2._2();
}
}
Run Code Online (Sandbox Code Playgroud)
问题是,它tuple2._1是null,我们假设tuple2._1它将包含一些元数据,例如消息来自的主题/分区的名称.
但是,当我们打印时tuple2._1,它是空的. …
我们有一个 spark 流应用程序(spark 2.1 在 Hortonworks 2.6 上运行)并使用DataSet.repartition(在DataSet<Row>从 Kafka 读取的a上)DataSet<Row>'s根据给定的列(称为block_id)重新分区分区。
我们从一个DataSet<Row>包含 50 个分区的分区开始,最后(在调用 之后DataSet.repartition)分区数量等于唯一block_id 的数量。
问题是它的DataSet.repartition行为不像我们预期的那样——当我们查看运行 的 spark 作业的事件时间线时repartition,我们看到有几个任务处理 1 个block_id,而处理 2 个block_id甚至 3 或 4 个block_id 的任务较少。
似乎可以DataSet.repartition确保所有Rows具有相同block_id 的都在单个分区内,但并不是每个创建分区的任务都只处理一个 block_id。
结果是重新分区作业(在流应用程序内部运行)花费的时间与其最长的任务(处理最多block_id 的任务)一样多。
我们尝试使用提供给流媒体应用程序的 Vcores 数量 - 从 10 到 25 到 50(我们在从 Kafka 读取的原始 RDD 中有 50 …