标签: pyspark

PySpark:OneHotEncoder 的输出看起来很奇怪

星火文档包含一个PySpark例如OneHotEncoder

from pyspark.ml.feature import OneHotEncoder, StringIndexer

df = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])

stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)

encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.show()
Run Code Online (Sandbox Code Playgroud)

我希望该列categoryVec看起来像这样:

[0.0, 0.0]
[1.0, 0.0]
[0.0, 1.0]
[0.0, 0.0]
[0.0, 0.0]
[0.0, 1.0]
Run Code Online (Sandbox Code Playgroud)

categoryVec实际上看起来是这样的:

(2, [0], [1.0])
    (2, [], [])
(2, [1], [1.0])
(2, [0], [1.0])
(2, [0], [1.0])
(2, …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark apache-spark-mllib one-hot-encoding

0
推荐指数
1
解决办法
491
查看次数

基于其他列pyspark删除重复记录

我有一个data framepyspark像下面。

df.show()
+---+----+
| id|test|
+---+----+
|  1|   Y|
|  1|   N|
|  2|   Y|
|  3|   N|
+---+----+
Run Code Online (Sandbox Code Playgroud)

我想在有重复记录时删除记录id并且testN

现在当我查询 new_df

new_df.show()
+---+----+
| id|test|
+---+----+
|  1|   Y|
|  2|   Y|
|  3|   N|
+---+----+
Run Code Online (Sandbox Code Playgroud)

我无法弄清楚用例。

我已经完成了 groupbyid计数,但它只给出了id列和count.

我做了如下。

grouped_df = new_df.groupBy("id").count()
Run Code Online (Sandbox Code Playgroud)

我怎样才能达到我想要的结果

编辑

我有一个如下所示的数据框。

+-------------+--------------------+--------------------+
|           sn|              device|           attribute|
+-------------+--------------------+--------------------+
|4MY16A5602E0A|       Android Phone|                   N|
|4MY16A5W02DE8|       Android Phone|                   N|
|4MY16A5W02DE8|       Android Phone| …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark

0
推荐指数
1
解决办法
1003
查看次数

如何在转换过程中测试数据类型转换

我们有一个将数据映射到数据帧的脚本(我们使用的是 pyspark)。数据以字符串形式出现,并且对其进行了一些其他有时昂贵的操作,但作为操作的一部分(调用 withColumn),我们对其最终数据类型进行了强制转换。

我需要判断是否发生了截断,但如果发生了截断,我们不想失败。我们只想要一个数字来知道每个翻译列(大约有 300 列)中有多少行失败。

我的第一个想法是让每一列通过一个 UDF 来进行测试,输出将是一个包含值的数组,以及一个关于它是否通过数据类型检查的值。然后我会做2个选择。一个从数组中选择原始值,另一个聚合未命中。但这似乎是一个草率的解决方案。我对 pyspark/hadoop 世界还很陌生……很想知道是否有更好的(也许是标准的?)方法来做到这一点。

hadoop apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
668
查看次数

如何将小的镶木地板文件合并为一个大的镶木地板文件?

我有一些分区的配置单元表,它们指向镶木地板文件。现在每个分区都有很多小的镶木地板文件,每个大小约为 5kb,我想将这些小文件合并为每个分区的一个大文件。我怎样才能做到这一点来提高我的蜂巢性能?我尝试将分区中的所有镶木地板文件读取到 pyspark 数据帧,并将组合数据帧重写到同一分区并删除旧的。但出于某种原因,这对我来说似乎效率低下或初学者级别的类型。这样做的利弊是什么?而且,如果有任何其他方法,请指导我在 spark 或 pyspark 中实现它。

hive apache-spark parquet pyspark

0
推荐指数
1
解决办法
9818
查看次数

在pyspark 2.3中,如何处理json模式推断后不区分大小写导致的列名不明确?

在 Pyspark 2.3 中,假设我有一个如下所示的 JSON 文档:

{
   "key1": {
       "key2": "abc",
       "KEY2": "def"
    }
}
Run Code Online (Sandbox Code Playgroud)

实际上,我有数十亿个这样的文档,每个文档都有可能有数百(甚至数千)个周期性变化的深度嵌套结构。但是这个简单的文档说明了这个问题。

如果我做:

df = session.read.json(<file>)
df.select('key1.key2')
df.select('key1.KEY2')
Run Code Online (Sandbox Code Playgroud)

两个选择都将失败并显示如下错误:

pyspark.sql.utils.AnalysisException: 'Ambiguous reference to fields StructField(key2,StringType,true), StructField(KEY2,StringType,true);'
Run Code Online (Sandbox Code Playgroud)

由于模式的广度及其不断变化的性质,通过 StructType 结构对模式进行硬编码是不切实际的。

我该如何处理这种情况?理想情况下,我有一种方法可以重命名重复的列,这样它们就不会发生冲突(例如,'key2_0'、'KEY2_1' 等)。不幸的是,我找不到任何方法来迭代列列表或更改列名,而无需首先能够通过名称明确引用列。

apache-spark pyspark pyspark-sql

0
推荐指数
1
解决办法
1483
查看次数

Pyspark 错误:使用交叉验证时“字段 rawPrediction 不存在”

我一直在尝试使用CrossValidator我的训练数据,但我总是收到错误消息:

"An error occurred while calling o80267.evaluate.
: java.lang.IllegalArgumentException: Field "rawPrediction" does not exist.
Available fields: label, features, CrossValidator_6a7bb791f63f_rand, features_scaled, prediction"
Run Code Online (Sandbox Code Playgroud)

这是代码:

df = spark.createDataFrame(input_data, ["label", "features"])

train_data, test_data = df.randomSplit([.8,.2],seed=1234)
train_data.show()

standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")
lr = LinearRegression(maxIter=10)

pipeline = Pipeline(stages=[standardScaler, lr])

paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.3, 0.1, 0.01])\
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 0.8, 1.0])\
    .build()


crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2)


cvModel = crossval.fit(train_data)
Run Code Online (Sandbox Code Playgroud)

使用train_data.show()(在第三行)时,输出如下:

    +-----+--------------------+
    |label|            features|
    +-----+--------------------+
    |4.526|[129.0,322.0,126....|
    |3.585|[1106.0,2401.0,11...|
    |3.521|[190.0,496.0,177....| …
Run Code Online (Sandbox Code Playgroud)

python machine-learning apache-spark pyspark apache-spark-ml

0
推荐指数
1
解决办法
1635
查看次数

AttributeError: 'RDD' 对象没有属性 'show'

from pyspark import SparkContext, SparkConf, sql
from pyspark.sql import Row
sc = SparkContext.getOrCreate()
sqlContext = sql.SQLContext(sc)
df = sc.parallelize([ \
                 Row(nama='Roni', umur=27, tingi=168), \
                 Row(nama='Roni', umur=6, tingi=168),
                 Row(nama='Roni', umur=89, tingi=168),])

df.show()
Run Code Online (Sandbox Code Playgroud)

错误:回溯(最近一次调用最后一次):

文件“ipython-input-24-bfb18ebba99e”,第 8 行,在 df.show()

AttributeError: 'RDD' 对象没有属性 'show'

python apache-spark pyspark

0
推荐指数
1
解决办法
5671
查看次数

使用别名透视和聚合 PySpark 数据帧

我有一个与此类似的 PySpark DataFrame:

df = sc.parallelize([
    ("c1", "A", 3.4, 0.4, 3.5), 
    ("c1", "B", 9.6, 0.0, 0.0),
    ("c1", "A", 2.8, 0.4, 0.3),
    ("c1", "B", 5.4, 0.2, 0.11),
    ("c2", "A", 0.0, 9.7, 0.3), 
    ("c2", "B", 9.6, 8.6, 0.1),
    ("c2", "A", 7.3, 9.1, 7.0),
    ("c2", "B", 0.7, 6.4, 4.3)
]).toDF(["user_id", "type", "d1", 'd2', 'd3'])
df.show()
Run Code Online (Sandbox Code Playgroud)

这使:

+-------+----+---+---+----+
|user_id|type| d1| d2|  d3|
+-------+----+---+---+----+
|     c1|   A|3.4|0.4| 3.5|
|     c1|   B|9.6|0.0| 0.0|
|     c1|   A|2.8|0.4| 0.3|
|     c1|   B|5.4|0.2|0.11|
|     c2|   A|0.0|9.7| 0.3|
|     c2| …
Run Code Online (Sandbox Code Playgroud)

alias pivot aggregate-functions apache-spark-sql pyspark

0
推荐指数
1
解决办法
3422
查看次数

将数据帧转换为pyspark中嵌套的json对象数组

我创建了如下数据框:

+----+-------+-------+
| age| number|name   |
+----+-------+-------+
|  16|     12|A      |
|  16|     13|B      |
|  17|     16|E      |
|  17|     17|F      |
+----+-------+-------+
Run Code Online (Sandbox Code Playgroud)

如何将其转换为以下json:

{ 
'age' : 16,  
'values' : [{‘number’: ‘12’ , ‘name’ : 'A'},{‘number’: ‘12’ , ‘name’ : 'A'} ] 
},{ 
'age' : 17,  
'values' : [{‘number’: ‘16’ , ‘name’ : 'E'},{‘number’: ‘17’ , ‘name’ : 'F'} ] 
}
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark

0
推荐指数
1
解决办法
3237
查看次数

如何将火花数据框中的两列相乘

假设我有一个名为“orderitems”的数据框,其架构如下

    DataFrame[order_item_id: int, order_item_order_id: int, order_item_product_id: int, order_item_quantity: int, order_item_subtotal: float, order_item_product_price: float]
Run Code Online (Sandbox Code Playgroud)

因此,作为检查数据质量的一部分,我需要确保所有行都满足以下公式: order_item_subtotal = (order_item_quantity*order_item_product_price)。为此,我需要添加一个名为“valid”的单独列,对于满足上述公式的所有行,它应该将“Y”作为值,而对于所有其他行,它应该将“N”作为值。我决定使用 when() 和 else() 以及 withColumn() 方法,如下所示。

    orderitems.withColumn("valid",when(orderitems.order_item_subtotal != (orderitems.order_item_product_price * orderitems.order_item_quantity),'N').otherwise("Y"))
Run Code Online (Sandbox Code Playgroud)

但它返回给我以下错误:

    TypeError: 'Column' object is not callable
Run Code Online (Sandbox Code Playgroud)

我知道这是因为我试图将两个列对象相乘。但我不确定如何解决这个问题,因为我仍然在 spark 学习过程中。我想知道,如何解决这个问题。我在 Python 中使用 Spark 2.3.0

apache-spark pyspark

0
推荐指数
1
解决办法
1万
查看次数