Kar*_*ava 3 cassandra apache-spark pyspark apache-nifi kylo
我只是使用querycassandra处理器查询cassandra表,但我不理解的是如何将我的Json输出文件作为输入文件传递给ExecutePyspark处理器,稍后我需要将我的Spark输出数据传递给Hive.请帮帮我,谢谢.
我的查询Cassandra属性:
考虑这个使用4个处理器的流程如下:
QueryCassandra - > UpdateAttribute - > PutFile - > ExecutePySpark
步骤1:QueryCassandra
处理器:在Cassandra上执行CQL并将结果输出到流文件中.
步骤2:UpdateAttribute
处理器:filename
为磁盘上的临时文件分配包含查询结果的临时文件名的值.使用NiFi表达式语言生成文件名,以便每次运行时都不同.创建属性result_directory
并为NiFi具有写入权限的磁盘上的文件夹分配值.
filename
值: cassandra_result_${now():toNumber()}
属性: result_directory
/tmp
步骤3:PutFile
处理器:Directory
使用${result_directory}
步骤2中填充的值配置属性.
步骤4:ExecutePySpark
处理器:通过PySpark App Args
处理器属性将文件名及其位置作为参数传递给PySpark应用程序.然后,应用程序可以使用代码从磁盘上的文件中读取数据,处理它并写入Hive.
PySpark App Args
${result_directory}/${filename}
此外,您可以在步骤2(UpdateAttribute)中配置更多属性,然后可以将其作为参数传递到步骤4(ExecutePySpark),并由PySpark应用程序以书面形式考虑到Hive(例如,Hive数据库和表名).
归档时间: |
|
查看次数: |
241 次 |
最近记录: |