标签: apache-spark-sql

在 Spark SQL (pyspark) 中将行转置为列

我想在Spark中进行以下转换我的目标是获得输出,我希望如果我可以进行中间转换,我可以轻松获得输出。关于如何将行转换为列的任何想法都会很有帮助。

RowID  Name  Place
1      Gaga India,US,UK
1      Katy UK,India,Europe
1      Bey  Europe
2      Gaga Null
2      Katy India,Europe
2      Bey  US
3      Gaga Europe
3      Katy US
3      Bey  Null

Output:

RowID   Id  Gaga    Katy    Bey
1       1   India   UK      Europe
1       2   US      India   Null
1       3   UK      Europe  Null
2       1   Null    India   US
2       2   Null    Europe  Null
3       1   Europe  US      Null


Intermediate Output:

RowID   Gaga         Katy               Bey
1       India,US,UK  UK,India,Europe    Europe
2       Null         India,Europe …
Run Code Online (Sandbox Code Playgroud)

sql apache-spark-sql pyspark

1
推荐指数
1
解决办法
6278
查看次数

如何相对于其他数据框更改数据框的列名

我需要使用 pysparkdf相对于其他数据df_col框更改数据框的列名

df

+----+---+----+----+
|code| id|name|work|
+----+---+----+----+
| ASD|101|John| DEV|
| klj|102| ben|prod|
+----+---+----+----+
Run Code Online (Sandbox Code Playgroud)

df_col

+-----------+-----------+
|col_current|col_updated|
+-----------+-----------+
|         id|     Row_id|
|       name|       Name|
|       code|   Row_code|
|       Work|  Work_Code|
+-----------+-----------+
Run Code Online (Sandbox Code Playgroud)

如果 df 列与 col_current 匹配,则 df 列应替换为 col_updated。例如:如果 df.id 与 df.col_current 匹配,则 df.id 应替换为 Row_id。

预期产出

Row_id,Name,Row_code,Work_code
101,John,ASD,DEV
102,ben,klj,prod
Run Code Online (Sandbox Code Playgroud)

注意:我希望这个过程是动态的。

dataframe apache-spark apache-spark-sql pyspark pyspark-sql

1
推荐指数
1
解决办法
165
查看次数

如何在SparkSQL中模仿ZEROIFNULL的功能

Teradata 有一个名为 的函数ZEROIFNULL,顾名思义,如果列的值为 NULL,则返回零。在类似的行中,还有一个名为 NULLIFZERO 的函数。

我想在 SparkSQL 中模仿/模拟这些功能(不使用数据帧或 RDD API,相反,我想在 SparkSQL 中使用它们,您可以在其中直接传递 SQL。)

有任何想法吗?

apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
2344
查看次数

如何将每一行 JSON 解析为 Spark 2 DataFrame 的列?

在我的 Spark (2.2) DataFrame 中,每一行都是 JSON:

df.head()
//output
//[{"key":"111","event_name":"page-visited","timestamp":1517814315}]

df.show()
//output
//+--------------+
//|         value|
//+--------------+
//|{"key":"111...|
//|{"key":"222...|
Run Code Online (Sandbox Code Playgroud)

我想将每个 JSON 行传递给列以获得这个result

key   event_name     timestamp
111   page-visited   1517814315
...
Run Code Online (Sandbox Code Playgroud)

我试过这种方法,但它没有给我预期的结果:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = StructType(Seq(
     StructField("key", StringType, true), StructField("event_name", StringType, true), StructField("timestamp", IntegerType, true)
))

val result = df.withColumn("value", from_json($"value", schema))
Run Code Online (Sandbox Code Playgroud)

和:

result.printSchema()
root
 |-- value: struct (nullable = true)
 |    |-- key: string (nullable = true)
 |    |-- event_name: string (nullable = true)
 |    |-- timestamp: …
Run Code Online (Sandbox Code Playgroud)

json scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
4165
查看次数

Apache Spark:从行中提取值的问题

我在 Spark 中的 Row 类遇到了很多问题。在我看来 Row 类是一个真正设计糟糕的类。从 Row 中提取一个值应该并不比从 Scala 列表中提取一个值更困难;但实际上,您必须知道列的确切类型才能提取它。你甚至不能把列变成字符串;对于像 Spark 这样的伟大框架来说,这有多荒谬?在现实世界中,在大多数情况下,您不知道列的确切类型,而且在许多情况下,最重要的是,您有数十个或数百个列。下面是一个示例,向您展示我得到的 ClassCastExceptions。

