我只是使用querycassandra处理器查询cassandra表,但我不理解的是如何将我的Json输出文件作为输入文件传递给ExecutePyspark处理器,稍后我需要将我的Spark输出数据传递给Hive.请帮帮我,谢谢.
我的查询Cassandra属性:
我有一个像JSON
{
"campaign_key": 316,
"client_key": 127,
"cpn_mid_counter": "24",
"cpn_name": "Bopal",
"cpn_status": "Active",
"clt_name": "Bopal Ventures",
"clt_status": "Active"
}
Run Code Online (Sandbox Code Playgroud)
预期产量
第一个JSON:
{
"campaign_key": 316,
"client_key": 127,
"cpn_mid_counter": "24",
"cpn_name": "Bopal",
"cpn_status": "Active"
}
Run Code Online (Sandbox Code Playgroud)
第二个JSON:
{
"clt_name": "Bopal Ventures",
"clt_status": "Active"
}
Run Code Online (Sandbox Code Playgroud)
如何使用NIFI来实现?谢谢。
我有一个来自同一个处理器的 3 个流文件。
FF1 -> {a:1,b:2,c:'name'}
FF2 -> {a:1,b:5,c:'水果'}
FF3 -> {a:2,b:3,c:'abc'}
通过使用 MergeContent Processor,我能够合并所有流文件,但我的要求是在 Key 上合并流文件。
如果我使用键“a”加入,则预期输出:
FF1 -> [{a:1,b:2,c:'name'},{a:1,b:5,c:'fruit'}]
FF2 -> [{a:2,b:3,c:'abc'}]
如何使用PySpark阅读以下JSON结构来激发数据帧?
我的JSON结构
{"results":[{"a":1,"b":2,"c":"name"},{"a":2,"b":5,"c":"foo"}]}
Run Code Online (Sandbox Code Playgroud)
我尝试过:
df = spark.read.json('simple.json');
Run Code Online (Sandbox Code Playgroud)
我希望输出a,b,c作为列和值作为相应的行.
谢谢.