小编Shu*_*Shu的帖子

EvaluateJsonPath 无法返回标量

我正在尝试从 JSON 中提取一个值到flowfile-attribute。当我运行 EvaluateJsonPath 处理器时,我收到一个错误说明

"Unable to get a scalar value for expression $..fields.storyBoard.stringValue.

输入 JSON 如下所示:

{
  "name" : "projects/fakedims-0000/databases/(default)/documents/device/0000",
  "fields" : {
    "reportKey" : {
      "stringValue" : "abc123"
    },
    "dateOccured" : {
      "timestampValue" : "2018-10-14T04:00:00Z"
    },
    "storyBoard" : {
      "stringValue" : "https://path/to/media"
    },
    "new" : {
      "integerValue" : "25"
    },
    "name" : {
      "stringValue" : "device one"
    },
    "location" : {
      "geoPointValue" : {
        "latitude" : -78.413751,
        "longitude" : 38.156487
      }
    }
  },
  "createTime" : …
Run Code Online (Sandbox Code Playgroud)

json jsonpath apache-nifi

5
推荐指数
1
解决办法
2478
查看次数

Pyspark:在数据框中用 null 替换所有出现的值

我有一个类似于下面的数据框。我最初用 -1 填充所有空值以在 Pyspark 中进行连接。

df = pd.DataFrame({'Number': ['1', '2', '-1', '-1'],
                   'Letter': ['A', '-1', 'B', 'A'],
                   'Value': [30, 30, 30, -1]})


pyspark_df = spark.createDataFrame(df)

+------+------+-----+
|Number|Letter|Value|
+------+------+-----+
|     1|     A|   30|
|     2|    -1|   30|
|    -1|     B|   30|
|    -1|     A|   -1|
+------+------+-----+
Run Code Online (Sandbox Code Playgroud)

处理完数据集后,我需要将所有 -1 替换回空值。

+------+------+-----+
|Number|Letter|Value|
+------+------+-----+
|     1|     A|   30|
|     2|  null|   30|
|  null|     B|   30|
|  null|     A| null|
+------+------+-----+
Run Code Online (Sandbox Code Playgroud)

什么是最简单的方法来做到这一点?

apache-spark apache-spark-sql pyspark pyspark-dataframes

5
推荐指数
2
解决办法
335
查看次数

PySpark DataFrame 过滤器列包含多个值

只是想知道是否有任何有效的方法来过滤包含值列表的列,例如:

假设我要过滤一列包含牛肉,Beef:

我可以:

beefDF=df.filter(df.ingredients.contains('Beef')|df.ingredients.contains('beef'))
Run Code Online (Sandbox Code Playgroud)

我不想这样做,而是想创建一个列表:

beef_product=['Beef','beef']
Run Code Online (Sandbox Code Playgroud)

并做:

beefDF=df.filter(df.ingredients.contains(beef_product))
Run Code Online (Sandbox Code Playgroud)

我不需要维护代码,但只需要在 Beef_product 列表中添加新的牛肉(例如牛、肋眼)即可获得过滤数据框。

显然 contains 函数不采用列表类型,实现这一点的好方法是什么?

dataframe apache-spark pyspark

4
推荐指数
1
解决办法
1万
查看次数

Pyspark 收集列表

我正在 pyspark 数据框中的一列上进行分组,并在另一列上执行收集列表以获取 column_1 的所有可用值。如下。

Column_1 Column_2
A        Name1
A        Name2
A        Name3
B        Name1
B        Name2
C        Name1
D        Name1
D        Name1
D        Name1
D        Name1
Run Code Online (Sandbox Code Playgroud)

我得到的输出是列_2 的收集列表,其中列_1 分组。

Column_1 Column_2
A        [Name1,Name2,Name3]  
B        [Name1,Name2]
C        [Name1]
D        [Name1,Name1,Name1,Name1]
Run Code Online (Sandbox Code Playgroud)

现在,当收集列表中的所有值都相同时,我只想仅显示一次而不是四次。以下是预期输出。

预期输出:

Column_1 Column_2
A        [Name1,Name2,Name3]  
B        [Name1,Name2]
C        [Name1]
D        [Name1]
Run Code Online (Sandbox Code Playgroud)

有没有办法在 pyspark 中做到这一点?

dataframe apache-spark pyspark

3
推荐指数
1
解决办法
4794
查看次数

如何从 pyspark sql 上的大表中选择除其中 2 之外的所有列?

在连接两个表时,我想从一个大表中选择除其中 2 列之外的所有列,该大表在 databricks 上的 pyspark sql 上有许多列。

我的 pyspark sql:

 %sql
 set hive.support.quoted.identifiers=none;
 select a.*, '?!(b.year|b.month)$).+'
 from MY_TABLE_A as a
 left join 
      MY_TABLE_B as b
 on a.year = b.year and a.month = b.month 
Run Code Online (Sandbox Code Playgroud)

我按照 配置单元:选择所有列排除两个 配置单元如何选择除一列之外的所有列?

但是,它对我不起作用。所有列都在结果中。我想删除重复的列(结果中的年份和月份)。

谢谢

python sql hive apache-spark pyspark

3
推荐指数
1
解决办法
1万
查看次数

如何在apache nifi中拆分输入json数组

我如何在 Apache nifi 中实现以下拆分

我的输入 json 是

[
{
  "quality": true,
  "tagname": "P1001",
  "timestamp": 1543295658092,
  "value": 121
},
{
  "quality": true,
  "tagname": "P1002",
  "timestamp": 1543295658092,
  "value": 23
}
]
Run Code Online (Sandbox Code Playgroud)

我的输出应该是 2 个单独的 json

1.

{
  "quality": true,
  "tagname": "P1001",
  "timestamp": 1543295658092,
  "value": 121
}
Run Code Online (Sandbox Code Playgroud)

2.

{
  "quality": true,
  "tagname": "P1002",
  "timestamp": 1543295658092,
  "value": 23
}
Run Code Online (Sandbox Code Playgroud)

apache-nifi

2
推荐指数
1
解决办法
608
查看次数

为什么 SparkSession.sql("set hive.support.quoted.identifiers=None") 不起作用?

我想在 SparkSession.sql 中使用正则表达式,但无论我使用:

SparkSession.builder.enableHiveSupport().config("hive.support.quoted.identifiers", None)

或者

SparkSession.sql("set hive.support.quoted.identifiers=None")

请告诉我该怎么做。

代码:

ss = (pyspark.sql.SparkSession
      .builder
      .enableHiveSupport()          
      .config("hive.support.quoted.identifiers", None)
      .getOrCreate())                                         
#ss.sql("set hive.support.quoted.identifiers=None")
ss.sql("SELECT `(col)?+.+` FROM table")
Run Code Online (Sandbox Code Playgroud)

程序结果:

pyspark.sql.utils.AnalysisException: "cannot resolve '`(col)?+.+`' given input columns: ... ...
Run Code Online (Sandbox Code Playgroud)

hive apache-spark pyspark

2
推荐指数
1
解决办法
2469
查看次数

在 Pyspark 中将月份名称转换为数字

+--------+-----+---+----+----+----+----+-----+-----------+-----------+
|Currency|Month|Day|Year|Open|High| Low|Close|     Volume| Market Cap|
+--------+-----+---+----+----+----+----+-----+-----------+-----------+
|   tezos|  Dec| 04|2019|1.29|1.32|1.25| 1.25| 46,048,752|824,588,509|
|   tezos|  Dec| 03|2019|1.24|1.32|1.21| 1.29| 41,462,224|853,213,342|
|   tezos|  Dec| 02|2019|1.25|1.26|1.20| 1.24| 27,574,097|817,872,179|
|   tezos|  Dec| 01|2019|1.33|1.34|1.25| 1.25| 24,127,567|828,296,390|
|   tezos|  Nov| 30|2019|1.31|1.37|1.31| 1.33| 28,706,667|879,181,680|
|   tezos|  Nov| 29|2019|1.28|1.34|1.28| 1.31| 32,270,224|867,085,098|
|   tezos|  Nov| 28|2019|1.26|1.35|1.22| 1.28| 44,240,281|845,073,679|
|   tezos|  Nov| 27|2019|1.24|1.27|1.16| 1.26| 47,723,271|829,672,736|
|   tezos|  Nov| 15|2019|1.22|1.26|1.15| 1.17| 32,203,363|773,992,543|
+--------+-----+---+----+----+----+----+-----+-----------+-----------+
Run Code Online (Sandbox Code Playgroud)

这是我的 pyspark 数据报,如何将月份名称转换为数字???谢谢

apache-spark pyspark

2
推荐指数
1
解决办法
5768
查看次数

在 pyspark 数据框中添加具有另一列最大值的新列

在 pyspark df 上需要一些帮助。我正在尝试将具有另一列最大值的新列附加到现有数据帧,但出现以下错误。这就是我正在做的事情。

df1 = df.withColumn('WEEK_START_DATE', df.agg(f.max('DATE')))



error:
AttributeError: 'DataFrame' object has no attribute '_get_object_id'
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark

2
推荐指数
1
解决办法
5388
查看次数

是否可以使用相同的代码读取 csv 或 parquet 文件

有谁知道是否可以使用相同的代码将 csv 或 parquet 文件读取到 spark 中。

我的用例是在生产中,我将使用大型镶木地板文件,但对于单元测试,我想使用 CSV。我正在使用类似于以下代码的内容:

spark.read().schema(schema).load(path);
Run Code Online (Sandbox Code Playgroud)

这在 CSV 情况下失败,但有以下例外:

file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [78, 9, 78, 10]
Run Code Online (Sandbox Code Playgroud)

我怀疑 spark 默认为镶木地板,这不起作用,但我想先检查一下。

apache-spark apache-spark-sql pyspark

2
推荐指数
1
解决办法
176
查看次数