如何使用QueryCassandra和ExecutePySpark Nifi处理器将我的cassandra数据传输到pyspark?

Kar*_*ava 3 cassandra apache-spark pyspark apache-nifi kylo

我只是使用querycassandra处理器查询cassandra表,但我不理解的是如何将我的Json输出文件作为输入文件传递给ExecutePyspark处理器,稍后我需要将我的Spark输出数据传递给Hive.请帮帮我,谢谢.

我的查询Cassandra属性:

在此输入图像描述

Pyspark物业: 在此输入图像描述

Jag*_*rma 5

考虑这个使用4个处理器的流程如下:

QueryCassandra - > UpdateAttribute - > PutFile - > ExecutePySpark

步骤1:QueryCassandra处理器:在Cassandra上执行CQL并将结果输出到流文件中.

步骤2:UpdateAttribute处理器:filename为磁盘上的临时文件分配包含查询结果的临时文件名的值.使用NiFi表达式语言生成文件名,以便每次运行时都不同.创建属性result_directory并为NiFi具有写入权限的磁盘上的文件夹分配值.

  • 属性: filename
  • 值: cassandra_result_${now():toNumber()}

  • 属性: result_directory

  • 值: /tmp

在此输入图像描述

步骤3:PutFile处理器:Directory使用${result_directory}步骤2中填充的值配置属性.

在此输入图像描述

步骤4:ExecutePySpark处理器:通过PySpark App Args处理器属性将文件名及其位置作为参数传递给PySpark应用程序.然后,应用程序可以使用代码从磁盘上的文件中读取数据,处理它并写入Hive.

  • 属性: PySpark App Args
  • 值: ${result_directory}/${filename}

在此输入图像描述

此外,您可以在步骤2(UpdateAttribute)中配置更多属性,然后可以将其作为参数传递到步骤4(ExecutePySpark),并由PySpark应用程序以书面形式考虑到Hive(例如,Hive数据库和表名).