小编Ela*_*dor的帖子

由于内存泄漏导致Spark执行器崩溃

当运行使用来自kafka主题100个分区的数据的spark流媒体应用程序,并且每个执行程序运行10个执行程序,5个核心和20GB RAM时,执行程序将崩溃并显示以下日志:

ERROR ResourceLeakDetector:泄漏:ByteBuf.release()是垃圾收集之前,不叫.启用高级泄漏报告以找出泄漏发生的位置.

ERROR YarnClusterScheduler:在worker23.oct.com上丢失执行者18:奴隶丢失了

ERROR ApplicationMaster:收到的信号期限

此异常出现在spark JIRA中:

https://issues.apache.org/jira/browse/SPARK-17380

有人在升级到spark 2.0.2后写道,问题解决了.但是我们使用spark 2.1作为HDP 2.6的一部分.所以我猜这个bug在火花2.1中没有解决.

还有人遇到过这个bug,并在spark用户列表中写过但没有得到答案:

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Receiver-Resource-Leak-td27857.html

顺便说一句 - 流媒体应用程序没有调用cache()persist(),因此不涉及任何缓存.

有没有人遇到过崩溃的流媒体应用?

out-of-memory netty spark-streaming apache-spark-2.0

12
推荐指数
0
解决办法
731
查看次数

配置Airflow以使用CeleryExecutor

我尝试配置Airbnb AirFlow以使用CeleryExecutor,如下所示:

executer将airflow.cfg中的更改SequentialExecutorCeleryExecutor:

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = CeleryExecutor
Run Code Online (Sandbox Code Playgroud)

但是我收到以下错误:

airflow.configuration.AirflowConfigException: error: cannot use sqlite with the CeleryExecutor
Run Code Online (Sandbox Code Playgroud)

请注意,sql_alchemy_conn配置如下:

sql_alchemy_conn = sqlite:////root/airflow/airflow.db
Run Code Online (Sandbox Code Playgroud)

我查看了Airflow的GIT(https://github.com/airbnb/airflow/blob/master/airflow/configuration.py)

并发现以下代码抛出此异常:

def _validate(self):
        if (
                self.get("core", "executor") != 'SequentialExecutor' and
                "sqlite" in self.get('core', 'sql_alchemy_conn')):
            raise AirflowConfigException("error: cannot use sqlite with the {}".
                format(self.get('core', 'executor')))
Run Code Online (Sandbox Code Playgroud)

从这种validate方法看来,sql_alchemy_conn不能包含sqlite.

你知道如何配置CeleryExecutor没有sqllite吗?请注意,我根据需要下载了rabitMQ以使用CeleryExecuter.

airflow

10
推荐指数
2
解决办法
2万
查看次数

大量读取期间 kafka 磁盘 hault 写入并导致 kafka 生产者中的“队列已满”错误

我们有 6 个具有 256GB RAM、24c/48T 的 kafka 代理,它们托管在 raid10 中配置的 20 个 1.8TB SAS 10K rpm 磁盘。

有两个火花流应用程序

  • 每 10 分钟开始他们的批次
  • 一旦他们开始,他们的第一份工作就是阅读同一个 kafka 主题。
  • 该主题有 200 个分区,均匀分布在 6 个代理上(每个代理上有 33 个分区)。
  • 流媒体应用程序使用 kafka 客户端 0.8.2.1 从 kafka 消费

有 21 个注入器实例以 6K 事件/秒的速率连续写入该主题。他们使用 librdkafka poroducer 来向 kafka 生成事件。

当流媒体应用程序醒来时,他们的第一份工作是阅读主题。一旦这样做,kafka 磁盘中的 %util 将在 30 秒到 60 秒内变为 90-100%,同时所有注入器实例都从它们的 kafka 生产者那里得到“队列已满”错误。这是生产者配置:

  • queue.buffering.max.kbytes:2097151
  • 逗留时间:0.5

在此处输入图片说明

从该图中看不到,但是在 util% 高的时候,有一段时间写入为 0,我们假设在这些时间注入器的生产者的队列已满,因此抛出“队列已满”错误。

值得一提的是,我们在kafka机器中使用deadline IO调度器,它优先考虑读取操作。

关于如何释放写的压力,我们有几个想法:

  • 减少不必要的 iops - 将 kafka 磁盘配置从 raid10 更改为非raid(“jbod”)
  • 传播阅读 - 使 Spark …

raid apache-kafka spark-streaming sched-deadline

7
推荐指数
0
解决办法
184
查看次数

从火花中的kafka消息中获取主题

在我们的火花流媒体工作中,我们从kafka中读取流媒体中的消息.

为此,我们使用KafkaUtils.createDirectStream返回的API JavaPairInputDStreamfrom.

消息是通过以下方式从kafka(来自三个主题 - test1,test2,test3)读取的:

private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));

HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);

JavaPairInputDStream<String, String> messages = 
KafkaUtils.createDirectStream(
                streamingContext,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
                );
Run Code Online (Sandbox Code Playgroud)

我们希望以不同的方式处理来自每个主题的消息,为了实现这一点,我们需要知道每条消息的主题名称.

所以我们做了以下事情:

JavaDStream<String> lines = messages.map(new SplitToLinesFunction());
Run Code Online (Sandbox Code Playgroud)

这是执行SplitToLinesFunction:

public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
    @Override
    public String call(Tuple2<String, String> tuple2) 
    {
        System.out.println(tuple2._1);
        return tuple2._2();
    }
}
Run Code Online (Sandbox Code Playgroud)

问题是,它tuple2._1是null,我们假设tuple2._1它将包含一些元数据,例如消息来自的主题/分区的名称.

但是,当我们打印时tuple2._1,它是空的. …

apache-kafka apache-spark spark-streaming

5
推荐指数
1
解决办法
4246
查看次数

在 Spark 2 中使用 DataSet.repartition - 多个任务处理多个分区

我们有一个 spark 流应用程序(spark 2.1 在 Hortonworks 2.6 上运行)并使用DataSet.repartition(在DataSet<Row>从 Kafka 读取的a上)DataSet<Row>'s根据给定的列(称为block_id)重新分区分区。

我们从一个DataSet<Row>包含 50 个分区的分区开始,最后(在调用 之后DataSet.repartition)分区数量等于唯一block_id 的数量。

问题是它的DataSet.repartition行为不像我们预期的那样——当我们查看运行 的 spark 作业的事件时间线时repartition,我们看到有几个任务处理 1 个block_id,而处理 2 个block_id甚至 3 或 4 个block_id 的任务较少。

似乎可以DataSet.repartition确保所有Rows具有相同block_id 的都在单个分区内,但并不是每个创建分区的任务都只处理一个 block_id。

结果是重新分区作业(在流应用程序内部运行)花费的时间与其最长的任务(处理最多block_id 的任务)一样多。

我们尝试使用提供给流媒体应用程序的 Vcor​​es 数量 - 从 10 到 25 到 50(我们在从 Kafka 读取的原始 RDD 中有 50 …

apache-spark spark-streaming apache-spark-dataset

5
推荐指数
1
解决办法
3123
查看次数