标签: apache-spark-sql

Spark SQL count()返回错误的数字

我是Apache Spark和Scala的新手(也是Hadoop的初学者).我完成了Spark SQL教程:https://spark.apache.org/docs/latest/sql-programming-guide.html 我尝试对标准csv文件执行简单查询,以便在我当前的集群上对其性能进行基准测试.

我使用来自https://s3.amazonaws.com/hw-sandbox/tutorial1/NYSE-2000-2001.tsv.gz的数据,将其转换为csv并复制/粘贴数据,使其大10倍.

我使用Scala将它加载到Spark中:

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD
Run Code Online (Sandbox Code Playgroud)

定义类:

case class datum(exchange: String,stock_symbol: String,date: String,stock_price_open: Double,stock_price_high: Double,stock_price_low: Double,stock_price_close: Double,stock_volume: String,stock_price_adj_close: Double)
Run Code Online (Sandbox Code Playgroud)

读入数据:

val data = sc.textFile("input.csv").map(_.split(";")).filter(line => "exchange" != "exchange").map(p => datum(p(0).trim.toString, p(1).trim.toString, p(2).trim.toString, p(3).trim.toDouble, p(4).trim.toDouble, p(5).trim.toDouble, p(6).trim.toDouble, p(7).trim.toString, p(8).trim.toDouble))
Run Code Online (Sandbox Code Playgroud)

转换为表格:

data.registerAsTable("data")
Run Code Online (Sandbox Code Playgroud)

定义查询(列出所有以'IBM'作为股票代码的行):

val IBMs = sqlContext.sql("SELECT * FROM data WHERE stock_symbol …
Run Code Online (Sandbox Code Playgroud)

sql scala apache-spark apache-spark-sql

0
推荐指数
1
解决办法
5128
查看次数

Pyspark 读取 csv

读取 csv 后,我得到了意外的输出:

MessageName;ContactKey;DateTimeSend;MessageContent;MessageOpened;OpenDate;TimeInApp;Platform;PlatformVersion;Status
20200903 - NL SPAARUPDATE Augustus;0031t00000A4w0xAAB;09/03/2020 8:09;Vorige maand heb je dankzij de Lidl-Plus app %%savings%% euro gespaard. Goed bezig! ??????;no;;;iPhone OS;12.4.5;Success
Run Code Online (Sandbox Code Playgroud)

正如您可以想象的那样,输出需要将此信息拆分为列和单元格以创建正常的数据框。

我尝试了以下代码:

df = spark.read.csv('/FileStore/tables/BE_August_monthlysaving.csv', header='true')

display(df)

Run Code Online (Sandbox Code Playgroud)

或者,我尝试, delimiter=';'在标题之前和之后使用,但是当我这样做时,出现以下错误:

csv() 得到意外的关键字参数“分隔符”

知道如何解决这个输出吗?

csv dataframe apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
806
查看次数

将 int YYYYMMDD 转换为日期 pyspark

我正在尝试使用 Pyspark 将 Databricks 中的 INT 列转换为日期列。该列如下所示:

Report_Date
20210102
20210102
20210106
20210103
20210104
Run Code Online (Sandbox Code Playgroud)

我正在尝试使用 CAST 函数

df = df.withColumn("Report_Date", col("Report_Date").cast(DateType()))
Run Code Online (Sandbox Code Playgroud)

但我收到以下错误:

由于数据类型不匹配,无法解析“CAST(`Report_Date` AS DATE)”:无法将 int 转换为 date;

你知道我怎样才能得到预期的输出吗?

date-formatting apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
1万
查看次数

由于 False 作为条目,pyspark 中 json 文件的记录已损坏

我有一个 json 文件,如下所示:

test= {'kpiData': [{'date': '2020-06-03 10:05',
   'a': 'MINIMUMINTERVAL',
   'b': 0.0,
   'c': True},
  {'date': '2020-06-03 10:10',
   'a': 'MINIMUMINTERVAL',
   'b': 0.0,
   'c': True},
  {'date': '2020-06-03 10:15',
   'a': 'MINIMUMINTERVAL',
   'b': 0.0,
   'c': True},
  {'date': '2020-06-03 10:20',
   'a': 'MINIMUMINTERVAL',
   'b': 0.0,}
]}
Run Code Online (Sandbox Code Playgroud)

我想将其传输到数据框对象,如下所示:

rdd = sc.parallelize([test])
jsonDF = spark.read.json(rdd)
Run Code Online (Sandbox Code Playgroud)

这会导致记录损坏。据我了解,其原因是,TrueFalse不能是 Python 中的条目。所以我需要在之前将这些条目转换spark.read.json()为 TRUE、true 或“True”)。test 是一个字典,rdd 是一个 pyspark.rdd.RDD 对象。对于数据帧对象,转换非常简单,但我没有找到这些对象的解决方案。

