我是Apache Spark和Scala的新手(也是Hadoop的初学者).我完成了Spark SQL教程:https://spark.apache.org/docs/latest/sql-programming-guide.html 我尝试对标准csv文件执行简单查询,以便在我当前的集群上对其性能进行基准测试.
我使用来自https://s3.amazonaws.com/hw-sandbox/tutorial1/NYSE-2000-2001.tsv.gz的数据,将其转换为csv并复制/粘贴数据,使其大10倍.
我使用Scala将它加载到Spark中:
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD
Run Code Online (Sandbox Code Playgroud)
定义类:
case class datum(exchange: String,stock_symbol: String,date: String,stock_price_open: Double,stock_price_high: Double,stock_price_low: Double,stock_price_close: Double,stock_volume: String,stock_price_adj_close: Double)
Run Code Online (Sandbox Code Playgroud)
读入数据:
val data = sc.textFile("input.csv").map(_.split(";")).filter(line => "exchange" != "exchange").map(p => datum(p(0).trim.toString, p(1).trim.toString, p(2).trim.toString, p(3).trim.toDouble, p(4).trim.toDouble, p(5).trim.toDouble, p(6).trim.toDouble, p(7).trim.toString, p(8).trim.toDouble))
Run Code Online (Sandbox Code Playgroud)
转换为表格:
data.registerAsTable("data")
Run Code Online (Sandbox Code Playgroud)
定义查询(列出所有以'IBM'作为股票代码的行):
val IBMs = sqlContext.sql("SELECT * FROM data WHERE stock_symbol …Run Code Online (Sandbox Code Playgroud) 读取 csv 后,我得到了意外的输出:
MessageName;ContactKey;DateTimeSend;MessageContent;MessageOpened;OpenDate;TimeInApp;Platform;PlatformVersion;Status
20200903 - NL SPAARUPDATE Augustus;0031t00000A4w0xAAB;09/03/2020 8:09;Vorige maand heb je dankzij de Lidl-Plus app %%savings%% euro gespaard. Goed bezig! ??????;no;;;iPhone OS;12.4.5;Success
Run Code Online (Sandbox Code Playgroud)
正如您可以想象的那样,输出需要将此信息拆分为列和单元格以创建正常的数据框。
我尝试了以下代码:
df = spark.read.csv('/FileStore/tables/BE_August_monthlysaving.csv', header='true')
display(df)
Run Code Online (Sandbox Code Playgroud)
或者,我尝试, delimiter=';'在标题之前和之后使用,但是当我这样做时,出现以下错误:
csv() 得到意外的关键字参数“分隔符”
知道如何解决这个输出吗?
我正在尝试使用 Pyspark 将 Databricks 中的 INT 列转换为日期列。该列如下所示:
Report_Date
20210102
20210102
20210106
20210103
20210104
Run Code Online (Sandbox Code Playgroud)
我正在尝试使用 CAST 函数
df = df.withColumn("Report_Date", col("Report_Date").cast(DateType()))
Run Code Online (Sandbox Code Playgroud)
但我收到以下错误:
由于数据类型不匹配,无法解析“CAST(`Report_Date` AS DATE)”:无法将 int 转换为 date;
你知道我怎样才能得到预期的输出吗?
我有一个 json 文件,如下所示:
test= {'kpiData': [{'date': '2020-06-03 10:05',
'a': 'MINIMUMINTERVAL',
'b': 0.0,
'c': True},
{'date': '2020-06-03 10:10',
'a': 'MINIMUMINTERVAL',
'b': 0.0,
'c': True},
{'date': '2020-06-03 10:15',
'a': 'MINIMUMINTERVAL',
'b': 0.0,
'c': True},
{'date': '2020-06-03 10:20',
'a': 'MINIMUMINTERVAL',
'b': 0.0,}
]}
Run Code Online (Sandbox Code Playgroud)
我想将其传输到数据框对象,如下所示:
rdd = sc.parallelize([test])
jsonDF = spark.read.json(rdd)
Run Code Online (Sandbox Code Playgroud)
这会导致记录损坏。据我了解,其原因是,True和False不能是 Python 中的条目。所以我需要在之前将这些条目转换spark.read.json()为 TRUE、true 或“True”)。test 是一个字典,rdd 是一个 pyspark.rdd.RDD 对象。对于数据帧对象,转换非常简单,但我没有找到这些对象的解决方案。
我正在尝试计算 Pyspark 数据框中两个 ArrayType 列之间的按元素乘积。我尝试使用下面的方法来实现这一点,但似乎无法得到正确的结果......
from pyspark.sql import functions as F
data.withColumn("array_product", F.expr("transform(CASUAL_TOPS_SIMILARITY_SCORE, (x, PER_UNA_SIMILARITY_SCORE) -> x * PER_UNA_SIMILARITY_SCORE)"))
Run Code Online (Sandbox Code Playgroud)
有人对我如何在这里获得正确的结果有任何提示吗?我在下面的 DataFrame 中附加了一个测试行...我需要将列CASUAL_TOPS_SIMILARITY_SCORE与PER_UNA_SIMILARITY_SCORE
import json
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test").getOrCreate()
js = '{"PER_UNA_SIMILARITY_SCORE":{"category_list":[0.9736891648,0.9242207186,0.9717901106,0.9763716155,0.9440944231,0.9708032326,0.9599383329,0.9705343027,0.804267581,0.9597317177,0.9316773281,0.8076725314,0.9555369889,0.9753550725,0.9811865431,1.0,0.8231541809,0.9738989392,0.9780283991,0.9644088011,0.9798529418,0.9347357116,0.9727502648,0.9778486916,0.8621780792,0.9735844196,0.9582644436,0.9579092722,0.8890027888,0.9394986243,0.9563411605,0.9811867597,0.9738380108,0.9577698381,0.7912932623,0.9778158279]},"CASUAL_TOPS_SIMILARITY_SCORE":{"category_list":[0.7924168764,0.7511316884,0.7925161719,0.8007234107,0.7953468064,0.7882556409,0.7778519374,0.7881058994,1.0,0.7785517364,0.7733458123,0.7426205538,0.7905195275,0.7925983778,0.7983386701,0.804267581,0.6749185095,0.7924821952,0.8016348085,0.7895650508,0.7985721918,0.772656847,0.7897495222,0.7948759958,0.6996340275,0.8024327668,0.7784598142,0.7942396044,0.7159431296,0.7850145414,0.7768001023,0.7983372946,0.7971616495,0.7927845035,0.6462844274,0.799555357]}}'
a_json = json.loads(js)
data = spark.createDataFrame(pd.DataFrame.from_dict(a_json))
Run Code Online (Sandbox Code Playgroud) 我试图在 pyspark 中定义嵌套 .json 模式,但无法使 ddl_schema 字符串正常工作。
通常在 SQL 中这将是 ROW,我已经尝试过下面的 STRUCT 但无法获得正确的数据类型,这是错误......
ParseException:
mismatched input '(' expecting {<EOF>, ',', 'COMMENT', NOT}(line 6, pos 15)
== SQL ==
driverId INT,
driverRef STRING,
number STRING,
code STRING,
name STRUCT(forename STRING, surname STRING),
---------------^^^
dob DATE,
nationality STRING,
url STRING
Run Code Online (Sandbox Code Playgroud)
+--------+----------+------+----+--------------------+----------+-----------+--------------------+
|driverId| driverRef|number|code| name| dob|nationality| url|
+--------+----------+------+----+--------------------+----------+-----------+--------------------+
| 1| hamilton| 44| HAM| {Lewis, Hamilton}|1985-01-07| British|http://en.wikiped...|
Run Code Online (Sandbox Code Playgroud)
+--------+----------+------+----+--------------------+----------+-----------+--------------------+
|driverId| driverRef|number|code| name| dob|nationality| url|
+--------+----------+------+----+--------------------+----------+-----------+--------------------+
| 1| hamilton| 44| HAM| {Lewis, Hamilton}|1985-01-07| …Run Code Online (Sandbox Code Playgroud) 我需要在组内实现类似于 redshift listagg() 的函数(按 x_column 排序),但在 Spark SQL 中很重要,这里的https://spark.apache.org/docs/2.4.0/api/ sql/
有一个类似的问题,但答案不是没有 SQL。
我对 Redshift SQL 的查询是:
select KEY,
listagg(CODE, '-') within group (order by DATE) as CODE
from demo_table
group by KEY
Run Code Online (Sandbox Code Playgroud)
此时,order by 语句并不重要,只需使用 group by 聚合所有列就足够了,我尝试过 concat_ws,但它无法按预期工作
将其放在 pyspark 上对我来说不起作用
| 钥匙 | 代码 | 日期 |
|---|---|---|
| 66 | PL | 2016年11月1日 |
| 66 | PL | 2016年12月1日 |
| 67 | 吉林 | 2016年12月1日 |
| 67 | 吉林 | 2016年10月1日 |
| 67 | PL | 2016年9月1日 |
| 67 | 采购订单 | 2016年8月1日 |
| 67 | 吉林 | 2016年12月1日 |
| 68 | PL | 2016年11月1日 |
| 68 | 乔 | 2016年11月1日 |
所需输出
| 钥匙 | 代码 |
|---|---|
| 68 | JO-PL … |
对于下面的示例数据,想知道如何找出列中最常出现的值colour。的数据类型colour是WrappedArray。数组中可能有 n 个元素。在此示例中,颜色应为黄色,然后是出现两次的蓝色。非常感谢您的帮助。
Name Colour
A ('blue','yellow')
B ('pink', 'yellow')
C ('green', 'black')
D ('yellow','orange','blue')
Run Code Online (Sandbox Code Playgroud) 我有一个具有以下案例类类型的数据集:
case class AddressRawData(
addressId: String,
customerId: String,
address: String
)
Run Code Online (Sandbox Code Playgroud)
我想将其转换为:
case class AddressData(
addressId: String,
customerId: String,
address: String,
number: Option[Int], //i.e. it is optional
road: Option[String],
city: Option[String],
country: Option[String]
)
Run Code Online (Sandbox Code Playgroud)
使用解析器函数:
def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] = {
unparsedAddress.map(address => {
val split = address.address.split(", ")
address.copy(
number = Some(split(0).toInt),
road = Some(split(1)),
city = Some(split(2)),
country = Some(split(3))
)
}
)
}
Run Code Online (Sandbox Code Playgroud)
我是 Scala 和 Spark 的新手。谁能告诉我如何做到这一点?
我正在尝试使用PySpark处理包含带有动态键的结构列的 json 文件。
结构列的架构如下所示:
{
"UUID_KEY": {
"time": STRING
"amount": INTEGER
}
}
Run Code Online (Sandbox Code Playgroud)
数据如下:
| ID | json_列 |
|---|---|
| 1 | “{1:{金额:1,时间:2},2:{金额:10,时间:5}}” |
| 2 | “{3:{金额:1,时间:2},4:{金额:10,时间:5}” |
目前,我将结构列作为字符串,因为通过指定/推断模式加载 JSON 不起作用,因为第一层的键是随机生成的,并且数据太多。第二层始终相同,它包含amount和time。
有没有办法在不知道第一层的键的情况下将此 JSON 字符串平铺到amount和列中?time
apache-spark-sql ×10
apache-spark ×9
pyspark ×7
json ×3
python ×2
scala ×2
sql ×2
arrays ×1
csv ×1
databricks ×1
dataframe ×1
ddl ×1
rdd ×1
scala-spark ×1