小编Pra*_*ant的帖子

如何在 Spark 应用程序中进行有效的日志记录

我有一个用 Scala 编写的 Spark 应用程序代码,它运行一系列 Spark-SQL 语句。这些结果是通过最后针对最终数据帧调用“计数”操作来计算的。我想知道在 Spark-scala 应用程序作业中进行日志记录的最佳方法是什么?由于所有数据帧(大约 20 个)最终都是使用单个操作计算的,因此在记录某些语句的输出/序列/成功时我有哪些选择。

问题本质上不太通用。由于spark采用惰性评估,执行计划由spark决定,我想知道应用程序语句成功运行到什么点以及该阶段的中间结果是什么。

这里的目的是监视长时间运行的任务,看看到什么时候它是好的以及问题出现在哪里。

如果我们尝试将日志记录放在转换之前/之后,那么在读取代码时就会打印它。因此,在实际执行期间必须使用自定义消息来完成日志记录(在 scala 代码末尾调用操作)。如果我尝试在代码之间放置 count/take/first 等,那么作业的执行速度会减慢很多。

scala apache-spark apache-spark-sql

6
推荐指数
1
解决办法
8857
查看次数

如何在特定偏移量到特定偏移量中使用来自 kafka 主题的数据?

我需要消耗特定的偏移量到特定的结束偏移量!!consumer.seek() 从特定偏移量读取数据,但我需要将数据从偏移量检索到 tooffset !!任何帮助将不胜感激,提前致谢。

    ConsumerRecords<String, String> records = consumer.poll(100);
    if(flag) {
        consumer.seek(new TopicPartition("topic-1", 0), 90);
        flag = false;
    }
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-consumer-api

5
推荐指数
1
解决办法
4328
查看次数

Spark中执行器和容器的区别

我试图清楚地了解纱线管理集群中的内存分配是如何发生的。我知道有一堆执行器(一个执行器有自己的 JVM),并且一个执行器在执行期间可以有一个或多个 vcore。

我试图将这种理解结合在 YARN 配置中,其中事物被隔离为容器。每个容器实际上是一些 Vcor​​e 和堆内存的一部分的混合。

有人可以确认一位执行者是否获得一个容器,或者一位执行者可以拥有多个容器吗?我阅读了 Cloudera 上有关 YARN 内存管理的一些文档,它似乎说 Container 有一个分配给它的 Executor。

Cloudera内存管理

hadoop-yarn apache-spark

3
推荐指数
1
解决办法
2799
查看次数

新插入的 Hive 记录不会显示在 Spark Shell 的 Spark 会话中

我运行了一个简单的 Spark-sql 程序,使用 Spark-SQL 将数据从 Hive 获取到 Spark 会话。

scala> spark.sql("select count(1) from firsthivestreamtable").show(100,false)
+--------+
|count(1)|
+--------+
|36      |
+--------+
Run Code Online (Sandbox Code Playgroud)

Ran insert 语句在 Hive 表中插入 9 条新记录(直接在 Hive 控制台上)。验证 Hive 表是否已正确插入其他行。

hive> select count(1) aa from firsthivestreamtable;
Total MapReduce CPU Time Spent: 4 seconds 520 msec
OK
45
Time taken: 22.173 seconds, Fetched: 1 row(s)
hive>
Run Code Online (Sandbox Code Playgroud)

但是已经打开的 Spark 会话不显示新插入的 9 行。因此,当我在 Spark 会话中进行计数时,它仍然显示 36 行。为什么会发生这种情况?

scala> spark.sql("select count(1) from firsthivestreamtable").show(100,false)
+--------+
|count(1)|
+--------+
|36      |
+--------+
Run Code Online (Sandbox Code Playgroud)

需要在 Spark 会话中执行哪些操作才能将刷新的(新)数据获取到会话中?由于已插入新数据,Hive 表中的实际行数现在为 …

hive apache-spark

2
推荐指数
1
解决办法
1326
查看次数

Spark SQL 中 Group By 子句的底层实现

Spark SQL中Group By子句的底层实现是什么?据我所知,Spark支持两种类型的Group by操作,即GroupByKey和ReduceByKey。ReduceByKey 是一种映射端缩减,提供比 GroupByKey 更好的性能。

在我们的应用程序代码中,我们在 Spark Dataframes 上使用 Spark SQL,并且不直接创建 RDD。所以,我想到了这个问题:Spark SQL 中的 GroupBy 是否执行 GroupByKey、ReduceByKey 或其他操作。

apache-spark apache-spark-sql

2
推荐指数
1
解决办法
1567
查看次数