我想在Spark中进行以下转换我的目标是获得输出,我希望如果我可以进行中间转换,我可以轻松获得输出。关于如何将行转换为列的任何想法都会很有帮助。
RowID Name Place
1 Gaga India,US,UK
1 Katy UK,India,Europe
1 Bey Europe
2 Gaga Null
2 Katy India,Europe
2 Bey US
3 Gaga Europe
3 Katy US
3 Bey Null
Output:
RowID Id Gaga Katy Bey
1 1 India UK Europe
1 2 US India Null
1 3 UK Europe Null
2 1 Null India US
2 2 Null Europe Null
3 1 Europe US Null
Intermediate Output:
RowID Gaga Katy Bey
1 India,US,UK UK,India,Europe Europe
2 Null India,Europe …Run Code Online (Sandbox Code Playgroud) 我需要使用 pysparkdf相对于其他数据df_col框更改数据框的列名
df
+----+---+----+----+
|code| id|name|work|
+----+---+----+----+
| ASD|101|John| DEV|
| klj|102| ben|prod|
+----+---+----+----+
Run Code Online (Sandbox Code Playgroud)
df_col
+-----------+-----------+
|col_current|col_updated|
+-----------+-----------+
| id| Row_id|
| name| Name|
| code| Row_code|
| Work| Work_Code|
+-----------+-----------+
Run Code Online (Sandbox Code Playgroud)
如果 df 列与 col_current 匹配,则 df 列应替换为 col_updated。例如:如果 df.id 与 df.col_current 匹配,则 df.id 应替换为 Row_id。
预期产出
Row_id,Name,Row_code,Work_code
101,John,ASD,DEV
102,ben,klj,prod
Run Code Online (Sandbox Code Playgroud)
注意:我希望这个过程是动态的。
Teradata 有一个名为 的函数ZEROIFNULL,顾名思义,如果列的值为 NULL,则返回零。在类似的行中,还有一个名为 NULLIFZERO 的函数。
我想在 SparkSQL 中模仿/模拟这些功能(不使用数据帧或 RDD API,相反,我想在 SparkSQL 中使用它们,您可以在其中直接传递 SQL。)
有任何想法吗?
在我的 Spark (2.2) DataFrame 中,每一行都是 JSON:
df.head()
//output
//[{"key":"111","event_name":"page-visited","timestamp":1517814315}]
df.show()
//output
//+--------------+
//| value|
//+--------------+
//|{"key":"111...|
//|{"key":"222...|
Run Code Online (Sandbox Code Playgroud)
我想将每个 JSON 行传递给列以获得这个result:
key event_name timestamp
111 page-visited 1517814315
...
Run Code Online (Sandbox Code Playgroud)
我试过这种方法,但它没有给我预期的结果:
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val schema = StructType(Seq(
StructField("key", StringType, true), StructField("event_name", StringType, true), StructField("timestamp", IntegerType, true)
))
val result = df.withColumn("value", from_json($"value", schema))
Run Code Online (Sandbox Code Playgroud)
和:
result.printSchema()
root
|-- value: struct (nullable = true)
| |-- key: string (nullable = true)
| |-- event_name: string (nullable = true)
| |-- timestamp: …Run Code Online (Sandbox Code Playgroud) 我在 Spark 中的 Row 类遇到了很多问题。在我看来 Row 类是一个真正设计糟糕的类。从 Row 中提取一个值应该并不比从 Scala 列表中提取一个值更困难;但实际上,您必须知道列的确切类型才能提取它。你甚至不能把列变成字符串;对于像 Spark 这样的伟大框架来说,这有多荒谬?在现实世界中,在大多数情况下,您不知道列的确切类型,而且在许多情况下,最重要的是,您有数十个或数百个列。下面是一个示例,向您展示我得到的 ClassCastExceptions。
有没有人有任何解决方案可以轻松地从 Row 中提取值?
scala> val df = List((1,2),(3,4)).toDF("col1","col2")
df: org.apache.spark.sql.DataFrame = [col1: int, col2: int]
scala> df.first.getAs[String]("col1")
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
... 56 elided
scala> df.first.getAs[Int]("col1")
res12: Int = 1
scala> df.first.getInt(0)
res13: Int = 1
scala> df.first.getLong(0)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
at org.apache.spark.sql.Row$class.getLong(Row.scala:231)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:165)
... 56 elided
scala> df.first.getFloat(0)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Float …Run Code Online (Sandbox Code Playgroud) scala apache-spark apache-spark-sql spark-dataframe apache-spark-dataset
如何在 spark sql 中动态构建布尔过滤条件?拥有:
val d = Seq(1, 2, 3, 5, 6).toDF
d.filter(col("value") === 1 or col("value") === 3).show
Run Code Online (Sandbox Code Playgroud)
我怎样才能动态复制这个:
val desiredThings = Seq(1,3)
Run Code Online (Sandbox Code Playgroud)
我尝试构建过滤器:
val myCondition = desiredThings.map(col("value") === _)
d.filter(myCondition).show
Run Code Online (Sandbox Code Playgroud)
但失败:
overloaded method value filter with alternatives:
org.apache.spark.api.java.function.FilterFunction[org.apache.spark.sql.Row]
cannot be applied to (Seq[org.apache.spark.sql.Column])
Run Code Online (Sandbox Code Playgroud)
执行时
d.filter(myCondition).show
Run Code Online (Sandbox Code Playgroud)
同样在尝试向左折叠时:
val myCondition = desiredThings.foldLeft()((result, entry) => result && col(c.columnCounterId) === entry)
Run Code Online (Sandbox Code Playgroud)
我有编译错误。
如何调整代码以动态生成过滤谓词?
dynamic filter apache-spark apache-spark-sql spark-dataframe
我有以下格式的数据框
+-----------------+-------------------------+-----------+-----------------------------+---------------------------+----------------------------------+--------------------------------------+--------------------------------+
|DataPartition |TimeStamp |FFAction|!||IdentifierValue_effectiveFrom|IdentifierValue_effectiveTo|IdentifierValue_identifierEntityId|IdentifierValue_identifierEntityTypeId|IdentifierValue_identifierTypeId|
+-----------------+-------------------------+-----------+-----------------------------+---------------------------+----------------------------------+--------------------------------------+--------------------------------+
|SelfSourcedPublic|2018-03-05T11:54:18+00:00|I|!| |1900-01-01T00:00:00+00:00 |9999-12-31T00:00:00+00:00 |4295903126 |404010 |320150 |
+-----------------+-------------------------+-----------+-----------------------------+---------------------------+----------------------------------+--------------------------------------+--------------------------------+
Run Code Online (Sandbox Code Playgroud)
我想在下面的列中添加带有条件的额外列
IdentifierValue_identifierEntityTypeId
Run Code Online (Sandbox Code Playgroud)
添加具有以下条件的额外列分区
如果 IdentifierValue_identifierEntityTypeId =1001371402 那么分区 =Repno2FundamentalSeries 否则如果 IdentifierValue_identifierEntityTypeId404010 那么分区 = Repno2Organization
这就是我正在努力实现的目标
val temp = temp1.withColumn("Partition", when($"IdentifierValue_identifierEntityTypeId" === "404010", 0).otherwise("Repno2FundamentalSeries"))
temp.show(false)
Run Code Online (Sandbox Code Playgroud)
我的输出低于输出,但得到的值为零
IdentifierValue_identifierEntityTypeId
Run Code Online (Sandbox Code Playgroud)
我是 Scala 的新手,所以提出了基本问题
对于列上的多个条件如何写 when 和 else 。这对我不起作用
线程“main”中的异常 java.lang.IllegalArgumentException:otherwise() 只能在先前由 when() 生成的列上应用一次
val dataMain = dataMain1.withColumn(
"Partition",
when($"RelationObjectId_relatedObjectType" === "EDInstrument" && $"RelationObjectId_relatedObjectType" === "Fundamental", "Instrument2Fundamental")
.otherwise(when($"RelationObjectId_relatedObjectType" === "EDInstrument" && $"RelationObjectId_relatedObjectType" === "FundamentalSeries", "Instrument2FundamentalSeries"))
.otherwise(when($"RelationObjectId_relatedObjectType" === "Organization" && $"RelationObjectId_relatedObjectType" === "Fundamental", …Run Code Online (Sandbox Code Playgroud) 为什么我得到不同的输出..agg(countDistinct("member_id") as "count")和..distinct.count?的区别是一样的之间select count(distinct member_id)和select distinct count(member_id)?
我正在 Scala/Spark 中对行级别进行一些计算。我有一个使用下面的 JSON 创建的数据框 -
{"available":false,"createTime":"2016-01-08","dataValue":{"names_source":{"first_names":["abc", "def"],"last_names_id":[123,456]},"another_source_array":[{"first":"1.1","last":"ONE"}],"another_source":"TableSources","location":"GMP", "timestamp":"2018-02-11"},"deleteTime":"2016-01-08"}
Run Code Online (Sandbox Code Playgroud)
您可以直接使用此 JSON 创建数据框。我的架构如下所示-
root
|-- available: boolean (nullable = true)
|-- createTime: string (nullable = true)
|-- dataValue: struct (nullable = true)
| |-- another_source: string (nullable = true)
| |-- another_source_array: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- first: string (nullable = true)
| | | |-- last: string (nullable = true)
| |-- location: string (nullable = true)
| |-- …Run Code Online (Sandbox Code Playgroud) 我正在寻找一种方法来按小时聚合我的数据。我想首先在我的 evtTime 中只保留几个小时。我的 DataFrame 看起来像这样:
Row(access=u'WRITE',
agentHost=u'xxxxxx50.haas.xxxxxx',
cliIP=u'192.000.00.000',
enforcer=u'ranger-acl',
event_count=1,
event_dur_ms=0,
evtTime=u'2017-10-01 23:03:51.337',
id=u'a43d824c-1e53-439b-b374-96b76bacf714',
logType=u'RangerAudit',
policy=699,
reason=u'/project-h/xxxx/xxxx/warehouse/rocq.db/f_crcm_res_temps_retrait',
repoType=1,
reqUser=u'rocqphadm',
resType=u'path',
resource=u'/project-h/xxxx/xxxx/warehouse/rocq.db/f_crcm_res_temps_retrait',
result=1,
seq_num=342976577)
Run Code Online (Sandbox Code Playgroud)
我的目标随后是按 reqUser 分组并计算 event_count 的总和。我试过这个:
func = udf (lambda x: datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S.%f'), DateType())
df1 = df.withColumn('DATE', func(col('evtTime')))
metrics_DataFrame = (df1
.groupBy(hour('DATE'), 'reqUser')
.agg({'event_count': 'sum'})
)
Run Code Online (Sandbox Code Playgroud)
这是结果:
[Row(hour(DATE)=0, reqUser=u'A383914', sum(event_count)=12114),
Row(hour(DATE)=0, reqUser=u'xxxxadm', sum(event_count)=211631),
Row(hour(DATE)=0, reqUser=u'splunk-system-user', sum(event_count)=48),
Row(hour(DATE)=0, reqUser=u'adm', sum(event_count)=7608),
Row(hour(DATE)=0, reqUser=u'X165473', sum(event_count)=2)]
Run Code Online (Sandbox Code Playgroud)
我的目标是得到这样的东西:
[Row(hour(DATE)=2017-10-01 23:00:00, reqUser=u'A383914', sum(event_count)=12114),
Row(hour(DATE)=2017-10-01 22:00:00, reqUser=u'xxxxadm', sum(event_count)=211631),
Row(hour(DATE)=2017-10-01 08:00:00, reqUser=u'splunk-system-user', sum(event_count)=48),
Row(hour(DATE)=2017-10-01 03:00:00, …Run Code Online (Sandbox Code Playgroud) apache-spark-sql ×10
apache-spark ×9
scala ×5
pyspark ×4
dataframe ×2
sql ×2
dynamic ×1
filter ×1
json ×1
pyspark-sql ×1
python-2.7 ×1