json apache-spark rdd apache-spark-sql pyspark

0
推荐指数
1
解决办法
2163
查看次数

如何使用 pyspark 计算两个 ArrayType 列之间的按元素乘法

我正在尝试计算 Pyspark 数据框中两个 ArrayType 列之间的按元素乘积。我尝试使用下面的方法来实现这一点,但似乎无法得到正确的结果......

from pyspark.sql import functions as F

data.withColumn("array_product", F.expr("transform(CASUAL_TOPS_SIMILARITY_SCORE, (x, PER_UNA_SIMILARITY_SCORE) -> x * PER_UNA_SIMILARITY_SCORE)"))
Run Code Online (Sandbox Code Playgroud)

有人对我如何在这里获得正确的结果有任何提示吗?我在下面的 DataFrame 中附加了一个测试行...我需要将列CASUAL_TOPS_SIMILARITY_SCOREPER_UNA_SIMILARITY_SCORE

import json 
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test").getOrCreate()

js = '{"PER_UNA_SIMILARITY_SCORE":{"category_list":[0.9736891648,0.9242207186,0.9717901106,0.9763716155,0.9440944231,0.9708032326,0.9599383329,0.9705343027,0.804267581,0.9597317177,0.9316773281,0.8076725314,0.9555369889,0.9753550725,0.9811865431,1.0,0.8231541809,0.9738989392,0.9780283991,0.9644088011,0.9798529418,0.9347357116,0.9727502648,0.9778486916,0.8621780792,0.9735844196,0.9582644436,0.9579092722,0.8890027888,0.9394986243,0.9563411605,0.9811867597,0.9738380108,0.9577698381,0.7912932623,0.9778158279]},"CASUAL_TOPS_SIMILARITY_SCORE":{"category_list":[0.7924168764,0.7511316884,0.7925161719,0.8007234107,0.7953468064,0.7882556409,0.7778519374,0.7881058994,1.0,0.7785517364,0.7733458123,0.7426205538,0.7905195275,0.7925983778,0.7983386701,0.804267581,0.6749185095,0.7924821952,0.8016348085,0.7895650508,0.7985721918,0.772656847,0.7897495222,0.7948759958,0.6996340275,0.8024327668,0.7784598142,0.7942396044,0.7159431296,0.7850145414,0.7768001023,0.7983372946,0.7971616495,0.7927845035,0.6462844274,0.799555357]}}'

a_json = json.loads(js)

data = spark.createDataFrame(pd.DataFrame.from_dict(a_json))
Run Code Online (Sandbox Code Playgroud)

python apache-spark-sql pyspark

0
推荐指数
1
解决办法
1255
查看次数

Spark DDL 架构 JSON 结构

问题

我试图在 pyspark 中定义嵌套 .json 模式,但无法使 ddl_schema 字符串正常工作。

通常在 SQL 中这将是 ROW,我已经尝试过下面的 STRUCT 但无法获得正确的数据类型,这是错误......

ParseException: 
mismatched input '(' expecting {<EOF>, ',', 'COMMENT', NOT}(line 6, pos 15)

== SQL ==

    driverId INT,
    driverRef STRING,
    number STRING,
    code STRING,
    name STRUCT(forename STRING, surname STRING),
---------------^^^
    dob DATE,
    nationality STRING,
    url STRING
Run Code Online (Sandbox Code Playgroud)

数据样本

            +--------+----------+------+----+--------------------+----------+-----------+--------------------+
            |driverId| driverRef|number|code|                name|       dob|nationality|                 url|
            +--------+----------+------+----+--------------------+----------+-----------+--------------------+
            |       1|  hamilton|    44| HAM|   {Lewis, Hamilton}|1985-01-07|    British|http://en.wikiped...|
