Posts by Jos*_*emy

Date difference between consecutive rows - Pyspark Dataframe

I have a table with the following structure:

USER_ID     Tweet_ID                 Date
  1           1001       Thu Aug 05 19:11:39 +0000 2010
  1           6022       Mon Aug 09 17:51:19 +0000 2010
  1           1041       Sun Aug 19 11:10:09 +0000 2010
  2           9483       Mon Jan 11 10:51:23 +0000 2012
  2           4532       Fri May 21 11:11:11 +0000 2012
  3           4374       Sat Jul 10 03:21:23 +0000 2013
  3           4334       Sun Jul 11 04:53:13 +0000 2013

Basically, what I want is a PySpark SQL query that computes the date difference (in seconds) between consecutive records that share the same user_id. The expected result would be:

1      Sun Aug 19 11:10:09 +0000 2010 - Mon Aug 09 17:51:19 +0000 2010     839930
1      Mon …
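One common way to get this (a sketch, not from the original post) is to parse the Date string with unix_timestamp and subtract the previous row's value with lag over a window partitioned by USER_ID. The column names and sample rows below come from the table above; the date pattern assumes Spark's legacy SimpleDateFormat parser (Spark 1.x/2.x).

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Sample rows with the column names from the question.
df = spark.createDataFrame(
    [(1, 1001, "Thu Aug 05 19:11:39 +0000 2010"),
     (1, 6022, "Mon Aug 09 17:51:19 +0000 2010"),
     (1, 1041, "Sun Aug 19 11:10:09 +0000 2010")],
    ["USER_ID", "Tweet_ID", "Date"])

# Parse the Twitter-style date string to epoch seconds, then subtract the
# previous record's timestamp within each USER_ID (NULL for the first row).
ts = F.unix_timestamp("Date", "EEE MMM dd HH:mm:ss Z yyyy")
w = Window.partitionBy("USER_ID").orderBy(ts)

df.withColumn("diff_seconds", ts - F.lag(ts).over(w)).show(truncate=False)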

python apache-spark pyspark pyspark-sql

7 votes · 2 answers · 4036 views

Column features must be of type org.apache.spark.ml.linalg.VectorUDT

I want to run this code in pyspark (Spark 2.1.1):

from pyspark.ml.feature import PCA

bankPCA = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
pcaModel = bankPCA.fit(bankDf)
pcaResult = pcaModel.transform(bankDf).select("label", "pcaFeatures")
pcaResult.show(truncate=False)

But I get this error:

requirement failed: Column features must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was actually org.apache.spark.mllib.linalg.VectorUDT@f71b0bce.
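This mismatch typically means the features column was assembled with the old pyspark.mllib vector API, while pyspark.ml.feature.PCA expects the new pyspark.ml.linalg vectors. A minimal sketch of one possible fix, using MLUtils.convertVectorColumnsToML and a toy stand-in for bankDf (the real DataFrame comes from the question's context):

from pyspark.sql import SparkSession
from pyspark.mllib.linalg import Vectors as MLlibVectors
from pyspark.mllib.util import MLUtils

spark = SparkSession.builder.getOrCreate()

# Toy stand-in for bankDf: a "features" column built with the old mllib API.
bankDf = spark.createDataFrame(
    [(0.0, MLlibVectors.dense([1.0, 2.0, 3.0])),
     (1.0, MLlibVectors.dense([4.0, 5.0, 6.0]))],
    ["label", "features"])

# Convert the mllib vector column to the ml vector type that
# pyspark.ml estimators such as PCA expect.
bankDf = MLUtils.convertVectorColumnsToML(bankDf, "features")

After the conversion, the fit/transform calls from the question should accept the column.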

import apache-spark pyspark

5 votes · 1 answer · 6936 views

How to specify only particular fields using read.schema in JSON: SPARK Scala

I am trying to programmatically enforce a schema on a textFile whose contents look like JSON. I tried jsonFile, but the problem is that to build a DataFrame from a list of JSON files, Spark has to make one pass over the data just to infer the schema. So it has to parse all of the data, which takes much longer (about 4 hours, since my data is compressed and terabytes in size). Instead, I want to read it as a textFile and enforce a schema that picks out only the fields of interest, so that I can query the resulting DataFrame later. But I am not sure how to map the input onto the schema. Can someone give me a reference on how to map a schema onto JSON-like input?

Input:

This is the complete schema:

records: org.apache.spark.sql.DataFrame = [country: string, countryFeatures: string, customerId: string, homeCountry: string, homeCountryFeatures: string, places: array<struct<freeTrial:boolean,placeId:string,placeRating:bigint>>, siteName: string, siteId: string, siteTypeId: string, Timestamp: bigint, Timezone: string, countryId: string, pageId: string, homeId: string, pageType: string, model: string, requestId: string, sessionId: string, inputs: array<struct<inputName:string,inputType:string,inputId:string,offerType:string,originalRating:bigint,processed:boolean,rating:bigint,score:double,methodId:string>>] 

But I am only interested in the few fields below:

res45: Array[String] = Array({"requestId":"bnjinmm","siteName":"bueller","pageType":"ad","model":"prepare","inputs":[{"methodId":"436136582","inputType":"US","processed":true,"rating":0,"originalRating":1},{"methodId":"23232322","inputType":"UK","processed":false,"rating":0,"originalRating":1}]


import org.apache.spark.sql.types._

val records = sc.textFile("s3://testData/sample.json.gz")

// Each StructType takes its fields wrapped in an Array (or Seq),
// including the nested one used for the "inputs" element type.
val schema = StructType(Array(
  StructField("requestId", StringType, true),
  StructField("siteName", StringType, true),
  StructField("model", StringType, true),
  StructField("pageType", StringType, true),
  StructField("inputs",
    ArrayType(
      StructType(Array(
        StructField("inputType", StringType, true),
        StructField("originalRating", LongType, true),
        StructField("processed", BooleanType, true),
        StructField("rating", LongType, true),
        StructField("methodId", StringType, true)
      )),
      true),
    true)
))

val rowRDD = ??

val inputRDD = sqlContext.applySchema(rowRDD, schema)
inputRDD.registerTempTable("input")

sql("select …
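For reference (not part of the original post), Spark's DataFrameReader also accepts a user-supplied schema for JSON, which skips the inference pass and reads only the listed fields. The sketch below is in PySpark for illustration; the Scala reader exposes the same schema(...) and json(...) methods. The path comes from the snippet above and the final query is a placeholder.

from pyspark.sql import SparkSession
from pyspark.sql.types import (ArrayType, BooleanType, LongType,
                               StringType, StructField, StructType)

spark = SparkSession.builder.getOrCreate()

# Only the fields of interest; all other attributes in the JSON are ignored.
schema = StructType([
    StructField("requestId", StringType(), True),
    StructField("siteName", StringType(), True),
    StructField("model", StringType(), True),
    StructField("pageType", StringType(), True),
    StructField("inputs", ArrayType(StructType([
        StructField("inputType", StringType(), True),
        StructField("originalRating", LongType(), True),
        StructField("processed", BooleanType(), True),
        StructField("rating", LongType(), True),
        StructField("methodId", StringType(), True),
    ])), True),
])

# Supplying the schema avoids the full pass over the data for inference.
records = spark.read.schema(schema).json("s3://testData/sample.json.gz")
records.createOrReplaceTempView("input")
spark.sql("select requestId, siteName from input").show()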

json scala apache-spark rdd

1 vote · 1 answer · 10k views

Tag statistics

apache-spark ×3

pyspark ×2

import ×1

json ×1

pyspark-sql ×1

python ×1

rdd ×1

scala ×1