星火文档包含一个PySpark例如其OneHotEncoder:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
df = spark.createDataFrame([
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
], ["id", "category"])
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.show()
Run Code Online (Sandbox Code Playgroud)
我希望该列categoryVec看起来像这样:
[0.0, 0.0]
[1.0, 0.0]
[0.0, 1.0]
[0.0, 0.0]
[0.0, 0.0]
[0.0, 1.0]
Run Code Online (Sandbox Code Playgroud)
但categoryVec实际上看起来是这样的:
(2, [0], [1.0])
(2, [], [])
(2, [1], [1.0])
(2, [0], [1.0])
(2, [0], [1.0])
(2, …Run Code Online (Sandbox Code Playgroud) 我有一个data frame在pyspark像下面。
df.show()
+---+----+
| id|test|
+---+----+
| 1| Y|
| 1| N|
| 2| Y|
| 3| N|
+---+----+
Run Code Online (Sandbox Code Playgroud)
我想在有重复记录时删除记录id并且test是N
现在当我查询 new_df
new_df.show()
+---+----+
| id|test|
+---+----+
| 1| Y|
| 2| Y|
| 3| N|
+---+----+
Run Code Online (Sandbox Code Playgroud)
我无法弄清楚用例。
我已经完成了 groupbyid计数,但它只给出了id列和count.
我做了如下。
grouped_df = new_df.groupBy("id").count()
Run Code Online (Sandbox Code Playgroud)
我怎样才能达到我想要的结果
编辑
我有一个如下所示的数据框。
+-------------+--------------------+--------------------+
| sn| device| attribute|
+-------------+--------------------+--------------------+
|4MY16A5602E0A| Android Phone| N|
|4MY16A5W02DE8| Android Phone| N|
|4MY16A5W02DE8| Android Phone| …Run Code Online (Sandbox Code Playgroud) 我们有一个将数据映射到数据帧的脚本(我们使用的是 pyspark)。数据以字符串形式出现,并且对其进行了一些其他有时昂贵的操作,但作为操作的一部分(调用 withColumn),我们对其最终数据类型进行了强制转换。
我需要判断是否发生了截断,但如果发生了截断,我们不想失败。我们只想要一个数字来知道每个翻译列(大约有 300 列)中有多少行失败。
我的第一个想法是让每一列通过一个 UDF 来进行测试,输出将是一个包含值的数组,以及一个关于它是否通过数据类型检查的值。然后我会做2个选择。一个从数组中选择原始值,另一个聚合未命中。但这似乎是一个草率的解决方案。我对 pyspark/hadoop 世界还很陌生……很想知道是否有更好的(也许是标准的?)方法来做到这一点。
我有一些分区的配置单元表,它们指向镶木地板文件。现在每个分区都有很多小的镶木地板文件,每个大小约为 5kb,我想将这些小文件合并为每个分区的一个大文件。我怎样才能做到这一点来提高我的蜂巢性能?我尝试将分区中的所有镶木地板文件读取到 pyspark 数据帧,并将组合数据帧重写到同一分区并删除旧的。但出于某种原因,这对我来说似乎效率低下或初学者级别的类型。这样做的利弊是什么?而且,如果有任何其他方法,请指导我在 spark 或 pyspark 中实现它。
在 Pyspark 2.3 中,假设我有一个如下所示的 JSON 文档:
{
"key1": {
"key2": "abc",
"KEY2": "def"
}
}
Run Code Online (Sandbox Code Playgroud)
实际上,我有数十亿个这样的文档,每个文档都有可能有数百(甚至数千)个周期性变化的深度嵌套结构。但是这个简单的文档说明了这个问题。
如果我做:
df = session.read.json(<file>)
df.select('key1.key2')
df.select('key1.KEY2')
Run Code Online (Sandbox Code Playgroud)
两个选择都将失败并显示如下错误:
pyspark.sql.utils.AnalysisException: 'Ambiguous reference to fields StructField(key2,StringType,true), StructField(KEY2,StringType,true);'
Run Code Online (Sandbox Code Playgroud)
由于模式的广度及其不断变化的性质,通过 StructType 结构对模式进行硬编码是不切实际的。
我该如何处理这种情况?理想情况下,我有一种方法可以重命名重复的列,这样它们就不会发生冲突(例如,'key2_0'、'KEY2_1' 等)。不幸的是,我找不到任何方法来迭代列列表或更改列名,而无需首先能够通过名称明确引用列。
我一直在尝试使用CrossValidator我的训练数据,但我总是收到错误消息:
"An error occurred while calling o80267.evaluate.
: java.lang.IllegalArgumentException: Field "rawPrediction" does not exist.
Available fields: label, features, CrossValidator_6a7bb791f63f_rand, features_scaled, prediction"
Run Code Online (Sandbox Code Playgroud)
这是代码:
df = spark.createDataFrame(input_data, ["label", "features"])
train_data, test_data = df.randomSplit([.8,.2],seed=1234)
train_data.show()
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")
lr = LinearRegression(maxIter=10)
pipeline = Pipeline(stages=[standardScaler, lr])
paramGrid = ParamGridBuilder()\
.addGrid(lr.regParam, [0.3, 0.1, 0.01])\
.addGrid(lr.fitIntercept, [False, True])\
.addGrid(lr.elasticNetParam, [0.0, 0.5, 0.8, 1.0])\
.build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=2)
cvModel = crossval.fit(train_data)
Run Code Online (Sandbox Code Playgroud)
使用train_data.show()(在第三行)时,输出如下:
+-----+--------------------+
|label| features|
+-----+--------------------+
|4.526|[129.0,322.0,126....|
|3.585|[1106.0,2401.0,11...|
|3.521|[190.0,496.0,177....| …Run Code Online (Sandbox Code Playgroud) python machine-learning apache-spark pyspark apache-spark-ml
from pyspark import SparkContext, SparkConf, sql
from pyspark.sql import Row
sc = SparkContext.getOrCreate()
sqlContext = sql.SQLContext(sc)
df = sc.parallelize([ \
Row(nama='Roni', umur=27, tingi=168), \
Row(nama='Roni', umur=6, tingi=168),
Row(nama='Roni', umur=89, tingi=168),])
df.show()
Run Code Online (Sandbox Code Playgroud)
错误:回溯(最近一次调用最后一次):
文件“ipython-input-24-bfb18ebba99e”,第 8 行,在 df.show()
AttributeError: 'RDD' 对象没有属性 'show'
我有一个与此类似的 PySpark DataFrame:
df = sc.parallelize([
("c1", "A", 3.4, 0.4, 3.5),
("c1", "B", 9.6, 0.0, 0.0),
("c1", "A", 2.8, 0.4, 0.3),
("c1", "B", 5.4, 0.2, 0.11),
("c2", "A", 0.0, 9.7, 0.3),
("c2", "B", 9.6, 8.6, 0.1),
("c2", "A", 7.3, 9.1, 7.0),
("c2", "B", 0.7, 6.4, 4.3)
]).toDF(["user_id", "type", "d1", 'd2', 'd3'])
df.show()
Run Code Online (Sandbox Code Playgroud)
这使:
+-------+----+---+---+----+
|user_id|type| d1| d2| d3|
+-------+----+---+---+----+
| c1| A|3.4|0.4| 3.5|
| c1| B|9.6|0.0| 0.0|
| c1| A|2.8|0.4| 0.3|
| c1| B|5.4|0.2|0.11|
| c2| A|0.0|9.7| 0.3|
| c2| …Run Code Online (Sandbox Code Playgroud) 我创建了如下数据框:
+----+-------+-------+
| age| number|name |
+----+-------+-------+
| 16| 12|A |
| 16| 13|B |
| 17| 16|E |
| 17| 17|F |
+----+-------+-------+
Run Code Online (Sandbox Code Playgroud)
如何将其转换为以下json:
{
'age' : 16,
'values' : [{‘number’: ‘12’ , ‘name’ : 'A'},{‘number’: ‘12’ , ‘name’ : 'A'} ]
},{
'age' : 17,
'values' : [{‘number’: ‘16’ , ‘name’ : 'E'},{‘number’: ‘17’ , ‘name’ : 'F'} ]
}
Run Code Online (Sandbox Code Playgroud) 假设我有一个名为“orderitems”的数据框,其架构如下
DataFrame[order_item_id: int, order_item_order_id: int, order_item_product_id: int, order_item_quantity: int, order_item_subtotal: float, order_item_product_price: float]
Run Code Online (Sandbox Code Playgroud)
因此,作为检查数据质量的一部分,我需要确保所有行都满足以下公式: order_item_subtotal = (order_item_quantity*order_item_product_price)。为此,我需要添加一个名为“valid”的单独列,对于满足上述公式的所有行,它应该将“Y”作为值,而对于所有其他行,它应该将“N”作为值。我决定使用 when() 和 else() 以及 withColumn() 方法,如下所示。
orderitems.withColumn("valid",when(orderitems.order_item_subtotal != (orderitems.order_item_product_price * orderitems.order_item_quantity),'N').otherwise("Y"))
Run Code Online (Sandbox Code Playgroud)
但它返回给我以下错误:
TypeError: 'Column' object is not callable
Run Code Online (Sandbox Code Playgroud)
我知道这是因为我试图将两个列对象相乘。但我不确定如何解决这个问题,因为我仍然在 spark 学习过程中。我想知道,如何解决这个问题。我在 Python 中使用 Spark 2.3.0
pyspark ×10
apache-spark ×9
python ×2
alias ×1
hadoop ×1
hive ×1
parquet ×1
pivot ×1
pyspark-sql ×1