标签: apache-spark-sql

查询具有复杂类型的Spark SQL DataFrame

如何查询具有复杂类型(如地图/数组)的RDD?例如,当我写这个测试代码时:

case class Test(name: String, map: Map[String, String])
val map = Map("hello" -> "world", "hey" -> "there")
val map2 = Map("hello" -> "people", "hey" -> "you")
val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2)))
Run Code Online (Sandbox Code Playgroud)

我虽然语法如下:

sqlContext.sql("SELECT * FROM rdd WHERE map.hello = world")
Run Code Online (Sandbox Code Playgroud)

要么

sqlContext.sql("SELECT * FROM rdd WHERE map[hello] = world")
Run Code Online (Sandbox Code Playgroud)

但我明白了

无法访问MapType类型中的嵌套字段(StringType,StringType,true)

org.apache.spark.sql.catalyst.errors.package $ TreeNodeException:未解析的属性

分别.

sql scala dataframe apache-spark apache-spark-sql

54
推荐指数
1
解决办法
5万
查看次数

如何将数组(即列表)列转换为Vector

问题的简短版本!

请考虑以下代码段(假设spark已设置为某些代码段SparkSession):

from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), 
]
df = spark.createDataFrame(source_data)
Run Code Online (Sandbox Code Playgroud)

请注意,temperature字段是浮动列表.我想将这些浮点数列表转换为MLlib类型Vector,我希望使用基本DataFrameAPI 表示这种转换,而不是通过RDD表达(这是低效的,因为它将所有数据从JVM发送到Python,处理在Python中完成,我们没有得到Spark的Catalyst优化器,yada yada的好处.我该怎么做呢?特别:

  1. 有没有办法让直接演员工作?请参阅下面的详细信息(以及尝试解决方法失败)?或者,是否有其他操作具有我之后的效果?
  2. 从我在下面建议的两种替代解决方案(UDF vs爆炸/重新组合列表中的项目)中哪种更有效?或者是否有其他几乎但不是非常正确的替代品比其中任何一种更好?

直接投射不起作用

这就是我期望的"正确"解决方案.我想将列的类型从一种类型转换为另一种类型,所以我应该使用强制转换.作为一个上下文,让我提醒您将其转换为另一种类型的正常方法:

from pyspark.sql import types
df_with_strings = df.select(
    df["city"], 
    df["temperatures"].cast(types.ArrayType(types.StringType()))),
)
Run Code Online (Sandbox Code Playgroud)

现在例如df_with_strings.collect()[0]["temperatures"][1]'-7.0'.但是如果我施放到ml Vector那么事情就不那么顺利了:

from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))
Run Code Online (Sandbox Code Playgroud)

这给出了一个错误:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type …
Run Code Online (Sandbox Code Playgroud)

python apache-spark apache-spark-sql pyspark apache-spark-ml

54
推荐指数
3
解决办法
2万
查看次数

spark.sql.shuffle.partitions和spark.default.parallelism有什么区别?

spark.sql.shuffle.partitions和之间有什么区别spark.default.parallelism

我试图将它们都设置为SparkSQL,但第二阶段的任务编号始终为200.

performance hadoop bigdata apache-spark apache-spark-sql

54
推荐指数
3
解决办法
6万
查看次数

根据RDD/Spark DataFrame中的特定列从行中删除重复项

假设我有一个相当大的数据集,形式如下:

data = sc.parallelize([('Foo',41,'US',3),
                       ('Foo',39,'UK',1),
                       ('Bar',57,'CA',2),
                       ('Bar',72,'CA',2),
                       ('Baz',22,'US',6),
                       ('Baz',36,'US',6)])
Run Code Online (Sandbox Code Playgroud)

我想要做的是仅根据第一,第三和第四列的值删除重复的行.

删除完全重复的行很简单:

data = data.distinct()
Run Code Online (Sandbox Code Playgroud)

第5行或第6行将被删除

但是,我如何仅删除基于第1,3和4列的重复行?即删除以下任何一个:

('Baz',22,'US',6)
('Baz',36,'US',6)
Run Code Online (Sandbox Code Playgroud)

在Python中,这可以通过使用指定列来完成.drop_duplicates().我怎样才能在Spark/Pyspark中实现同样的目标?

apache-spark apache-spark-sql pyspark

53
推荐指数
6
解决办法
9万
查看次数

如何转动DataFrame?

我开始使用Spark DataFrames,我需要能够透过数据来创建多列的1列中的多列.在Scalding中有内置的功能,我相信Python中的Pandas,但我找不到任何新的Spark Dataframe.

我假设我可以编写某种类型的自定义函数,但是我甚至不确定如何启动,特别是因为我是Spark的新手.我有人知道如何使用内置功能或如何在Scala中编写内容的建议,非常感谢.