有没有人有任何解决方案可以轻松地从 Row 中提取值?

scala> val df = List((1,2),(3,4)).toDF("col1","col2")
df: org.apache.spark.sql.DataFrame = [col1: int, col2: int]


scala> df.first.getAs[String]("col1")
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
  ... 56 elided

scala> df.first.getAs[Int]("col1")
res12: Int = 1

scala> df.first.getInt(0)
res13: Int = 1

scala> df.first.getLong(0)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
  at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
  at org.apache.spark.sql.Row$class.getLong(Row.scala:231)
  at org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:165)
  ... 56 elided

scala> df.first.getFloat(0)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Float …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql spark-dataframe apache-spark-dataset

1
推荐指数
1
解决办法
2680
查看次数

spark sql动态过滤条件

如何在 spark sql 中动态构建布尔过滤条件?拥有:

val d = Seq(1, 2, 3, 5, 6).toDF
d.filter(col("value") === 1 or col("value") === 3).show
Run Code Online (Sandbox Code Playgroud)

我怎样才能动态复制这个:

val desiredThings = Seq(1,3)
Run Code Online (Sandbox Code Playgroud)

我尝试构建过滤器:

val myCondition = desiredThings.map(col("value") === _)
d.filter(myCondition).show
Run Code Online (Sandbox Code Playgroud)

但失败:

overloaded method value filter with alternatives:
org.apache.spark.api.java.function.FilterFunction[org.apache.spark.sql.Row]
 cannot be applied to (Seq[org.apache.spark.sql.Column])
Run Code Online (Sandbox Code Playgroud)

执行时

d.filter(myCondition).show
Run Code Online (Sandbox Code Playgroud)

同样在尝试向左折叠时:

val myCondition = desiredThings.foldLeft()((result, entry) => result && col(c.columnCounterId) === entry)
Run Code Online (Sandbox Code Playgroud)

我有编译错误。

如何调整代码以动态生成过滤谓词?

dynamic filter apache-spark apache-spark-sql spark-dataframe

1
推荐指数
1
解决办法
3782
查看次数

如何在 Scala/Spark 数据框中的每一行中使用 withColumn 和条件

我有以下格式的数据框

+-----------------+-------------------------+-----------+-----------------------------+---------------------------+----------------------------------+--------------------------------------+--------------------------------+
|DataPartition    |TimeStamp                |FFAction|!||IdentifierValue_effectiveFrom|IdentifierValue_effectiveTo|IdentifierValue_identifierEntityId|IdentifierValue_identifierEntityTypeId|IdentifierValue_identifierTypeId|
+-----------------+-------------------------+-----------+-----------------------------+---------------------------+----------------------------------+--------------------------------------+--------------------------------+
|SelfSourcedPublic|2018-03-05T11:54:18+00:00|I|!|       |1900-01-01T00:00:00+00:00    |9999-12-31T00:00:00+00:00  |4295903126                        |404010                                |320150                          |
+-----------------+-------------------------+-----------+-----------------------------+---------------------------+----------------------------------+--------------------------------------+--------------------------------+
Run Code Online (Sandbox Code Playgroud)

我想在下面的列中添加带有条件的额外列

IdentifierValue_identifierEntityTypeId
Run Code Online (Sandbox Code Playgroud)

添加具有以下条件的额外列分区

如果 IdentifierValue_identifierEntityTypeId =1001371402 那么分区 =Repno2FundamentalSeries 否则如果 IdentifierValue_identifierEntityTypeId404010 那么分区 = Repno2Organization

这就是我正在努力实现的目标

 val temp = temp1.withColumn("Partition", when($"IdentifierValue_identifierEntityTypeId" === "404010", 0).otherwise("Repno2FundamentalSeries"))
    temp.show(false)
Run Code Online (Sandbox Code Playgroud)

我的输出低于输出,但得到的值为零

IdentifierValue_identifierEntityTypeId
Run Code Online (Sandbox Code Playgroud)

我是 Scala 的新手,所以提出了基本问题

对于列上的多个条件如何写 when 和 else 。这对我不起作用

线程“main”中的异常 java.lang.IllegalArgumentException:otherwise() 只能在先前由 when() 生成的列上应用一次

