如何查询具有复杂类型(如地图/数组)的RDD?例如,当我写这个测试代码时:
case class Test(name: String, map: Map[String, String])
val map = Map("hello" -> "world", "hey" -> "there")
val map2 = Map("hello" -> "people", "hey" -> "you")
val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2)))
Run Code Online (Sandbox Code Playgroud)
我虽然语法如下:
sqlContext.sql("SELECT * FROM rdd WHERE map.hello = world")
Run Code Online (Sandbox Code Playgroud)
要么
sqlContext.sql("SELECT * FROM rdd WHERE map[hello] = world")
Run Code Online (Sandbox Code Playgroud)
但我明白了
无法访问MapType类型中的嵌套字段(StringType,StringType,true)
和
org.apache.spark.sql.catalyst.errors.package $ TreeNodeException:未解析的属性
分别.
请考虑以下代码段(假设spark已设置为某些代码段SparkSession):
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)
Run Code Online (Sandbox Code Playgroud)
请注意,temperature字段是浮动列表.我想将这些浮点数列表转换为MLlib类型Vector,我希望使用基本DataFrameAPI 表示这种转换,而不是通过RDD表达(这是低效的,因为它将所有数据从JVM发送到Python,处理在Python中完成,我们没有得到Spark的Catalyst优化器,yada yada的好处.我该怎么做呢?特别:
这就是我期望的"正确"解决方案.我想将列的类型从一种类型转换为另一种类型,所以我应该使用强制转换.作为一个上下文,让我提醒您将其转换为另一种类型的正常方法:
from pyspark.sql import types
df_with_strings = df.select(
df["city"],
df["temperatures"].cast(types.ArrayType(types.StringType()))),
)
Run Code Online (Sandbox Code Playgroud)
现在例如df_with_strings.collect()[0]["temperatures"][1]是'-7.0'.但是如果我施放到ml Vector那么事情就不那么顺利了:
from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))
Run Code Online (Sandbox Code Playgroud)
这给出了一个错误:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type …Run Code Online (Sandbox Code Playgroud) python apache-spark apache-spark-sql pyspark apache-spark-ml
spark.sql.shuffle.partitions和之间有什么区别spark.default.parallelism?
我试图将它们都设置为SparkSQL,但第二阶段的任务编号始终为200.
假设我有一个相当大的数据集,形式如下:
data = sc.parallelize([('Foo',41,'US',3),
('Foo',39,'UK',1),
('Bar',57,'CA',2),
('Bar',72,'CA',2),
('Baz',22,'US',6),
('Baz',36,'US',6)])
Run Code Online (Sandbox Code Playgroud)
我想要做的是仅根据第一,第三和第四列的值删除重复的行.
删除完全重复的行很简单:
data = data.distinct()
Run Code Online (Sandbox Code Playgroud)
第5行或第6行将被删除
但是,我如何仅删除基于第1,3和4列的重复行?即删除以下任何一个:
('Baz',22,'US',6)
('Baz',36,'US',6)
Run Code Online (Sandbox Code Playgroud)
在Python中,这可以通过使用指定列来完成.drop_duplicates().我怎样才能在Spark/Pyspark中实现同样的目标?
我开始使用Spark DataFrames,我需要能够透过数据来创建多列的1列中的多列.在Scalding中有内置的功能,我相信Python中的Pandas,但我找不到任何新的Spark Dataframe.
我假设我可以编写某种类型的自定义函数,但是我甚至不确定如何启动,特别是因为我是Spark的新手.我有人知道如何使用内置功能或如何在Scala中编写内容的建议,非常感谢.
我想覆盖特定的分区而不是所有的火花.我正在尝试以下命令:
df.write.orc('maprfs:///hdfs-base-path','overwrite',partitionBy='col4')
Run Code Online (Sandbox Code Playgroud)
其中df是具有要覆盖的增量数据的数据帧.
hdfs-base-path包含主数据.
当我尝试上面的命令时,它会删除所有分区,并在hdfs路径中插入df中存在的分区.
我的要求是只覆盖指定hdfs路径中df中存在的那些分区.有人可以帮我吗?
我正在试图找出在Spark数据帧列中获得最大值的最佳方法.
请考虑以下示例:
df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()
Run Code Online (Sandbox Code Playgroud)
这创造了:
+---+---+
| A| B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
Run Code Online (Sandbox Code Playgroud)
我的目标是找到A列中的最大值(通过检查,这是3.0).使用PySpark,我可以想到以下四种方法:
# Method 1: Use describe()
float(df.describe("A").filter("summary = 'max'").select("A").first().asDict()['A'])
# Method 2: Use SQL
df.registerTempTable("df_table")
spark.sql("SELECT MAX(A) as maxval FROM df_table").first().asDict()['maxval']
# Method 3: Use groupby()
df.groupby().max('A').first().asDict()['max(A)']
# Method 4: Convert to RDD
df.select("A").rdd.max()[0]
Run Code Online (Sandbox Code Playgroud)
上面的每一个都给出了正确的答案,但在没有Spark分析工具的情况下,我无法分辨哪个是最好的.
任何关于上述哪种方法在Spark运行时或资源使用方面最有效的直觉或经验主义的想法,或者是否有比上述方法更直接的方法?
我有一个数据框,有一行和几列.一些列是单个值,其他列是列表.所有列表列的长度都相同.我想将每个列表列拆分为一个单独的行,同时保持任何非列表列不变.
样本DF:
from pyspark import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode
sqlc = SQLContext(sc)
df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')])
# +---+---------+---------+---+
# | a| b| c| d|
# +---+---------+---------+---+
# | 1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+
Run Code Online (Sandbox Code Playgroud)
我想要的是:
+---+---+----+------+
| a| b| c | d |
+---+---+----+------+
| 1| 1| 7 | foo |
| 1| 2| 8 | foo |
| 1| 3| 9 | foo |
+---+---+----+------+
Run Code Online (Sandbox Code Playgroud)
如果我只有一个列表列,只需执行以下操作即可explode:
df_exploded = df.withColumn('b', explode('b')) …Run Code Online (Sandbox Code Playgroud) 有没有人可以通过示例帮助我理解DataSet API和DataFrame API之间的区别?为什么需要在Spark中引入DataSet API?
我使用以下模式在spark中创建了一个数据框:
root
|-- user_id: long (nullable = false)
|-- event_id: long (nullable = false)
|-- invited: integer (nullable = false)
|-- day_diff: long (nullable = true)
|-- interested: integer (nullable = false)
|-- event_owner: long (nullable = false)
|-- friend_id: long (nullable = false)
Run Code Online (Sandbox Code Playgroud)
数据如下所示:
+----------+----------+-------+--------+----------+-----------+---------+
| user_id| event_id|invited|day_diff|interested|event_owner|friend_id|
+----------+----------+-------+--------+----------+-----------+---------+
| 4236494| 110357109| 0| -1| 0| 937597069| null|
| 78065188| 498404626| 0| 0| 0| 2904922087| null|
| 282487230|2520855981| 0| 28| 0| 3749735525| null|
| 335269852|1641491432| 0| 2| 0| 1490350911| null| …Run Code Online (Sandbox Code Playgroud) apache-spark ×10
apache-spark-sql ×10
pyspark ×4
dataframe ×3
python ×3
scala ×3
bigdata ×1
hadoop ×1
performance ×1
pivot ×1
rdd ×1
sql ×1