小编the*_*tom的帖子

小文件的HDFS性能

我是Haddoop的新手.最近我正在尝试处理(仅读取)hdfs/hadoop上的许多文件.平均文件大小约为1 kb,文件数超过10M.由于某些限制,该程序必须用C++编写.

这只是一个性能评估,所以我只使用5台机器作为数据节点.每个数据节点都有5个数据磁盘.

我写了一个小的C++项目来直接从硬盘(而不是从HDFS)读取文件来构建性能基线.该程序将为每个磁盘创建4个读取线程.性能结果是每个磁盘大约有14MB/s.总吞吐量约为14MB/s*5*5 = 350MB/s(14MB/s*5个磁盘*5台机器).

但是,当这个程序(仍然使用C++,动态链接到libhdfs.so,创建4*5*5 = 100个线程)从hdfs集群中读取文件时,吞吐量大约只有55MB/s.

如果在mapreduce中触发此编程(hadoop流,5个作业,每个具有20个线程,总线程数仍为100),则吞吐量降至约45MB/s.(我想通过一些记账过程会减慢速度).

我想知道HDFS可以提供​​什么样的合理性能.如您所见,与本机代码相比,数据吞吐量仅为1/7左右.这是我配置的问题吗?还是HDFS限制?还是Java限制?什么是我的场景的最佳方式?将序列文件帮助(多)?与我们可以预期的本机IO读取相比,合理的吞吐量是多少?

这是我的一些配置:

NameNode堆大小为32G.

作业/任务节点堆大小为8G.

NameNode处理程序数:128

DataNode处理程序数:8

DataNode最大传输线程数:4096

1GBps以太网.

谢谢.

io performance hadoop hdfs

7
推荐指数
2
解决办法
9791
查看次数

这些指标对于 Spark 结构化流意味着什么?

spark.streams.addListener(new StreamingQueryListener() {\n    ......\n    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {\n        println("Query made progress: " + queryProgress.progress)\n    }\n    ......\n})\n
Run Code Online (Sandbox Code Playgroud)\n\n

当 StreamingQueryListener 添加到 Spark Structured Streaming 会话并连续输出 queryProgress 时,您将获得的指标之一是urationMs

\n\n
Query made progress: {\n  ......\n  "durationMs" : {\n    "addBatch" : 159136,\n    "getBatch" : 0,\n    "getEndOffset" : 0,\n    "queryPlanning" : 38,\n    "setOffsetRange" : 14,\n    "triggerExecution" : 159518,\n    "walCommit" : 182\n  }\n  ......\n}\xe2\x80\x8b\n
Run Code Online (Sandbox Code Playgroud)\n\n

谁能告诉我durationMs中的那些子指标在spark上下文中意味着什么?例如“addBatch 159136”是什么意思。

\n

apache-spark spark-structured-streaming

7
推荐指数
1
解决办法
1571
查看次数

Spark:为每个节点创建多个执行程序的优势是什么?

我在AWS-EMR集群上运行我的工作.它是使用cr1.8xlarge实例的40节点集群.每个cr1.8xlarge有240G内存和32个内核.我可以使用以下配置运行:

--driver-memory 180g --driver-cores 26 --executor-memory 180g --executor-cores 26 --num-executors 40 --conf spark.default.parallelism=4000
Run Code Online (Sandbox Code Playgroud)

要么

--driver-memory 180g --driver-cores 26 --executor-memory 90g --executor-cores 13 --num-executors 80 --conf spark.default.parallelism=4000
Run Code Online (Sandbox Code Playgroud)

从作业跟踪器网站开始,同时运行的任务数量主要只是可用的核心数(cpu).所以我想知道我们是否希望每个节点有多个执行程序?

谢谢!

memory executor apache-spark

6
推荐指数
1
解决办法
5170
查看次数

如何将分组按功能转换为数据框

嗨,我是 Scala 和 Spark 的新手。我正在尝试通过 spark sql 分组。当我尝试保存或查看输出时,它会引发以下错误。

value coalesce is not a member of org.apache.spark.sql.RelationalGroupedDataset
Run Code Online (Sandbox Code Playgroud)

这是我的代码。

 val fp = filtertable.select($"_1", $"_2", $"_3",$"_4").groupBy("_1", "_2","_3")
 fp.show() // throws error
 fp.coalesce(1).write.format("csv").save("file://" + test.toString()) //throws error.
Run Code Online (Sandbox Code Playgroud)

任何帮助将不胜感激。

scala apache-spark apache-spark-sql

6
推荐指数
1
解决办法
5190
查看次数

Spark高效的groupby操作-重新分区?

我正在 pyspark 2.3 中工作,我正在尝试找出从数据框中获取一些聚合统计信息的最有效方法。

我有一个包含 15 亿条记录的数据框,分布在一个由 10 个节点组成的相对较小的集群中。每个都有 16GB 内存和 4 个核心。我的复制因子设置为 2。

我的数据框可能有 15 列,它们是数据类型的混合,但我只对两列感兴趣 - ID 和 eventDate。我想运行的代码非常简单:

output = df.groupby(['ID']).agg(F.min('eventDate').alias("firstDate"),F.max('eventDate').alias("lastDate"))
output.write.parquet('hdfs:///somewhere/dateFile.parquet',mode='overwrite')
Run Code Online (Sandbox Code Playgroud)

