小编TX *_*Shi的帖子

如何在多个条件下对Spark数据帧执行"查找"操作

我是Spark的新手(我的版本是1.6.0),现在我正在尝试解决下面给出的问题:

假设有两个源文件:

  • 第一个(简称A)是一个大的,包含名为A1,B1,C1和其他80列的列.里面有230K的记录.
  • 第二个(简称B)是一个小的查找表,其中包含名为A2,B2,C2和D2的列.里面有250条记录.

现在我们需要在A中插入一个新列,给出以下逻辑:

  • 首先在B中查找A1,B1和C1(对应的列是A2,B2和C2),如果成功,则返回D2作为新添加列的值.如果没找到......
  • 然后在B中查找A1,B1.如果成功,则返回D2.如果没找到......
  • 设置默认值"NA"

我已经读过文件并将它们转换成数据框.对于第一种情况,我得到的结果是左外连接在一起.但是我在下一步找不到好的方法.

我目前的尝试是通过使用不太严格的条件连接A和B来构建新的数据框架.但是我不知道如何从另一个更新当前数据帧.或者还有其他更直观,更有效的方法来解决整个问题吗?

感谢所有的答案.

-----------------------------更新于20160309 ------------------ --------------

终于接受了@mlk的回答.仍然非常感谢@ zero323对于他/她对UDF和加入的好评,Tungsten代码生成确实是我们现在面临的另一个问题.但是,由于我们需要为每次查找执行大量的查找和平均4个条件,因此前一个解决方案更合适......

最终解决方案在某种程度上看起来像下面的片段:

```
import sqlContext.implicits._
import com.github.marklister.collections.io._

case class TableType(A: String, B: String, C: String, D: String)
val tableBroadcast = sparkContext.broadcast(CsvParser(TableType).parseFile("..."))
val lkupD = udf {
  (aStr: String, bStr: String, cStr: String) =>
    tableBroadcast.value.find {
      case TableType(a, b, c, _) =>
        (a == aStr && b == bStr && c == cStr) ||
        (a == aStr && b == bStr)
    }.getOrElse(TableType("", "", "", "NA")).D
} …
Run Code Online (Sandbox Code Playgroud)

lookup scala dataframe apache-spark apache-spark-sql

7
推荐指数
1
解决办法
6238
查看次数

排序后不会对Spark数据帧进行排序

我正在处理一个JSON文件,使用Spark(版本1.6.1)生成两个JSON文件.输入文件的大小约为30~40G(100M记录).对于生成的文件,较大的文件大约为10G~15G(30M记录),较小的文件大约为500M~750M(1.5M记录).两个结果文件都面临以下问题:

我调用了数据帧的"排序"方法,然后执行"重新分区"将结果合并到一个文件中.然后我检查了生成的文件,在一个间隔中找到了记录的顺序,但整个文件没有全局排序.例如,文件中最后一条记录(行号为1.9M)的密钥(由3列构成)是"(ou7QDj48c,014,075)",但文件中的中间记录的密钥(行号375K)是" (pzwzh5vm8,003,023)"

pzwzh5vm8 003 023
...
ou7QDj48c 014 075
Run Code Online (Sandbox Code Playgroud)

当我使用相对较小的输入源(输入文件400K行)在本地测试代码时,根本不会发生这种情况.

我的具体代码如下所示:

big_json = big_json.sort($"col1", $"col2", $"col3", $"col4")
big_json.repartition(1).write.mode("overwrite").json("filepath")
Run Code Online (Sandbox Code Playgroud)

谁能提出建议?谢谢.

(我也注意到这个线程讨论了类似的问题,但到目前为止还没有一个好的解决方案.如果这种现象真的是由重新分区操作引起的,那么任何人都可以帮助我有效地将数据帧转换为单个JSON文件而不进行转换进入RDD,同时保持排序顺序?谢谢)

方案:

非常感谢@manos @eliasah和@pkrishna的帮助.在阅读了你的评论之后,我曾考虑过使用coalesce,但在调查了它的性能后,我放弃了这个想法.

最终的解决方案是:对数据帧进行排序并写入JSON,无需任何重新分区或合并.完成整个工作后,请调用下面的HDFS命令

hdfs dfs -getmerge /hdfs/file/path/part* ./local.json
Run Code Online (Sandbox Code Playgroud)

这个命令比我的想象要好得多.它既不需要太多时间也不需要太多空间,并且给我一个很好的单个文件.我只是使用headtail在巨大的结果文件,它似乎完全有序.

apache-spark apache-spark-sql

4
推荐指数
1
解决办法
2456
查看次数

如何从保存的 XGBoost 模型中获取参数

我正在尝试使用以下参数训练 XGBoost 模型:

xgb_params = {
    'objective': 'binary:logistic',
    'eval_metric': 'auc',
    'lambda': 0.8,
    'alpha': 0.4,
    'max_depth': 10,
    'max_delta_step': 1,
    'verbose': True
}
Run Code Online (Sandbox Code Playgroud)

由于我的输入数据太大而无法完全加载到内存中,因此我调整了增量训练:

xgb_clf = xgb.train(xgb_params, input_data, num_boost_round=rounds_per_batch,
                    xgb_model=model_path)
Run Code Online (Sandbox Code Playgroud)

预测代码是

xgb_clf = xgb.XGBClassifier()
booster = xgb.Booster()
booster.load_model(model_path)
xgb_clf._Booster = booster
raw_probas = xgb_clf.predict_proba(x)
Run Code Online (Sandbox Code Playgroud)

结果似乎不错。但是当我尝试调用时xgb_clf.get_xgb_params(),我得到了一个 param dict,其中所有参数都设置为默认值。

我可以猜到根本原因是当我初始化模型时,我没有传入任何参数。所以模型是使用默认值初始化的,但是当它预测时,它使用了一个内部助推器,该助推器已经使用一些预拟合定义的参数。

但是,我想知道有什么方法可以在我将预训练的 booster 模型分配给 XGBClassifier 之后,看到用于训练 booster 的真实参数,而不是用于初始化分类器的参数。

python xgboost

4
推荐指数
1
解决办法
1万
查看次数