val dataMain = dataMain1.withColumn(
      "Partition",
      when($"RelationObjectId_relatedObjectType" === "EDInstrument" && $"RelationObjectId_relatedObjectType" === "Fundamental", "Instrument2Fundamental")
        .otherwise(when($"RelationObjectId_relatedObjectType" === "EDInstrument" && $"RelationObjectId_relatedObjectType" === "FundamentalSeries", "Instrument2FundamentalSeries"))
        .otherwise(when($"RelationObjectId_relatedObjectType" === "Organization" && $"RelationObjectId_relatedObjectType" === "Fundamental", …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
8562
查看次数

countDistinct 和 distinct.count 的区别

为什么我得到不同的输出..agg(countDistinct("member_id") as "count")..distinct.count?的区别是一样的之间select count(distinct member_id)select distinct count(member_id)

sql scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
1041
查看次数

使用任何函数获取scala中一行的结构类型元素

我正在 Scala/Spark 中对行级别进行一些计算。我有一个使用下面的 JSON 创建的数据框 -

{"available":false,"createTime":"2016-01-08","dataValue":{"names_source":{"first_names":["abc", "def"],"last_names_id":[123,456]},"another_source_array":[{"first":"1.1","last":"ONE"}],"another_source":"TableSources","location":"GMP", "timestamp":"2018-02-11"},"deleteTime":"2016-01-08"}
Run Code Online (Sandbox Code Playgroud)

您可以直接使用此 JSON 创建数据框。我的架构如下所示-

root
 |-- available: boolean (nullable = true)
 |-- createTime: string (nullable = true)
 |-- dataValue: struct (nullable = true)
 |    |-- another_source: string (nullable = true)
 |    |-- another_source_array: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- first: string (nullable = true)
 |    |    |    |-- last: string (nullable = true)
 |    |-- location: string (nullable = true)
 |    |-- …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
8738
查看次数

PySpark 中的每小时聚合

我正在寻找一种方法来按小时聚合我的数据。我想首先在我的 evtTime 中只保留几个小时。我的 DataFrame 看起来像这样:

Row(access=u'WRITE', 
    agentHost=u'xxxxxx50.haas.xxxxxx', 
    cliIP=u'192.000.00.000', 
    enforcer=u'ranger-acl', 
    event_count=1, 
    event_dur_ms=0, 
    evtTime=u'2017-10-01 23:03:51.337', 
    id=u'a43d824c-1e53-439b-b374-96b76bacf714', 
    logType=u'RangerAudit', 
    policy=699, 
    reason=u'/project-h/xxxx/xxxx/warehouse/rocq.db/f_crcm_res_temps_retrait', 
    repoType=1, 
    reqUser=u'rocqphadm', 
    resType=u'path', 
    resource=u'/project-h/xxxx/xxxx/warehouse/rocq.db/f_crcm_res_temps_retrait', 
    result=1, 
    seq_num=342976577) 
Run Code Online (Sandbox Code Playgroud)

我的目标随后是按 reqUser 分组并计算 event_count 的总和。我试过这个:

func =  udf (lambda x: datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S.%f'), DateType())
df1 = df.withColumn('DATE', func(col('evtTime')))

metrics_DataFrame = (df1
                 .groupBy(hour('DATE'), 'reqUser')
                 .agg({'event_count': 'sum'})
                )
Run Code Online (Sandbox Code Playgroud)

这是结果:

[Row(hour(DATE)=0, reqUser=u'A383914', sum(event_count)=12114),
Row(hour(DATE)=0, reqUser=u'xxxxadm', sum(event_count)=211631),
Row(hour(DATE)=0, reqUser=u'splunk-system-user', sum(event_count)=48),
Row(hour(DATE)=0, reqUser=u'adm', sum(event_count)=7608),
Row(hour(DATE)=0, reqUser=u'X165473', sum(event_count)=2)]
Run Code Online (Sandbox Code Playgroud)

我的目标是得到这样的东西:

[Row(hour(DATE)=2017-10-01 23:00:00, reqUser=u'A383914', sum(event_count)=12114),
Row(hour(DATE)=2017-10-01 22:00:00, reqUser=u'xxxxadm', sum(event_count)=211631),
Row(hour(DATE)=2017-10-01 08:00:00, reqUser=u'splunk-system-user', sum(event_count)=48),
Row(hour(DATE)=2017-10-01 03:00:00, …
Run Code Online (Sandbox Code Playgroud)

dataframe python-2.7 apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
1728
查看次数