小编Kar*_*ava的帖子

如何使用QueryCassandra和ExecutePySpark Nifi处理器将我的cassandra数据传输到pyspark?

我只是使用querycassandra处理器查询cassandra表,但我不理解的是如何将我的Json输出文件作为输入文件传递给ExecutePyspark处理器,稍后我需要将我的Spark输出数据传递给Hive.请帮帮我,谢谢.

我的查询Cassandra属性:

在此输入图像描述

Pyspark物业: 在此输入图像描述

cassandra apache-spark pyspark apache-nifi kylo

3
推荐指数
1
解决办法
241
查看次数

使用Nifi将JSON分为两个单独的JSON对象

我有一个像JSON

{
    "campaign_key": 316,
    "client_key": 127,
    "cpn_mid_counter": "24",
    "cpn_name": "Bopal",
    "cpn_status": "Active",
    "clt_name": "Bopal Ventures",
    "clt_status": "Active"
}
Run Code Online (Sandbox Code Playgroud)

预期产量

第一个JSON:

{
    "campaign_key": 316,
    "client_key": 127,
    "cpn_mid_counter": "24",
    "cpn_name": "Bopal",
    "cpn_status": "Active"
}
Run Code Online (Sandbox Code Playgroud)

第二个JSON:

{
    "clt_name": "Bopal Ventures",
    "clt_status": "Active"
}
Run Code Online (Sandbox Code Playgroud)

如何使用NIFI来实现?谢谢。

json apache-nifi kylo

2
推荐指数
2
解决办法
1164
查看次数

使用nifi根据条件合并流文件?

我有一个来自同一个处理器的 3 个流文件。

FF1 -> {a:1,b:2,c:'name'}

FF2 -> {a:1,b:5,c:'水果'}

FF3 -> {a:2,b:3,c:'abc'}

通过使用 MergeContent Processor,我能够合并所有流文件,但我的要求是在 Key 上合并流文件。

如果我使用键“a”加入,则预期输出:

FF1 -> [{a:1,b:2,c:'name'},{a:1,b:5,c:'fruit'}]

FF2 -> [{a:2,b:3,c:'abc'}]

apache-nifi kylo

2
推荐指数
1
解决办法
1846
查看次数

使用PySpark将JSON文件读作Pyspark Dataframe?

如何使用PySpark阅读以下JSON结构来激发数据帧?

我的JSON结构

{"results":[{"a":1,"b":2,"c":"name"},{"a":2,"b":5,"c":"foo"}]}
Run Code Online (Sandbox Code Playgroud)

我尝试过:

df = spark.read.json('simple.json');
Run Code Online (Sandbox Code Playgroud)

我希望输出a,b,c作为列和值作为相应的行.

谢谢.

python apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
7484
查看次数