我正在尝试从 JSON 中提取一个值到flowfile-attribute。当我运行 EvaluateJsonPath 处理器时,我收到一个错误说明
"Unable to get a scalar value for expression $..fields.storyBoard.stringValue
.
输入 JSON 如下所示:
{
"name" : "projects/fakedims-0000/databases/(default)/documents/device/0000",
"fields" : {
"reportKey" : {
"stringValue" : "abc123"
},
"dateOccured" : {
"timestampValue" : "2018-10-14T04:00:00Z"
},
"storyBoard" : {
"stringValue" : "https://path/to/media"
},
"new" : {
"integerValue" : "25"
},
"name" : {
"stringValue" : "device one"
},
"location" : {
"geoPointValue" : {
"latitude" : -78.413751,
"longitude" : 38.156487
}
}
},
"createTime" : …
Run Code Online (Sandbox Code Playgroud) 我有一个类似于下面的数据框。我最初用 -1 填充所有空值以在 Pyspark 中进行连接。
df = pd.DataFrame({'Number': ['1', '2', '-1', '-1'],
'Letter': ['A', '-1', 'B', 'A'],
'Value': [30, 30, 30, -1]})
pyspark_df = spark.createDataFrame(df)
+------+------+-----+
|Number|Letter|Value|
+------+------+-----+
| 1| A| 30|
| 2| -1| 30|
| -1| B| 30|
| -1| A| -1|
+------+------+-----+
Run Code Online (Sandbox Code Playgroud)
处理完数据集后,我需要将所有 -1 替换回空值。
+------+------+-----+
|Number|Letter|Value|
+------+------+-----+
| 1| A| 30|
| 2| null| 30|
| null| B| 30|
| null| A| null|
+------+------+-----+
Run Code Online (Sandbox Code Playgroud)
什么是最简单的方法来做到这一点?
只是想知道是否有任何有效的方法来过滤包含值列表的列,例如:
假设我要过滤一列包含牛肉,Beef:
我可以:
beefDF=df.filter(df.ingredients.contains('Beef')|df.ingredients.contains('beef'))
Run Code Online (Sandbox Code Playgroud)
我不想这样做,而是想创建一个列表:
beef_product=['Beef','beef']
Run Code Online (Sandbox Code Playgroud)
并做:
beefDF=df.filter(df.ingredients.contains(beef_product))
Run Code Online (Sandbox Code Playgroud)
我不需要维护代码,但只需要在 Beef_product 列表中添加新的牛肉(例如牛、肋眼)即可获得过滤数据框。
显然 contains 函数不采用列表类型,实现这一点的好方法是什么?
我正在 pyspark 数据框中的一列上进行分组,并在另一列上执行收集列表以获取 column_1 的所有可用值。如下。
Column_1 Column_2
A Name1
A Name2
A Name3
B Name1
B Name2
C Name1
D Name1
D Name1
D Name1
D Name1
Run Code Online (Sandbox Code Playgroud)
我得到的输出是列_2 的收集列表,其中列_1 分组。
Column_1 Column_2
A [Name1,Name2,Name3]
B [Name1,Name2]
C [Name1]
D [Name1,Name1,Name1,Name1]
Run Code Online (Sandbox Code Playgroud)
现在,当收集列表中的所有值都相同时,我只想仅显示一次而不是四次。以下是预期输出。
预期输出:
Column_1 Column_2
A [Name1,Name2,Name3]
B [Name1,Name2]
C [Name1]
D [Name1]
Run Code Online (Sandbox Code Playgroud)
有没有办法在 pyspark 中做到这一点?
在连接两个表时,我想从一个大表中选择除其中 2 列之外的所有列,该大表在 databricks 上的 pyspark sql 上有许多列。
我的 pyspark sql:
%sql
set hive.support.quoted.identifiers=none;
select a.*, '?!(b.year|b.month)$).+'
from MY_TABLE_A as a
left join
MY_TABLE_B as b
on a.year = b.year and a.month = b.month
Run Code Online (Sandbox Code Playgroud)
我按照 配置单元:选择所有列排除两个 配置单元如何选择除一列之外的所有列?
但是,它对我不起作用。所有列都在结果中。我想删除重复的列(结果中的年份和月份)。
谢谢
我如何在 Apache nifi 中实现以下拆分
我的输入 json 是
[
{
"quality": true,
"tagname": "P1001",
"timestamp": 1543295658092,
"value": 121
},
{
"quality": true,
"tagname": "P1002",
"timestamp": 1543295658092,
"value": 23
}
]
Run Code Online (Sandbox Code Playgroud)
我的输出应该是 2 个单独的 json
1.
{
"quality": true,
"tagname": "P1001",
"timestamp": 1543295658092,
"value": 121
}
Run Code Online (Sandbox Code Playgroud)
2.
{
"quality": true,
"tagname": "P1002",
"timestamp": 1543295658092,
"value": 23
}
Run Code Online (Sandbox Code Playgroud) 我想在 SparkSession.sql 中使用正则表达式,但无论我使用:
SparkSession.builder.enableHiveSupport().config("hive.support.quoted.identifiers", None)
或者
SparkSession.sql("set hive.support.quoted.identifiers=None")
。
请告诉我该怎么做。
代码:
ss = (pyspark.sql.SparkSession
.builder
.enableHiveSupport()
.config("hive.support.quoted.identifiers", None)
.getOrCreate())
#ss.sql("set hive.support.quoted.identifiers=None")
ss.sql("SELECT `(col)?+.+` FROM table")
Run Code Online (Sandbox Code Playgroud)
程序结果:
pyspark.sql.utils.AnalysisException: "cannot resolve '`(col)?+.+`' given input columns: ... ...
Run Code Online (Sandbox Code Playgroud) +--------+-----+---+----+----+----+----+-----+-----------+-----------+
|Currency|Month|Day|Year|Open|High| Low|Close| Volume| Market Cap|
+--------+-----+---+----+----+----+----+-----+-----------+-----------+
| tezos| Dec| 04|2019|1.29|1.32|1.25| 1.25| 46,048,752|824,588,509|
| tezos| Dec| 03|2019|1.24|1.32|1.21| 1.29| 41,462,224|853,213,342|
| tezos| Dec| 02|2019|1.25|1.26|1.20| 1.24| 27,574,097|817,872,179|
| tezos| Dec| 01|2019|1.33|1.34|1.25| 1.25| 24,127,567|828,296,390|
| tezos| Nov| 30|2019|1.31|1.37|1.31| 1.33| 28,706,667|879,181,680|
| tezos| Nov| 29|2019|1.28|1.34|1.28| 1.31| 32,270,224|867,085,098|
| tezos| Nov| 28|2019|1.26|1.35|1.22| 1.28| 44,240,281|845,073,679|
| tezos| Nov| 27|2019|1.24|1.27|1.16| 1.26| 47,723,271|829,672,736|
| tezos| Nov| 15|2019|1.22|1.26|1.15| 1.17| 32,203,363|773,992,543|
+--------+-----+---+----+----+----+----+-----+-----------+-----------+
Run Code Online (Sandbox Code Playgroud)
这是我的 pyspark 数据报,如何将月份名称转换为数字???谢谢
在 pyspark df 上需要一些帮助。我正在尝试将具有另一列最大值的新列附加到现有数据帧,但出现以下错误。这就是我正在做的事情。
df1 = df.withColumn('WEEK_START_DATE', df.agg(f.max('DATE')))
error:
AttributeError: 'DataFrame' object has no attribute '_get_object_id'
Run Code Online (Sandbox Code Playgroud) 有谁知道是否可以使用相同的代码将 csv 或 parquet 文件读取到 spark 中。
我的用例是在生产中,我将使用大型镶木地板文件,但对于单元测试,我想使用 CSV。我正在使用类似于以下代码的内容:
spark.read().schema(schema).load(path);
Run Code Online (Sandbox Code Playgroud)
这在 CSV 情况下失败,但有以下例外:
file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [78, 9, 78, 10]
Run Code Online (Sandbox Code Playgroud)
我怀疑 spark 默认为镶木地板,这不起作用,但我想先检查一下。
apache-spark ×8
pyspark ×8
apache-nifi ×2
dataframe ×2
hive ×2
json ×1
jsonpath ×1
python ×1
sql ×1