相关疑难解决方法(0)

从Spark DataFrame中的单个列派生多个列

我有一个带有巨大可解析元数据的DF作为Dataframe中的单个字符串列,我们称之为DFA,使用ColmnA.

我想打破这一列,将ColmnA分成多个列,通过一个函数,ClassXYZ = Func1(ColmnA).此函数返回一个具有多个变量的类ClassXYZ,现在每个变量都必须映射到新列,例如ColmnA1,ColmnA2等.

如何通过调用此Func1一次,使用这些附加列从一个Dataframe到另一个Data转换,而不必重复它来创建所有列.

如果我每次都要调用这个巨大的函数添加一个新列,它很容易解决,但这是我希望避免的.

请使用工作或伪代码建议.

谢谢

桑杰

scala user-defined-functions dataframe apache-spark apache-spark-sql

48
推荐指数
3
解决办法
5万
查看次数

如何使用Spark DataFrames查询JSON数据列?

我有一个Cassandra表,为简单起见,看起来像:

key: text
jsonData: text
blobData: blob
Run Code Online (Sandbox Code Playgroud)

我可以使用spark和spark-cassandra-connector为此创建一个基本数据框:

val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
  .load()
Run Code Online (Sandbox Code Playgroud)

我正在努力将JSON数据扩展到其底层结构中.我最终希望能够根据json字符串中的属性进行过滤并返回blob数据.像jsonData.foo ="bar"之类的东西并返回blobData.这目前可能吗?

scala dataframe apache-spark apache-spark-sql spark-cassandra-connector

37
推荐指数
2
解决办法
4万
查看次数

StructType/Row的Spark UDF

我在spark Dataframe中有一个"StructType"列,它有一个数组和一个字符串作为子字段.我想修改数组并返回相同类型的新列.我可以用UDF处理它吗?或者有哪些替代方案?

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val sub_schema = StructType(StructField("col1",ArrayType(IntegerType,false),true) :: StructField("col2",StringType,true)::Nil)
val schema = StructType(StructField("subtable", sub_schema,true) :: Nil)
val data = Seq(Row(Row(Array(1,2),"eb")),  Row(Row(Array(3,2,1), "dsf")) )
val rd = sc.parallelize(data)
val df = spark.createDataFrame(rd, schema)
df.printSchema

root
 |-- subtable: struct (nullable = true)
 |    |-- col1: array (nullable = true)
 |    |    |-- element: integer (containsNull = false)
 |    |-- col2: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

看来我需要一个类型为Row的UDF

val u =  udf((x:Row) => x)
       >> Schema for type org.apache.spark.sql.Row is not supported …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark udf

17
推荐指数
3
解决办法
2万
查看次数

PySpark将"map"类型的列转换为数据框中的多个列

输入

我有一个Parameters类型map的列:

>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)
>>> d = [{'Parameters': {'foo': '1', 'bar': '2', 'baz': 'aaa'}}]
>>> df = sqlContext.createDataFrame(d)
>>> df.collect()
[Row(Parameters={'foo': '1', 'bar': '2', 'baz': 'aaa'})]
Run Code Online (Sandbox Code Playgroud)

产量

我想重塑它在pyspark这样所有的按键(foo,bar,等)都列,分别为:

[Row(foo='1', bar='2', baz='aaa')]
Run Code Online (Sandbox Code Playgroud)

使用withColumn作品:

(df
 .withColumn('foo', df.Parameters['foo'])
 .withColumn('bar', df.Parameters['bar'])
 .withColumn('baz', df.Parameters['baz'])
 .drop('Parameters')
).collect()
Run Code Online (Sandbox Code Playgroud)

我需要一个没有明确提到列名的解决方案,因为我有几十个.

架构

>>> df.printSchema()

root
 |-- Parameters: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull …
Run Code Online (Sandbox Code Playgroud)

python dataframe apache-spark apache-spark-sql pyspark

12
推荐指数
2
解决办法
9339
查看次数

TypeError:列不可迭代-如何遍历ArrayType()?

