小编KLi*_*iFF的帖子

使用 CassandrSink 的 Flink 作业因写入错误而失败

我有两个从 Kafka 读取的简单 Flink 流作业执行一些转换并将结果放入 Cassandra Sink。他们从不同的 Kafka 主题中读取数据并保存到不同的 Cassandra 表中。

当我单独运行这两项工作中的任何一项时,一切正常。检查点被触发并完成,数据被保存到 Cassandra。

但是,当我运行两个作业(或其中一个运行两次)时,第二个作业在启动时失败,出现以下异常: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Error writing)).

我找不到有关此错误的太多信息,它可能是由以下任一原因引起的:

  • Flink (v 1.10.0-scala_2.12),
  • Flink Cassandra 连接器(flink-connector-cassandra_2.11:jar:1.10.2,也试过 flink-connector-cassandra_2.12:jar:1.10.0),
  • Datastax 底层驱动程序 (v 3.10.2),
  • Cassandra v4.0(与v3.0相同),
  • Netty 传输 (v 4.1.51.Final)。

我还使用可能与第一个冲突的包:

  • mysql-connector-java (v 8.0.19),
  • cassandra-driver-extras (v 3.10.2)

最后,这是我的集群构建器代码:

ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        Cluster cluster = null;
        try {
            cluster = builder
                    .addContactPoint("localhost")
                    .withPort(9042)
                    .withClusterName("Test Cluster")
                    .withoutJMXReporting() …
Run Code Online (Sandbox Code Playgroud)

cassandra datastax-java-driver apache-flink flink-streaming

6
推荐指数
1
解决办法
509
查看次数