我的要求是从列中的注释列中检索订单号,comment
并始终以R
.订单号应作为新列添加到表中.
输入数据:
code,id,mode,location,status,comment
AS-SD,101,Airways,hyderabad,D,order got delayed R1657
FY-YT,102,Airways,Delhi,ND,R7856 package damaged
TY-OP,103,Airways,Pune,D,Order number R5463 not received
Run Code Online (Sandbox Code Playgroud)
预期产量:
AS-SD,101,Airways,hyderabad,D,order got delayed R1657,R1657
FY-YT,102,Airways,Delhi,ND,R7856 package damaged,R7856
TY-OP,103,Airways,Pune,D,Order number R5463 not received,R5463
Run Code Online (Sandbox Code Playgroud)
我在spark-sql中尝试过,我正在使用的查询如下:
val r = sqlContext.sql("select substring(comment, PatIndex('%[0-9]%',comment, length(comment))) as number from A")
Run Code Online (Sandbox Code Playgroud)
但是,我收到以下错误:
org.apache.spark.sql.AnalysisException: undefined function PatIndex; line 0 pos 0
Run Code Online (Sandbox Code Playgroud) 我的要求是为age创建类别.我正在尝试在UDF中编写多个if条件但是它正在使用其他条件.我的代码如下.
我的数据
1,Ashok,23,asd
2,Joi,27,dfs
3,Sam,30,dft
4,Bob,37,dat
Run Code Online (Sandbox Code Playgroud)
我的代码
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.SaveMode
import sqlContext.implicits._
val a = sc.textFile("file2.txt")
a.foreach(println)
val coder: (Int=>String)=(arg:Int)=>{if(arg>20&&arg<27) "20-27";if(arg>30&&arg<37) "30-37"; else "38+"}
val co = udf(coder)
val a2 = a1.select(col("Id"),col("Name"),col("Age"),col("Dpt"))
a2.withColumn("range",co(col("Age"))).show()
Run Code Online (Sandbox Code Playgroud)
输出我得到了
1,Ashok,23,asd,38+
2,Joi,27,dfs,38+
3,Sam,30,dft,38+
4,Bob,37,dat,38+
Run Code Online (Sandbox Code Playgroud)
对于每行显示38+,请建议语法.
我有下面的数据框,我需要将空数组转换为null。
+----+---------+-----------+
| id|count(AS)|count(asdr)|
+----+---------+-----------+
|1110| [12, 45]| [50, 55]|
|1111| []| []|
|1112| [45, 46]| [50, 50]|
|1113| []| []|
+----+---------+-----------+
Run Code Online (Sandbox Code Playgroud)
我试过下面的代码不起作用。
df.na.fill("null").show()
Run Code Online (Sandbox Code Playgroud)
预期输出应为
+----+---------+-----------+
| id|count(AS)|count(asdr)|
+----+---------+-----------+
|1110| [12, 45]| [50, 55]|
|1111| NUll| NUll|
|1112| [45, 46]| [50, 50]|
|1113| NUll| NUll|
+----+---------+-----------+
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 spark-xml jar 在 pyspark 中读取 xml/嵌套 xml。
df = sqlContext.read \
.format("com.databricks.spark.xml")\
.option("rowTag", "hierachy")\
.load("test.xml"
Run Code Online (Sandbox Code Playgroud)
当我执行时,数据框没有正确创建。
+--------------------+
| att|
+--------------------+
|[[1,Data,[Wrapped...|
+--------------------+
Run Code Online (Sandbox Code Playgroud)
下面提到了我的 xml 格式:
我有输入
{'A':'1','B':'1'}
Run Code Online (Sandbox Code Playgroud)
我需要的输出
+----+----+
|A |B |
+----+----+
| 1| 2|
+----+----+
Run Code Online (Sandbox Code Playgroud)
我试过的代码如下。这不起作用
v = {'A': '1','B':'2'}
rdd = sc.parallelize(v.values())
df = spark.createDataFrame(rdd, list(v.keys()))
df.show()
Run Code Online (Sandbox Code Playgroud) 我需要使用 pysparkdf
相对于其他数据df_col
框更改数据框的列名
df
+----+---+----+----+
|code| id|name|work|
+----+---+----+----+
| ASD|101|John| DEV|
| klj|102| ben|prod|
+----+---+----+----+
Run Code Online (Sandbox Code Playgroud)
df_col
+-----------+-----------+
|col_current|col_updated|
+-----------+-----------+
| id| Row_id|
| name| Name|
| code| Row_code|
| Work| Work_Code|
+-----------+-----------+
Run Code Online (Sandbox Code Playgroud)
如果 df 列与 col_current 匹配,则 df 列应替换为 col_updated。例如:如果 df.id 与 df.col_current 匹配,则 df.id 应替换为 Row_id。
预期产出
Row_id,Name,Row_code,Work_code
101,John,ASD,DEV
102,ben,klj,prod
Run Code Online (Sandbox Code Playgroud)
注意:我希望这个过程是动态的。
我需要将数据帧转换为 RDD,并需要对其应用一些核心操作。尝试了以下事情。它正在转换为列表、行或元组格式。但一些核心功能或不适用于 .split() 等功能。
尝试了以下方法:
df.rdd.map(list)
Run Code Online (Sandbox Code Playgroud)
或者
df.rdd.map(tuple)
Run Code Online (Sandbox Code Playgroud)
或者
df.rdd
Run Code Online (Sandbox Code Playgroud)
我正在尝试的示例代码
rdd=load_df.rdd.map(list)
conv_rdd= rdd.map(lambda x:x.split(","))
Run Code Online (Sandbox Code Playgroud)
需要从这里开始执行一些操作
我需要经常更改列位置.而不是更改代码我创建了一个临时数据帧Index_df
.在这里,我将更新列位置,它应该反映更改应该执行的实际数据帧.
sample_df
F_cDc,F_NHY,F_XUI,F_NMY,P_cDc,P_NHY,P_XUI,P_NMY
415 258 854 245 478 278 874 235
405 197 234 456 567 188 108 267
315 458 054 375 898 978 677 134
Run Code Online (Sandbox Code Playgroud)
Index_df
col position
F_cDc,1
F_NHY,3
F_XUI,5
F_NMY,7
P_cDc,2
P_NHY,4
P_XUI,6
P_NMY,8
Run Code Online (Sandbox Code Playgroud)
在这里index_df
,sample_df
应该改变.
预期产量:
F_cDc,P_cDc,F_NHY,P_NHY,F_XUI,P_XUI,F_NMY,P_NMY
415 478 258 278 854 874 245 235
405 567 197 188 234 108 456 267
315 898 458 978 054 677 375 134
Run Code Online (Sandbox Code Playgroud)
这里的列位置根据我更新的位置而改变 Index_df
我能做到,sample_df.select("<column order>")
但我有超过70列.从技术上讲,这不是最好的交易方式.
我想根据条件将元素列表附加到另一个列表.例如:找到下面的代码.
package test
object main {
def main(args: Array[String]): Unit ={
val a = List(1,2,3,4,5)
val b= List[Int]()
for(x <- a){
if (x>3){
b:+x
}
}
println(b)
}
}
Run Code Online (Sandbox Code Playgroud)
当我执行这个我得到空列表.