小编Kai*_*Kai的帖子

如何向Spark DataFrame添加持久的行ID列?

这个问题并不新鲜,但我在Spark中发现了令人惊讶的行为.我需要向DataFrame添加一列行ID.我使用了DataFrame方法monotonically_increasing_id(),它确实给了我一个额外的uniques行ID(顺便说一句,它们不是连续的,但是是唯一的).

我遇到的问题是,当我过滤DataFrame时,重新分配生成的DataFrame中的行ID.两个DataFrame如下所示.

  • 第一个是添加了行ID的初始DataFrame,如下所示:

    df.withColumn("rowId", monotonically_increasing_id()) 
    
    Run Code Online (Sandbox Code Playgroud)
  • 第二个DataFrame是在col P via上过滤后获得的数据帧df.filter(col("P")).

问题由custId 169的rowId说明,在初始DataFrame中为5,但在过滤后,当custId 169被过滤掉时,rowId(5)被重新分配给custmId 773!我不知道为什么这是默认行为.

我希望rowIds它"粘"; 如果我从DataFrame中删除行,我不希望他们的ID"重新使用",我希望它们与行一起消失.有可能吗?我没有看到任何标志从monotonically_increasing_id方法请求此行为.

+---------+--------------------+-------+
| custId  |    features|    P  |rowId|
+---------+--------------------+-------+
|806      |[50,5074,...|   true|    0|
|832      |[45,120,1...|   true|    1|
|216      |[6691,272...|   true|    2|
|926      |[120,1788...|   true|    3|
|875      |[54,120,1...|   true|    4|
|169      |[19406,21...|  false|    5|

after filtering on P:
+---------+--------------------+-------+
|   custId|    features|    P  |rowId|
+---------+--------------------+-------+
|      806|[50,5074,...|   true|    0|
|      832|[45,120,1...|   true|    1|
|      216|[6691,272...|   true|    2|
|      926|[120,1788...|   true|    3|
| …
Run Code Online (Sandbox Code Playgroud)

dataframe apache-spark apache-spark-sql

32
推荐指数
1
解决办法
3万
查看次数

Spark中的HashingTF和CountVectorizer有什么区别?

试图在Spark中进行doc分类.我不确定HashingTF中的散列是做什么的; 它会牺牲任何准确性吗?我对此表示怀疑,但我不知道.spark文档说它使用了"散列技巧"......只是工程师使用的另一个非常糟糕/混乱命名的例子(我也很内疚).CountVectorizer还需要设置词汇量大小,但它有另一个参数,一个阈值参数,可用于排除出现在文本语料库中某个阈值以下的单词或标记.我不明白这两个变形金刚之间的区别.使这一点很重要的是算法中的后续步骤.例如,如果我想在生成的tfidf矩阵上执行SVD,那么词汇量大小将决定SVD矩阵的大小,这会影响代码的运行时间,以及模型性能等.我一般有困难在API文档之外找到关于Spark Mllib的任何来源以及没有深度的非常简单的例子.

apache-spark apache-spark-ml apache-spark-mllib

22
推荐指数
3
解决办法
1万
查看次数

如何使用JAVA在Spark DataFrame上调用UDF?

类似的问题在这里,但没有足够的点来评论那里.

根据最新的Spark 文档,udf可以使用两种不同的方式,一种使用SQL,另一种使用DataFrame.我找到了多个如何使用udfwith sql的例子,但是却找不到任何关于如何udf直接在DataFrame上使用的例子.

由运上上面链接的问题提供解决方案使用__callUDF()__其是_deprecated_根据所述火花Java API文档将在火花2.0被移除.在那里,它说:

"因为它与udf()是多余的"

所以这意味着我应该可以__udf()__用来训练我的udf,但我无法弄清楚如何做到这一点.我没有偶然发现任何说明Java-Spark程序语法的内容.我错过了什么?

import org.apache.spark.sql.api.java.UDF1;
.
.    
UDF1 mode = new UDF1<String[], String>() {
    public String call(final String[] types) throws Exception {
        return types[0];
    }
};

sqlContext.udf().register("mode", mode, DataTypes.StringType);
df.???????? how do I call my udf (mode) on a given column of my DataFrame df?
Run Code Online (Sandbox Code Playgroud)

java user-defined-functions apache-spark apache-spark-sql

14
推荐指数
1
解决办法
2万
查看次数

为什么Spark Mllib KMeans算法非常慢?

我遇到了和这篇文章一样的问题,但我没有足够的积分在那里添加评论.我的数据集有1百万行,100列.我也使用Mllib KMeans而且它非常慢.这项工作实际上从未结束,我必须杀死它.我在谷歌云(dataproc)上运行它.如果我要求较少数量的群集(k = 1000),它仍会运行,但仍需要超过35分钟.我需要它运行k~5000.我不知道为什么这么慢.考虑到工人/节点的数量,数据被正确分区,并且在100万x~300,000 col矩阵上的SVD需要大约3分钟,但是当涉及KMeans时,它只是进入黑洞.我现在尝试较少的迭代次数(2次而不是100次),但我觉得某处出了问题.

KMeansModel Cs = KMeans.train(datamatrix, k, 100);//100 iteration, changed to 2 now. # of clusters k=1000 or 5000
Run Code Online (Sandbox Code Playgroud)

cluster-analysis data-mining k-means apache-spark apache-spark-mllib

6
推荐指数
1
解决办法
2405
查看次数

安装tensorflow 1.3后是否需要分别安装keras 2.0?

我刚刚将我的tf从1.0升级到tf 1.3(pip install --upgrade tensorflow).我知道自从版本1.2以来keras 2.0成为tensorflow的一部分.但是,当我导入keras并检查其版本时,它仍显示1.2.我应该升级keras吗?如果是这样,那么" Keras API现在可以作为TensorFlow的一部分直接使用,从TensorFlow 1.2开始 "是什么意思?

keras tensorflow

6
推荐指数
1
解决办法
1418
查看次数

如何在GroupBy操作后从spark DataFrame列中收集字符串列表?

这里描述的解决方案(通过zero323)非常接近我想要的两个曲折:

  1. 我如何用Java做到这一点?
  2. 如果列具有字符串列表而不是单个字符串,并且我想在GroupBy(其他列)之后将所有这些列表收集到单个列表中,该怎么办?

我正在使用Spark 1.6并尝试使用

org.apache.spark.sql.functions.collect_list(Column col) 如该问题的解决方案中所述,但得到以下错误

线程"main"中的异常org.apache.spark.sql.AnalysisException:undefined function collect_list; at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry $$ anonfun $ 2.apply(FunctionRegistry.scala:65)at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry $$ anonfun $ 2.apply(FunctionRegistry. scala:65)在scala.Option.getOrElse(Option.scala:121)

java apache-spark apache-spark-sql

5
推荐指数
1
解决办法
9910
查看次数

如何从 Spark ML OneVsRest 分类器获取原始概率?

我想使用一个与其余(全部)来解决多类分类问题。我在 Spark ML 中发现了 OneVsRest,它将多类问题分解为几个二元分类。我希望有一种方法可以获得原始概率,但我似乎找不到。在 mllib 中,我可以使用clearThreshold() 使二元分类器(如逻辑回归)返回原始概率,但我在 ML 逻辑回归中看不到该方法。我确实看到 rawPredictionCol() 返回一个参数,但没有关于如何使用它的文档。我正在使用 Java Spark API,并尝试让这个示例产生原始概率。问题 2:出于某种原因,我通常发现 Spark 文档在很多情况下都缺乏..我缺少什么?

更新:尝试阅读源代码..这不是一个很有成效的练习;我不是 Scala 程序员。将更新调查结果。

java apache-spark apache-spark-ml apache-spark-mllib

5
推荐指数
0
解决办法
470
查看次数

将 Spark 数据帧写入 JSON 会丢失 MLLIB 稀疏向量的格式

我正在将(Java)Spark Dataframe 写入 json。其中一列是 mllib 稀疏向量。后来我将 json 文件读入第二个数据帧,但稀疏向量列现在是 WrappedArray 并且在第二个数据帧中没有作为稀疏向量读取。我的问题:有什么我可以在写入端或读取端做的,以获得一个稀疏向量列而不是一个wrappedArray列?

写作:

initialDF.coalesce(1).write().json("initial_dataframe");
Run Code Online (Sandbox Code Playgroud)

读:

DataFrame secondDF = hiveContext.read().json("initial_dataframe");
Run Code Online (Sandbox Code Playgroud)

java apache-spark apache-spark-sql apache-spark-mllib

3
推荐指数
1
解决办法
1475
查看次数

有没有办法可视化Spark mllib随机森林模型?

我似乎找不到一种方法来可视化使用Spark的MLLib RandomForestModel获得的RF模型。该模型以字符串形式打印,只是一堆嵌套的IF语句..看起来像在R语言中一样可能看起来很自然。我使用的是Spark Python API和Java API。我的RF模型的R形可视化。

apache-spark apache-spark-mllib

2
推荐指数
1
解决办法
2573
查看次数

如何将持久坐标转换应用于Matplotlib补丁?

我对matplotlib坐标系有点困惑(在Jupyter笔记本中使用它进行交互式动画).如果你创建一个补丁,比如一个圆圈,然后使用set_transform()进行翻译,我发现转换不是持久的,这意味着如果再次使用相同的(x,y)移位应用转换,则圆圈不会移动是因为看起来圆形位置在变换后没有得到更新,因此后续变换将应用于相同的原始色块位置.我的问题是如何应用转换,实际上不仅在应用补丁后移动补丁,还会更新补丁位置?假设我想应用一系列翻译,那么圆圈应该四处移动而不是从它的原始位置来回摆动.这是一个示例代码:

%import matplotlib
%matplotlib notebook

fig = plt.figure()
ax = fig.add_subplot(111, xlim=(-10, 10), ylim=(-10, 10))
c = ax.add_patch(plt.Circle((x, y), radius=0.5)
c.set_transform(ax.transData + mpl.transforms.Affine2D().translate(-5,-5))
c.set_transform(ax.transData + mpl.transforms.Affine2D().translate(10,10))
Run Code Online (Sandbox Code Playgroud)

如果你运行它,你会看到注释掉第一个翻译不会影响圆圈的最终位置.我原以为圆心的最终位置是(5,5)NOT(10,10).这意味着转换实际上不会更新补丁(圆圈)位置; 它只是将其翻译成图形/轴.

问题2:我发现另一件令人困惑的事情是,上面代码生成的圆圈确实看起来半径为0.5,但是在应用了(10,10)的翻译后,它在图中的翻译要少得多!! 好像翻译班次在应用之前按比例缩小了!我没有解释这个,它只是表明我不理解matplotlib坐标系和转换.

另一方面,plot()生成的对象(Line2D对象)可以通过set_data()方法进行转换,该方法更新对象的位置如下(假设图中的fig和ax对象):

L, = ax.plot(0, 0, 'ro', ms=8)
sx = 10 # shift in x
sy = 10 # shift in y
L.set_data(L.get_data()[0] + sx, L.get_data()[1] + sy)
Run Code Online (Sandbox Code Playgroud)

我不确定如何为matplotlib补丁做同样的事情?

python matplotlib

2
推荐指数
1
解决办法
1410
查看次数