Run Code Online (Sandbox Code Playgroud)

代码示例

            +--------+----------+------+----+--------------------+----------+-----------+--------------------+
            |driverId| driverRef|number|code|                name|       dob|nationality|                 url|
            +--------+----------+------+----+--------------------+----------+-----------+--------------------+
            |       1|  hamilton|    44| HAM|   {Lewis, Hamilton}|1985-01-07| …
Run Code Online (Sandbox Code Playgroud)

ddl json apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
3350
查看次数

Spark SQL 类似于组内的 LISTAGG() OR GROUP_CONCAT

我需要在组内实现类似于 redshift listagg() 的函数(按 x_column 排序),但在 Spark SQL 中很重要,这里的https://spark.apache.org/docs/2.4.0/api/ sql/

一个类似的问题,但答案不是没有 SQL。

我对 Redshift SQL 的查询是:

select KEY,
listagg(CODE, '-') within group (order by DATE) as CODE
from demo_table
group by KEY
Run Code Online (Sandbox Code Playgroud)

此时,order by 语句并不重要,只需使用 group by 聚合所有列就足够了,我尝试过 concat_ws,但它无法按预期工作

将其放在 pyspark 上对我来说不起作用

钥匙 代码 日期
66 PL 2016年11月1日
66 PL 2016年12月1日
67 吉林 2016年12月1日
67 吉林 2016年10月1日
67 PL 2016年9月1日
67 采购订单 2016年8月1日
67 吉林 2016年12月1日
68 PL 2016年11月1日
68 2016年11月1日

所需输出

钥匙 代码
68 JO-PL …

sql apache-spark apache-spark-sql

0
推荐指数
1
解决办法
7053
查看次数

PySpark-如何找出数组列中最常出现的前 n 个值?

对于下面的示例数据,想知道如何找出列中最常出现的值colour。的数据类型colour是WrappedArray。数组中可能有 n 个元素。在此示例中,颜色应为黄色,然后是出现两次的蓝色。非常感谢您的帮助。

Name   Colour 
 A      ('blue','yellow')
 B      ('pink', 'yellow')
 C      ('green', 'black')
 D      ('yellow','orange','blue')
Run Code Online (Sandbox Code Playgroud)

python arrays apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
190
查看次数

将一种类型的 Spark scala 数据集转换为另一种类型

我有一个具有以下案例类类型的数据集:

  case class AddressRawData(
                         addressId: String,
                         customerId: String,
                         address: String
                       )
Run Code Online (Sandbox Code Playgroud)

我想将其转换为:

case class AddressData(
                          addressId: String,
                          customerId: String,
                          address: String,
                          number: Option[Int], //i.e. it is optional
                          road: Option[String],
                          city: Option[String],
                          country: Option[String]
                        )
Run Code Online (Sandbox Code Playgroud)

使用解析器函数:

  def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] = {
    unparsedAddress.map(address => {
      val split = address.address.split(", ")
      address.copy(
        number = Some(split(0).toInt),
        road = Some(split(1)),
        city = Some(split(2)),
        country = Some(split(3))
      )
    }
    )
  }
Run Code Online (Sandbox Code Playgroud)

我是 Scala 和 Spark 的新手。谁能告诉我如何做到这一点?

scala apache-spark apache-spark-sql scala-spark

0
推荐指数
1
解决办法
434
查看次数

在 PySpark 中使用动态键展平嵌套 JSON 结构

我正在尝试使用PySpark处理包含带有动态键的结构列的 json 文件。

结构列的架构如下所示:

{
  "UUID_KEY": {
     "time": STRING
     "amount": INTEGER
  }
}
Run Code Online (Sandbox Code Playgroud)

数据如下:

ID json_列
1 “{1:{金额:1,时间:2},2:{金额:10,时间:5}}”
2 “{3:{金额:1,时间:2},4:{金额:10,时间:5}”

目前,我将结构列作为字符串,因为通过指定/推断模式加载 JSON 不起作用因为第一层的键是随机生成的,并且数据太多。第二层始终相同,它包含amounttime

有没有办法在不知道第一层的键的情况下将此 JSON 字符串平铺到amount和列中?time

json apache-spark apache-spark-sql pyspark databricks

0
推荐指数
1
解决办法
718
查看次数