考虑以下DataFrame:

+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[john, sam, jane]      |
|pet   |[whiskers, rover, fido]|
+------+-----------------------+
Run Code Online (Sandbox Code Playgroud)

可以使用以下代码创建:

import pyspark.sql.functions as f
data = [
    ('person', ['john', 'sam', 'jane']),
    ('pet', ['whiskers', 'rover', 'fido'])
]

df = sqlCtx.createDataFrame(data, ["type", "names"])
df.show(truncate=False)
Run Code Online (Sandbox Code Playgroud)

有没有一种方法可以通过对每个元素应用函数而不使用?来直接修改ArrayType()列?"names"udf

例如,假设我想将该函数foo应用于"names"列。(我将使用其中的例子foostr.upper只用于说明目的,但我的问题是关于可以应用到一个可迭代的元素任何有效的功能。)

foo = lambda x: x.upper()  # defining it as str.upper as an example
df.withColumn('X', [foo(x) for x in f.col("names")]).show()
Run Code Online (Sandbox Code Playgroud)

TypeError:列不可迭代

我可以使用udf

foo_udf = f.udf(lambda row: [foo(x) …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark spark-dataframe pyspark-sql

9
推荐指数
1
解决办法
4438
查看次数

如何访问数据框列中的数组元素(标量)

中的第一列dfElements2array。我需要在选择(30002| 30005 | 30158 ...)纬度和经度的同时选择第一个元素而不是数组,而不是整个数组:

数据框应如下所示:

+-----------------------------------+
| short_name  |   lat    |   lng
+-----------------------------------+
|   30002     |37.9796566|-1.1317041|
|   30005     |37.9868856|-1.1371011|
|   30158     | 37.941845|-1.0681918|
|   30006     |37.9971704|-1.0993366|
+-----------------------------------+
Run Code Online (Sandbox Code Playgroud)

您能否告诉我是否有可能编辑命令results.address_components.short_name以访问数组元素?

var DFResults2=DF_Google1.select(explode(DF_Google1 ("results"))).toDF("results")
var dfElements2=DFResults2.select("results.address_components.short_name","results.geometry.location.lat","results.geometry.location.lng")**
var dfElements3=dfElements2.select(explode(dfElements2("short_name"))).toDF("CP")

dfElements2.show()
dfElements2.printSchema()


+--------------------+----------+----------+
|          short_name|       lat|       lng|
+--------------------+----------+----------+
|[30002, Murcia, M...|37.9796566|-1.1317041|
|[30005, Murcia, M...|37.9868856|-1.1371011|
|[30158, Murcia, M...| 37.941845|-1.0681918|
|[30006, Murcia, M...|37.9971704|-1.0993366|
|[30100, Murcia, M...|38.0256612|-1.1640968|
|[30009, Murcia, M...|37.9887492|-1.1496969|
|[30008, Murcia, M...|37.9928939|-1.1317041|
|[30007, Murcia, M...|38.0077579|-1.0993366|
|[Murcia, MU, Regi...|37.9922399|-1.1306544| …
Run Code Online (Sandbox Code Playgroud)

arrays scala dataframe apache-spark

6
推荐指数
2
解决办法
8285
查看次数

过滤数组列内容

我正在使用 pyspark 2.3.1 并且想使用表达式而不是使用 udf 过滤数组元素:

>>> df = spark.createDataFrame([(1, "A", [1,2,3,4]), (2, "B", [1,2,3,4,5])],["col1", "col2", "col3"])
>>> df.show()
+----+----+---------------+
|col1|col2|           col3|
+----+----+---------------+
|   1|   A|   [1, 2, 3, 4]|
|   2|   B|[1, 2, 3, 4, 5]|
+----+----+---------------+
Run Code Online (Sandbox Code Playgroud)

下面显示的表达式是错误的,我想知道如何告诉 spark 从 col3 中的数组中删除任何小于 3 的值。我想要类似的东西:

>>> filtered = df.withColumn("newcol", expr("filter(col3, x -> x >= 3)")).show()
>>> filtered.show()
+----+----+---------+
|col1|col2|   newcol|
+----+----+---------+
|   1|   A|   [3, 4]|
|   2|   B|[3, 4, 5]|
+----+----+---------+
Run Code Online (Sandbox Code Playgroud)

我已经有一个 udf 解决方案,但速度很慢(> 10 亿数据行):

largerThan = F.udf(lambda …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark pyspark-sql

6
推荐指数
1
解决办法
7156
查看次数

UDT的Spark SQL引用属性

我正在尝试实现自定义UDT并能够从Spark SQL引用它(如Spark SQL白皮书的第4.4.2节中所述)。

真正的例子是使用Cap'n Proto或类似方法,使自定义UDT由堆外数据结构提供支持。

对于这篇文章,我做了一个人为的例子。我知道我可以只使用Scala案例类,而不必做任何工作,但这不是我的目标。

例如,我有一个Person包含多个属性,并且希望能够SELECT person.first_name FROM person。我遇到了错误Can't extract value from person#1,但不确定为什么。

这是完整的源代码(也可以从https://github.com/andygrove/spark-sql-udt获取)。

package com.theotherandygrove

import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object Example {

  def main(arg: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("Example")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)

    val sqlContext = new SQLContext(sc)

    val schema = StructType(List(
      StructField("person_id", DataTypes.IntegerType, true),
      StructField("person", new MockPersonUDT, true)))

    // load initial RDD
    val rdd = sc.parallelize(List(
      MockPersonImpl(1),
      MockPersonImpl(2) …
Run Code Online (Sandbox Code Playgroud)

user-defined-types apache-spark apache-spark-sql

5
推荐指数
1
解决办法
1671
查看次数

从包含嵌套值的 Spark 列中提取值

这是我的 mongodb 集合的架构的一部分:

|-- variables: struct (nullable = true)  
|    |-- actives: struct (nullable = true)  
|    |    |-- data: struct (nullable = true)  
|    |    |    |-- 0: struct (nullable = true)  
|    |    |    |    |--active: integer (nullable = true)  
|    |    |    |    |-- inactive: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)

我已获取该集合并将其存储在 Spark 数据框中,现在尝试提取变量列中最里面的值。

df_temp = df1.select(df1.variables.actives.data)
Run Code Online (Sandbox Code Playgroud)

这工作得很好,我能够获得数据结构的内部结构。

+----------------------+  
|variables.actives.data|  
+----------------------+  
|  [[1,32,0.516165...|  
|  [[1,30,1.173139...|  
|  [[4,18,0.160088...|
Run Code Online (Sandbox Code Playgroud)

然而,当我尝试进一步深入时:

df_temp = df1.select(df1.variables.actives.data.0.active)
Run Code Online (Sandbox Code Playgroud)

我收到无效语法错误。

df_temp …

apache-spark apache-spark-sql pyspark

5
推荐指数
1
解决办法
9985
查看次数

Spark 使用纯 SQL 查询提取嵌套 JSON 数组项

注意:这不是以下内容的重复(或其他几个类似的讨论)


我有一个Hive表,我必须纯粹通过Spark-SQL-query读取和处理。该表有一个string-type 列,其中包含JSON来自 API 的转储;因此,正如预期的那样,它具有深度嵌套的字符串化 JSON

让我们看这个例子(它描述了我正在尝试处理的数据的确切深度/复杂性)

{
    "key1": ..
    "key2": ..
    ..
    "bill_summary": {
        "key1": ..
        "key2": ..
        ..
        "items": [
            {
                "item": {
                    "key1": ..
                    "key2": ..
                    ..
                    "type": "item_type_1"
                    ..
                    "total_cost": 57.65
                    ..
                }
            },
            {
                "item": {
                    "key1": ..
                    "key2": ..
                    ..
                    "total_cost": 23.31
                    ..
                }
            }
            ..
            { …
Run Code Online (Sandbox Code Playgroud)

apache-spark-sql

5
推荐指数
1
解决办法
1万
查看次数