我们目前遇到的问题是,在YARN上运行时,Spark作业看到许多容器因超出内存限制而被杀死.
16/11/18 17:58:52 WARN TaskSetManager: Lost task 53.0 in stage 49.0 (TID 32715, XXXXXXXXXX):
ExecutorLostFailure (executor 23 exited caused by one of the running tasks)
Reason: Container killed by YARN for exceeding memory limits. 12.4 GB of 12 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.
Run Code Online (Sandbox Code Playgroud)
以下参数通过spark-submit传递:
--executor-memory=6G
--driver-memory=4G
--conf "spark.yarn.executor.memoryOverhead=6G"`
Run Code Online (Sandbox Code Playgroud)
我正在使用Spark 2.0.1.
在阅读了几篇关于YARN杀死容器的帖子之后,我们已经将memoryOverhead增加到了这个值(例如,如何避免Spark执行器丢失和纱线容器由于内存限制而将其杀死?).
鉴于我的参数和日志消息,它似乎"当内存使用量大于(executor-memory + executor.memoryOverhead)时,Yarn会终止执行程序".
继续增加这种开销是不切实际的,希望最终我们找到一个不会发生这些错误的值.我们在几个不同的工作中看到了这个问题.我将不胜感激任何关于我应该改变的参数的建议,我应该检查的东西,我应该开始寻找调试这个等等.我能够提供进一步的配置选项等.
我希望能够将Scala函数用作PySpark中的UDF
package com.test
object ScalaPySparkUDFs extends Serializable {
def testFunction1(x: Int): Int = { x * 2 }
def testUDFFunction1 = udf { x: Int => testFunction1(x) }
}
Run Code Online (Sandbox Code Playgroud)
我可以testFunction1在PySpark 中访问它并返回值:
functions = sc._jvm.com.test.ScalaPySparkUDFs
functions.testFunction1(10)
Run Code Online (Sandbox Code Playgroud)
我想要做的就是将此函数用作UDF,最好是在withColumn通话中使用:
row = Row("Value")
numbers = sc.parallelize([1,2,3,4]).map(row).toDF()
numbers.withColumn("Result", testUDFFunction1(numbers['Value']))
Run Code Online (Sandbox Code Playgroud)
我认为这里有一个很有前途的方法: Spark:如何用Scala或Java用户定义函数映射Python?
但是,在对其中的代码进行更改时,可以改为使用testUDFFunction1:
def udf_test(col):
sc = SparkContext._active_spark_context
_f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1.apply
return Column(_f(_to_seq(sc, [col], _to_java_column)))
Run Code Online (Sandbox Code Playgroud)
我得到:
AttributeError: 'JavaMember' object has no attribute 'apply'
Run Code Online (Sandbox Code Playgroud)
我不明白这是因为我相信testUDFFunction1有申请方法吗?
我不想使用在这里找到的类型的表达式:将 UDF从Scala注册到SqlContext,以便在PySpark中使用
任何有关如何使这项工作的建议,将不胜感激!
我试图将结构的字段"提升"到数据框中的顶层,如下例所示:
case class A(a1: String, a2: String)
case class B(b1: String, b2: A)
val df = Seq(B("X",A("Y","Z"))).toDF
df.show
+---+-----+
| b1| b2|
+---+-----+
| X|[Y,Z]|
+---+-----+
df.printSchema
root
|-- b1: string (nullable = true)
|-- b2: struct (nullable = true)
| |-- a1: string (nullable = true)
| |-- a2: string (nullable = true)
val lifted = df.withColumn("a1", $"b2.a1").withColumn("a2", $"b2.a2").drop("b2")
lifted.show
+---+---+---+
| b1| a1| a2|
+---+---+---+
| X| Y| Z|
+---+---+---+
lifted.printSchema
root
|-- b1: string (nullable = true) …Run Code Online (Sandbox Code Playgroud) 我使用 Spark 2.1。
如果我运行以下示例:
val seq = Seq((123,"2016-01-01","1"),(123,"2016-01-02","2"),(123,"2016-01-03","3"))
val df = seq.toDF("id","date","score")
val dfAgg = df.sort("id","date").groupBy("id").agg(last("score"))
dfAgg.show
dfAgg.show
dfAgg.show
dfAgg.show
dfAgg.show
Run Code Online (Sandbox Code Playgroud)
上面代码的输出是:
+---+------------------+
| id|last(score, false)|
+---+------------------+
|123| 1|
+---+------------------+
+---+------------------+
| id|last(score, false)|
+---+------------------+
|123| 2|
+---+------------------+
+---+------------------+
| id|last(score, false)|
+---+------------------+
|123| 1|
+---+------------------+
+---+------------------+
| id|last(score, false)|
+---+------------------+
|123| 3|
+---+------------------+
+---+------------------+
| id|last(score, false)|
+---+------------------+
|123| 3|
+---+------------------+
Run Code Online (Sandbox Code Playgroud)
目的是获取与每个 id 的最新日期相关的分数:
+---+------------------+
| id|last(score, false)|
+---+------------------+
|123| 3|
+---+------------------+
Run Code Online (Sandbox Code Playgroud)
但这显然不起作用,因为结果是不确定的。我们是否必须使用窗口函数来实现这一点?
如何使用 .toDF 从序列中创建包含空值的数据帧?
这有效:
val df = Seq((1,"a"),(2,"b")).toDF("number","letter")
Run Code Online (Sandbox Code Playgroud)
但我想做一些类似的事情:
val df = Seq((1, NULL),(2,"b")).toDF("number","letter")
Run Code Online (Sandbox Code Playgroud) 我正在尝试获得我可以使用 Rust 读取和写入“大”CSV 文件的最大速度的指示性度量。
我有一个包含 1 亿行相同行的测试 CSV 文件:
SomeLongStringForTesting1, SomeLongStringForTesting2
该文件在磁盘上的大小为 4.84GB。
我已经编写了(大部分是复制的!)以下使用csv: 1.1.3crate 的代码:
use std::error::Error;
fn main() {
read_and_write("C:/Dev/100MillionRows.csv", "C:/Dev/100MillionRowsCopy.csv").unwrap();
}
fn read_and_write(in_file_path: &str, out_file_path: &str) -> Result<(), Box<Error>> {
let mut rdr = csv::ReaderBuilder::new()
.has_headers(false)
.from_path(in_file_path)?;
let mut wtr = csv::WriterBuilder::new()
.from_path(out_file_path)?;
for result in rdr.records() {
let record = result?;
wtr.write_record(record.iter())?;
}
wtr.flush()?;
Ok(())
}
Run Code Online (Sandbox Code Playgroud)
以“发布模式”构建,然后使用以下命令运行:
powershell -Command "Measure-Command {.\target\release\csv-performance.exe}"72.79 seconds, 71.01 seconds, 70.77 seconds三个运行的产量。
大致说来,我在 70 秒内看到 10GB(组合读写)的 IO,相当于 142MB/S。这大约是 …