我试图找出执行此操作的最有效方法。ID(我要分组的字段)有 12m 个值,df.rdd.getNumPartitions() 目前为 642。

我最好先将数据框投影到我想要的两列吗?有这么多 ID,我应该先重新分区我的数据集吗?我应该删除重复项吗?我可以在我的 groupby 之前运行这样的事情:

df = df[['ID','eventDate']].drop_duplicates().repartition(x)
Run Code Online (Sandbox Code Playgroud)

或者

df = df[['ID','eventDate']].repartition(x)
Run Code Online (Sandbox Code Playgroud)

我正在努力找出什么可以优化运行时。任何有关预先确定运行时间的指导将不胜感激。如果可能的话,我不想只是“测试一下”,因为我有几个这样的查询要运行,每个查询都需要一段时间。

python apache-spark pyspark

6
推荐指数
1
解决办法
1万
查看次数

为单个 Action Spark 应用程序缓存数据帧(在该应用程序中多次引用该数据帧)是否有效?

我对 Spark 的缓存机制有点困惑。

假设我有一个 Spark 应用程序,在多次转换结束时只有一个操作。假设我有一个数据帧 A,并对其应用了 2-3 次转换,创建了多个数据帧,这最终有助于创建将保存到磁盘的最后一个数据帧。

例子 :

val A=spark.read() // large size
val B=A.map()
val C=A.map()
.
.
.
val D=B.join(C)
D.save()
Run Code Online (Sandbox Code Playgroud)

那么我是否需要缓存数据帧 A 以增强性能?

提前致谢。

apache-spark apache-spark-sql

6
推荐指数
1
解决办法
1584
查看次数

Spark 如何实现任务间的内存公平?

我正在观看有关火花内存管理的演示文稿

他谈到了他们如何在一个执行器中实现不同任务之间的公平性(12:00)。他提出了任务之间动态分配的想法,并宣称如果更多任务开始执行,Spark 会将其他任务的页面溢出到磁盘。

我之前读过 Spark 中的任务本质上是线程,而在 Java 中,我们没有这种能力来管理线程的内存并在它们之间建立内存公平性。我想知道 Spark 是如何实现这一目标的?

java multithreading memory-management apache-spark

6
推荐指数
1
解决办法
293
查看次数

Databricks DELTA CTAS 与使用 %sql 的 LOCATION

DELTA不具有CREATE TABLE LIKE。它确实有CTAS

我只想复制表的定义LOCATION,但还要指定.

例如,这不起作用:

CREATE TABLE IF NOT EXISTS NEW_CUSTOMER_FEED 
AS SELECT * from NEW_CUSTOMER_FEED WHERE 1 = 0 
LOCATION '/atRest/data'
Run Code Online (Sandbox Code Playgroud)

我缺少什么?

databricks delta-lake databricks-sql

6
推荐指数
1
解决办法
3811
查看次数

将数据从 Oracle SQL Developer 导出到 Excel .xlsx

我有一个小项目,需要将 Oracle SQL Developer 中的数据导出到 Excel(使用命令而不是 SLQ Developer 中的工具),然后创建一个图表。使用“假脱机”我可以导出到 csv 很好(但不能在 csv 中制作图形)但是当我尝试导出到 xlsx 时它会破坏整个 excel 表说

"Excel cannot open the file "ExcelFile.xlsx" because the file format or file extention 
       is not valid. Verify that the file has not been corrupted and that the 
       file extension mathces the format of the file."
Run Code Online (Sandbox Code Playgroud)

这是我在 SQL Developer 中使用的代码。

spool FileLocation\ExcelFile.xlsm
SELECT * FROM Table;
spool off;
Run Code Online (Sandbox Code Playgroud)

有什么方法可以阻止数据损坏,还是有其他方法可以将数据导出到 .xlsx 文件?

sql oracle excel xlsx

5
推荐指数
2
解决办法
3万
查看次数

Spark SQL 使用 foldLeft 和 withColumn 替代 groupby/pivot/agg/collect_list 以提高性能

我有一个由三列组成的 Spark DataFrame:

 id | col1 | col2 
-----------------
 x  |  p1  |  a1  
-----------------
 x  |  p2  |  b1
-----------------
 y  |  p2  |  b2
-----------------
 y  |  p2  |  b3
-----------------
 y  |  p3  |  c1
Run Code Online (Sandbox Code Playgroud)

申请后,df.groupBy("id").pivot("col1").agg(collect_list("col2"))我得到以下数据帧(aggDF):

+---+----+--------+----+
| id|  p1|      p2|  p3|
+---+----+--------+----+
|  x|[a1]|    [b1]|  []|
|  y|  []|[b2, b3]|[c1]|
+---+----+--------+----+
Run Code Online (Sandbox Code Playgroud)

然后我找到除了列之外的id列的名称。

val cols = aggDF.columns.filter(x => x != "id")
Run Code Online (Sandbox Code Playgroud)

之后我cols.foldLeft(aggDF)((df, x) => df.withColumn(x, when(size(col(x)) > 0, col(x)).otherwise(lit(null))))null. …

apache-spark apache-spark-sql apache-spark-dataset

5
推荐指数
1
解决办法
1558
查看次数