Mic*_*hal 9 hadoop mongodb apache-kafka apache-flink
我想设置Flink,以便将数据流从Apache Kafka转换并重定向到MongoDB.出于测试目的,我建立在flink-streaming-connectors.kafka示例(https://github.com/apache/flink)之上.
Faf正在为Kafka流提供正确的红色,我可以映射它们等等,但是当我想将每个收到的和转换后的消息保存到MongoDB时会出现问题.我发现的关于MongoDB集成的唯一例子是来自github的flink-mongodb-test.不幸的是,它使用静态数据源(数据库),而不是数据流.
我相信MongoDB应该有一些DataStream.addSink实现,但显然没有.
实现它的最佳方法是什么?我是否需要编写自定义接收器功能或者我可能缺少某些东西?也许它应该以不同的方式完成?
我没有任何解决方案,所以任何建议将不胜感激.
下面是一个例子,我正在获得什么作为输入以及我需要存储为输出.
Apache Kafka Broker <-------------- "AAABBBCCCDDD" (String)
Apache Kafka Broker --------------> Flink: DataStream<String>
Flink: DataStream.map({
return ("AAABBBCCCDDD").convertTo("A: AAA; B: BBB; C: CCC; D: DDD")
})
.rebalance()
.addSink(MongoDBSinkFunction); // store the row in MongoDB collection
Run Code Online (Sandbox Code Playgroud)
正如您在本示例中所看到的,我主要使用Flink进行Kafka的消息流缓冲和一些基本的解析.
目前 Flink 中没有可用的 Streaming MongoDB 接收器。
但是,将数据写入 MongoDB 有两种方法:
使用DataStream.write()Flink的调用。它允许您将任何 OutputFormat(来自 Batch API)与流式传输一起使用。使用Flink的HadoopOutputFormatWrapper,可以使用官方的MongoDB Hadoop连接器
自己实现 Sink。使用 Streaming API 实现接收器非常容易,而且我确信 MongoDB 有一个很好的 Java 客户端库。
这两种方法都不提供任何复杂的处理保证。但是,当您将 Flink 与 Kafka 结合使用(并启用检查点)时,您将拥有至少一次语义:在错误情况下,数据会再次流式传输到 MongoDB 接收器。如果您正在进行幂等更新,重做这些更新不会导致任何不一致。
如果您确实需要 MongoDB 的一次性语义,您可能应该在 Flink 中提交 JIRA并与社区讨论如何实现它。
| 归档时间: |
|
| 查看次数: |
2434 次 |
| 最近记录: |