avi*_*ner 5 scala cassandra apache-flink
我必须使用 Flink 作为流引擎处理来自 Kafka 的数据流。为了对数据进行分析,我需要查询 Cassandra 中的一些表。做这个的最好方式是什么?我一直在寻找 Scala 中此类情况的示例。但我找不到任何信息。如何使用 Scala 作为编程语言在 Flink 中读取 Cassandra 的数据? Read & write data into cassandra using apache flink Java API在同一行上还有另一个问题。它有答案中提到的多种方法。我想知道针对我的情况最好的方法是什么。此外,大多数可用示例都是用 Java 编写的。我正在寻找 Scala 示例。
我目前在 flink 1.3 中使用 asyncIO 从 cassandra 读取数据。这是有关它的文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html(其中有DatabaseClient,您将使用com.datastax.drive.core.Cluster)
如果您需要一个更深入的示例来专门使用它从 cassandra 读取数据,请告诉我,但不幸的是我只能提供一个 java 中的示例。
编辑1
下面是我使用 flink 的异步 I/O 从 Cassandra 读取数据的代码示例。我仍在努力识别和修复一个问题,由于某种原因(无需深入研究),对于单个查询返回的大量数据,异步数据流的超时被触发,即使它看起来由 Cassandra 正常返回并且在超时时间之前。但假设这只是我正在做的其他事情的一个错误,而不是因为这段代码,这对你来说应该可以正常工作(并且对我来说也已经工作了几个月):
public class GenericCassandraReader extends RichAsyncFunction<CustomInputObject, ResultSet> {
private final Properties props;
private Session client;
public GenericCassandraReader(Properties props) {
super();
this.props = props;
}
@Override
public void open(Configuration parameters) throws Exception {
client = Cluster.builder()
.addContactPoint(props.cassandraUrl)
.withPort(props.cassandraPort)
.build()
.connect(props.cassandraKeyspace);
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(final CustomInputObject customInputObject, final AsyncCollector<ResultSet> asyncCollector) throws Exception {
String queryString = "select * from table where fieldToFilterBy='" + customInputObject.id() + "';";
ListenableFuture<ResultSet> resultSetFuture = client.executeAsync(queryString);
Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
public void onSuccess(ResultSet resultSet) {
asyncCollector.collect(Collections.singleton(resultSet));
}
public void onFailure(Throwable t) {
asyncCollector.collect(t);
}
});
}
}
Run Code Online (Sandbox Code Playgroud)
再次抱歉耽搁了。我希望能够解决该错误,这样我就可以确定,但我认为此时只有一些参考总比没有好。
编辑2
因此我们最终确定问题不在于代码,而在于网络吞吐量。很多字节试图通过一个不够大的管道来处理它,东西开始备份,一些开始慢慢流入,但是(感谢 datastax cassandra 驱动程序的 QueryLogger 我们可以看到这一点)接收结果所花费的时间每个查询开始攀升至 4 秒,然后是 6 秒,然后是 8 秒,依此类推。
TL;DR,代码很好,只是要注意,如果您遇到 Flink 的 asyncWaitOperator 的 timeoutExceptions,则可能是网络问题。
编辑2.5
还意识到,由于网络延迟问题,我们最终转而使用 RichMapFunction 来保存我们从 cassandra 读取的数据的状态,这可能是有益的。因此,该作业只需跟踪通过它的所有记录,而不必每次有新记录通过时都从表中读取以获取其中的所有记录。
| 归档时间: |
|
| 查看次数: |
4186 次 |
| 最近记录: |