pivot scala dataframe apache-spark apache-spark-sql

52
推荐指数
5
解决办法
4万
查看次数

在spark数据帧写入方法中覆盖特定分区

我想覆盖特定的分区而不是所有的火花.我正在尝试以下命令:

df.write.orc('maprfs:///hdfs-base-path','overwrite',partitionBy='col4')
Run Code Online (Sandbox Code Playgroud)

其中df是具有要覆盖的增量数据的数据帧.

hdfs-base-path包含主数据.

当我尝试上面的命令时,它会删除所有分区,并在hdfs路径中插入df中存在的分区.

我的要求是只覆盖指定hdfs路径中df中存在的那些分区.有人可以帮我吗?

apache-spark apache-spark-sql spark-dataframe

52
推荐指数
6
解决办法
5万
查看次数

获取Spark数据帧列中最大值的最佳方法

我正在试图找出在Spark数据帧列中获得最大值的最佳方法.

请考虑以下示例:

df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()
Run Code Online (Sandbox Code Playgroud)

这创造了:

+---+---+
|  A|  B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
Run Code Online (Sandbox Code Playgroud)

我的目标是找到A列中的最大值(通过检查,这是3.0).使用PySpark,我可以想到以下四种方法:

# Method 1: Use describe()
float(df.describe("A").filter("summary = 'max'").select("A").first().asDict()['A'])

# Method 2: Use SQL
df.registerTempTable("df_table")
spark.sql("SELECT MAX(A) as maxval FROM df_table").first().asDict()['maxval']

# Method 3: Use groupby()
df.groupby().max('A').first().asDict()['max(A)']

# Method 4: Convert to RDD
df.select("A").rdd.max()[0]
Run Code Online (Sandbox Code Playgroud)

上面的每一个都给出了正确的答案,但在没有Spark分析工具的情况下,我无法分辨哪个是最好的.

任何关于上述哪种方法在Spark运行时或资源使用方面最有效的直觉或经验主义的想法,或者是否有比上述方法更直接的方法?

python apache-spark apache-spark-sql pyspark

51
推荐指数
8
解决办法
9万
查看次数

Pyspark:将多个数组列拆分为行

我有一个数据框,有一行和几列.一些列是单个值,其他列是列表.所有列表列的长度都相同.我想将每个列表列拆分为一个单独的行,同时保持任何非列表列不变.

样本DF:

from pyspark import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode

sqlc = SQLContext(sc)

df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')])
# +---+---------+---------+---+
# |  a|        b|        c|  d|
# +---+---------+---------+---+
# |  1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+
Run Code Online (Sandbox Code Playgroud)

我想要的是:

+---+---+----+------+
|  a|  b|  c |    d |
+---+---+----+------+
|  1|  1|  7 |  foo |
|  1|  2|  8 |  foo |
|  1|  3|  9 |  foo |
+---+---+----+------+
Run Code Online (Sandbox Code Playgroud)

如果我只有一个列表列,只需执行以下操作即可explode:

df_exploded = df.withColumn('b', explode('b')) …
Run Code Online (Sandbox Code Playgroud)

python dataframe apache-spark apache-spark-sql pyspark

50
推荐指数
3
解决办法
3万
查看次数

DataSet API和DataFrame API之间的区别

有没有人可以通过示例帮助我理解DataSet API和DataFrame API之间的区别?为什么需要在Spark中引入DataSet API?

apache-spark rdd apache-spark-sql apache-spark-dataset

49
推荐指数
0
解决办法
3万
查看次数

如何从spark数据帧中过滤掉null值

我使用以下模式在spark中创建了一个数据框:

root
 |-- user_id: long (nullable = false)
 |-- event_id: long (nullable = false)
 |-- invited: integer (nullable = false)
 |-- day_diff: long (nullable = true)
 |-- interested: integer (nullable = false)
 |-- event_owner: long (nullable = false)
 |-- friend_id: long (nullable = false)
Run Code Online (Sandbox Code Playgroud)

数据如下所示:

+----------+----------+-------+--------+----------+-----------+---------+
|   user_id|  event_id|invited|day_diff|interested|event_owner|friend_id|
+----------+----------+-------+--------+----------+-----------+---------+
|   4236494| 110357109|      0|      -1|         0|  937597069|     null|
|  78065188| 498404626|      0|       0|         0| 2904922087|     null|
| 282487230|2520855981|      0|      28|         0| 3749735525|     null|
| 335269852|1641491432|      0|       2|         0| 1490350911|     null| …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql spark-dataframe

49
推荐指数
6
解决办法
12万
查看次数