小编use*_*459的帖子

SPARK:YARN因超出内存限制而杀死容器

我们目前遇到的问题是,在YARN上运行时,Spark作业看到许多容器因超出内存限制而被杀死.

16/11/18 17:58:52 WARN TaskSetManager: Lost task 53.0 in stage 49.0 (TID 32715, XXXXXXXXXX): 
  ExecutorLostFailure (executor 23 exited caused by one of the running tasks) 
  Reason: Container killed by YARN for exceeding memory limits. 12.4 GB of 12 GB physical memory used. 
    Consider boosting spark.yarn.executor.memoryOverhead.
Run Code Online (Sandbox Code Playgroud)

以下参数通过spark-submit传递:

--executor-memory=6G
--driver-memory=4G
--conf "spark.yarn.executor.memoryOverhead=6G"`
Run Code Online (Sandbox Code Playgroud)

我正在使用Spark 2.0.1.

在阅读了几篇关于YARN杀死容器的帖子之后,我们已经将memoryOverhead增加到了这个值(例如,如何避免Spark执行器丢失和纱线容器由于内存限制而将其杀死?).

鉴于我的参数和日志消息,它似乎"当内存使用量大于(executor-memory + executor.memoryOverhead)时,Yarn会终止执行程序".

继续增加这种开销是不切实际的,希望最终我们找到一个不会发生这些错误的值.我们在几个不同的工作中看到了这个问题.我将不胜感激任何关于我应该改变的参数的建议,我应该检查的东西,我应该开始寻找调试这个等等.我能够提供进一步的配置选项等.

hadoop-yarn apache-spark

6
推荐指数
1
解决办法
7679
查看次数

如何在PySpark中使用Scala UDF?

我希望能够将Scala函数用作PySpark中的UDF

package com.test

object ScalaPySparkUDFs extends Serializable {
    def testFunction1(x: Int): Int = { x * 2 }
    def testUDFFunction1 = udf { x: Int => testFunction1(x) }
} 
Run Code Online (Sandbox Code Playgroud)

我可以testFunction1在PySpark 中访问它并返回值:

functions = sc._jvm.com.test.ScalaPySparkUDFs 
functions.testFunction1(10)
Run Code Online (Sandbox Code Playgroud)

我想要做的就是将此函数用作UDF,最好是在withColumn通话中使用:

row = Row("Value")
numbers = sc.parallelize([1,2,3,4]).map(row).toDF()
numbers.withColumn("Result", testUDFFunction1(numbers['Value']))
Run Code Online (Sandbox Code Playgroud)

我认为这里有一个很有前途的方法: Spark:如何用Scala或Java用户定义函数映射Python?

但是,在对其中的代码进行更改时,可以改为使用testUDFFunction1

def udf_test(col):
    sc = SparkContext._active_spark_context
    _f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1.apply
    return Column(_f(_to_seq(sc, [col], _to_java_column)))
Run Code Online (Sandbox Code Playgroud)

我得到:

 AttributeError: 'JavaMember' object has no attribute 'apply' 
Run Code Online (Sandbox Code Playgroud)

我不明白这是因为我相信testUDFFunction1有申请方法吗?

我不想使用在这里找到的类型的表达式:将 UDF从Scala注册到SqlContext,以便在PySpark中使用

任何有关如何使这项工作的建议,将不胜感激!

python scala apache-spark apache-spark-sql pyspark

6
推荐指数
2
解决办法
3331
查看次数

访问struct Spark SQL中字段的名称

我试图将结构的字段"提升"到数据框中的顶层,如下例所示:

case class A(a1: String, a2: String)
case class B(b1: String, b2: A)

val df = Seq(B("X",A("Y","Z"))).toDF

df.show    
+---+-----+
| b1|   b2|
+---+-----+
|  X|[Y,Z]|
+---+-----+

df.printSchema
root
 |-- b1: string (nullable = true)
 |-- b2: struct (nullable = true)
 |    |-- a1: string (nullable = true)
 |    |-- a2: string (nullable = true)

val lifted = df.withColumn("a1", $"b2.a1").withColumn("a2", $"b2.a2").drop("b2")

lifted.show
+---+---+---+
| b1| a1| a2|
+---+---+---+
|  X|  Y|  Z|
+---+---+---+

lifted.printSchema
 root
 |-- b1: string (nullable = true) …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

4
推荐指数
1
解决办法
5080
查看次数

GroupBy 和聚合未保留 Spark SQL 排序顺序?

我使用 Spark 2.1。

如果我运行以下示例:

val seq = Seq((123,"2016-01-01","1"),(123,"2016-01-02","2"),(123,"2016-01-03","3"))

val df = seq.toDF("id","date","score")

val dfAgg = df.sort("id","date").groupBy("id").agg(last("score"))

dfAgg.show
dfAgg.show
dfAgg.show
dfAgg.show
dfAgg.show
Run Code Online (Sandbox Code Playgroud)

上面代码的输出是:

+---+------------------+
| id|last(score, false)|
+---+------------------+
|123|                 1|
+---+------------------+

+---+------------------+
| id|last(score, false)|
+---+------------------+
|123|                 2|
+---+------------------+

+---+------------------+
| id|last(score, false)|
+---+------------------+
|123|                 1|
+---+------------------+

+---+------------------+
| id|last(score, false)|
+---+------------------+
|123|                 3|
+---+------------------+

+---+------------------+
| id|last(score, false)|
+---+------------------+
|123|                 3|
+---+------------------+
Run Code Online (Sandbox Code Playgroud)

目的是获取与每个 id 的最新日期相关的分数:

+---+------------------+
| id|last(score, false)|
+---+------------------+
|123|                 3|
+---+------------------+ 
Run Code Online (Sandbox Code Playgroud)

但这显然不起作用,因为结果是不确定的。我们是否必须使用窗口函数来实现这一点?

scala apache-spark apache-spark-sql

4
推荐指数
1
解决办法
2677
查看次数

如何使用toDF创建带有空值的DataFrame?

如何使用 .toDF 从序列中创建包含空值的数据帧?

这有效:

val df = Seq((1,"a"),(2,"b")).toDF("number","letter")
Run Code Online (Sandbox Code Playgroud)

但我想做一些类似的事情:

val df = Seq((1, NULL),(2,"b")).toDF("number","letter")
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

4
推荐指数
2
解决办法
2816
查看次数

Rust:读写 CSV 性能

我正在尝试获得我可以使用 Rust 读取和写入“大”CSV 文件的最大速度的指示性度量。

我有一个包含 1 亿行相同行的测试 CSV 文件:

SomeLongStringForTesting1, SomeLongStringForTesting2

该文件在磁盘上的大小为 4.84GB。

我已经编写了(大部分是复制的!)以下使用csv: 1.1.3crate 的代码:

use std::error::Error;

fn main() {
    read_and_write("C:/Dev/100MillionRows.csv", "C:/Dev/100MillionRowsCopy.csv").unwrap();
}

fn read_and_write(in_file_path: &str, out_file_path: &str) -> Result<(), Box<Error>> {
    let mut rdr = csv::ReaderBuilder::new()
        .has_headers(false)
        .from_path(in_file_path)?;

    let mut wtr = csv::WriterBuilder::new()
        .from_path(out_file_path)?;

    for result in rdr.records() {
        let record = result?;
        wtr.write_record(record.iter())?;
    }

    wtr.flush()?;

    Ok(())
}
Run Code Online (Sandbox Code Playgroud)

以“发布模式”构建,然后使用以下命令运行:

powershell -Command "Measure-Command {.\target\release\csv-performance.exe}"72.79 seconds, 71.01 seconds, 70.77 seconds三个运行的产量。

大致说来,我在 70 秒内看到 10GB(组合读写)的 IO,相当于 142MB/S。这大约是 …

csv rust

1
推荐指数
1
解决办法
770
查看次数