我有两个数据帧df1和df2.它们都有以下架构:
|-- ts: long (nullable = true)
|-- id: integer (nullable = true)
|-- managers: array (nullable = true)
| |-- element: string (containsNull = true)
|-- projects: array (nullable = true)
| |-- element: string (containsNull = true)
Run Code Online (Sandbox Code Playgroud)
df1是从avro文件创建而df2来自等效的镶木地板文件.但是,如果我执行,df1.unionAll(df2).show()我收到以下错误:
org.apache.spark.sql.AnalysisException: unresolved operator 'Union;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:37)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:174)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:49)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:103)
Run Code Online (Sandbox Code Playgroud) 我有一个Parameters类型map的列:
>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)
>>> d = [{'Parameters': {'foo': '1', 'bar': '2', 'baz': 'aaa'}}]
>>> df = sqlContext.createDataFrame(d)
>>> df.collect()
[Row(Parameters={'foo': '1', 'bar': '2', 'baz': 'aaa'})]
Run Code Online (Sandbox Code Playgroud)
我想重塑它在pyspark这样所有的按键(foo,bar,等)都列,分别为:
[Row(foo='1', bar='2', baz='aaa')]
Run Code Online (Sandbox Code Playgroud)
使用withColumn作品:
(df
.withColumn('foo', df.Parameters['foo'])
.withColumn('bar', df.Parameters['bar'])
.withColumn('baz', df.Parameters['baz'])
.drop('Parameters')
).collect()
Run Code Online (Sandbox Code Playgroud)
但我需要一个没有明确提到列名的解决方案,因为我有几十个.
>>> df.printSchema()
root
|-- Parameters: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull …Run Code Online (Sandbox Code Playgroud) 我需要根据现有列爬行新的Spark DF MapType列,其中列名是键,值是值.
作为例子 - 我有这个DF:
rdd = sc.parallelize([('123k', 1.3, 6.3, 7.6),
('d23d', 1.5, 2.0, 2.2),
('as3d', 2.2, 4.3, 9.0)
])
schema = StructType([StructField('key', StringType(), True),
StructField('metric1', FloatType(), True),
StructField('metric2', FloatType(), True),
StructField('metric3', FloatType(), True)])
df = sqlContext.createDataFrame(rdd, schema)
+----+-------+-------+-------+
| key|metric1|metric2|metric3|
+----+-------+-------+-------+
|123k| 1.3| 6.3| 7.6|
|d23d| 1.5| 2.0| 2.2|
|as3d| 2.2| 4.3| 9.0|
+----+-------+-------+-------+
Run Code Online (Sandbox Code Playgroud)
我已经到目前为止,我可以从这里创建一个structType:
nameCol = struct([name for name in df.columns if ("metric" in name)]).alias("metric")
df2 = df.select("key", nameCol)
+----+-------------+
| key| metric|
+----+-------------+
|123k|[1.3,6.3,7.6]|
|d23d|[1.5,2.0,2.2]|
|as3d|[2.2,4.3,9.0]| …Run Code Online (Sandbox Code Playgroud)