Azz*_*zzy 3 scala apache-kafka apache-spark apache-spark-sql
我使用的是Spark 2.1.0和Kafka 0.9.0.
我试图将批量火花作业的输出推送到kafka.这项工作应该每小时运行一次,但不是作为流媒体运行.
在网上寻找答案时,我只能找到kafka与Spark流的集成,而不是与批处理作业的集成.
有谁知道这样的事情是否可行?
谢谢
更新:
正如user8371915所提到的,我试图遵循将批量查询的输出写入Kafka所做的工作.
我用了火花壳:
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
Run Code Online (Sandbox Code Playgroud)
这是我尝试的简单代码:
val df = Seq(("Rey", "23"), ("John", "44")).toDF("key", "value")
val newdf = df.select(to_json(struct(df.columns.map(column):_*)).alias("value"))
newdf.write.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "alerts").save()
Run Code Online (Sandbox Code Playgroud)
但我得到错误:
java.lang.RuntimeException: org.apache.spark.sql.kafka010.KafkaSourceProvider does not allow create table as select.
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:497)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
... 50 elided
Run Code Online (Sandbox Code Playgroud)
知道这与此有关吗?
谢谢
tl; dr你使用过时的Spark版本.写入在2.2及更高版本中启用.
开箱即用,您可以使用Kafka SQL连接器(与结构化流媒体使用相同).包括
spark-sql-kafka 在您的依赖项中.DataFrame为至少包含value类型StringType或列的列BinaryType.将数据写入Kafka:
df
.write
.format("kafka")
.option("kafka.bootstrap.servers", server)
.save()
Run Code Online (Sandbox Code Playgroud)关注结构化流文档以获取详细信息(从将批处理查询的输出写入Kafka开始).
如果您有一个数据框并且想要将其写入 kafka 主题,则需要首先将列转换为包含 json 格式数据的“值”列。在 scala 中是
import org.apache.spark.sql.functions._
val kafkaServer: String = "localhost:9092"
val topicSampleName: String = "kafkatopic"
df.select(to_json(struct("*")).as("value"))
.selectExpr("CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServer)
.option("topic", topicSampleName)
.save()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2692 次 |
| 最近记录: |