Flink job using CassandraSink fails with a write error

KLi*_*iFF 6 cassandra datastax-java-driver apache-flink flink-streaming

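I have two simple Flink streaming jobs that read from Kafka, apply some transformations, and write the results to a Cassandra sink. They read from different Kafka topics and save to different Cassandra tables.

When I run either of the two jobs on its own, everything works fine. Checkpoints are triggered and completed, and data is saved to Cassandra.

However, when I run both jobs (or run one of them twice), the second job fails at startup with the following exception: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Error writing)).

I could not find much information about this error. It could be caused by any of the following: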

  • Flink (v 1.10.0-scala_2.12),
  • Flink Cassandra connector (flink-connector-cassandra_2.11:jar:1.10.2, also tried flink-connector-cassandra_2.12:jar:1.10.0),
  • Datastax underlying driver (v 3.10.2),
  • Cassandra v4.0 (same with v3.0),
  • Netty transport (v 4.1.51.Final).

I also use packages that may conflict with the ones above:

  • mysql-connector-java (v 8.0.19),
  • cassandra-driver-extras (v 3.10.2)

Finally, this is my cluster builder code:

ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        Cluster cluster = null;
        try {
            cluster = builder
                    .addContactPoint("localhost")
                    .withPort(9042)
                    .withClusterName("Test Cluster")
                    .withoutJMXReporting()
                    .withProtocolVersion(ProtocolVersion.V4)
                    .withoutMetrics()
                    .build();

            // register codecs from datastax extras.
            cluster.getConfiguration().getCodecRegistry()
                    .register(LocalTimeCodec.instance);
        } catch (ConfigurationException e) {
            e.printStackTrace();
        } catch (NoHostAvailableException nhae) {
            nhae.printStackTrace();
        }

        return cluster;
    }
};

I tried different PoolingOptions and SocketOptions settings, without success.

Cassandra sink:

CassandraSink.addSink(dataRows)
.setQuery("insert into table_name_(16 columns names) " +
        "values (16 placeholders);")
.enableWriteAheadLog()
.setClusterBuilder(builder)
.setFailureHandler(new CassandraFailureHandler() {
    @Override
    public void onFailure(Throwable throwable) {
        LOG.error("A {} occurred.", "Cassandra Failure", throwable);
    }
})
.build()
.setParallelism(1)
.name("Cassandra Sink For Unique Count every N minutes.");

Full trace log from the Flink job manager:

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Error writing))
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
    at com.datastax.driver.core.Cluster.init(Cluster.java:162)
    at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
    at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
    at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
    at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.createSession(CassandraSinkBase.java:143)
    at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:87)
    at org.apache.flink.streaming.connectors.cassandra.AbstractCassandraTupleSink.open(AbstractCassandraTupleSink.java:49)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.base/java.lang.Thread.run(Thread.java:834)

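Any help is appreciated.

Edit:

  • I just tried using two standalone Cassandra instances (different machines and different clusters), pointing one job at one instance and the other job at the other instance. Nothing changed; I still get the same error.
  • I also tried reducing the dependencies in a new pom file.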

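Edit: I managed to narrow down the problem. The error goes away when I mark the flink-connector-cassandra dependency as provided and simply copy the jar file from my local Maven repository (~/.m2/repository/org/apache/flink/flink-connector-cassandra_2.11/1.10.2/flink-connector-cassandra_2.11-1.10.2.jar) to the Flink lib folder. My problem is solved, but the root cause is still a mystery.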

KLi*_*iFF 2

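To fix the error, I marked the flink-connector-cassandra dependency as provided, copied the jar file from my local Maven repository (~/.m2/repository/org/apache/flink/flink-connector-cassandra_2.11/1.10.2/flink-connector-cassandra_2.11-1.10.2.jar) to the Flink lib folder, and restarted Flink. Here is the dependency in my new pom.xml file: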

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-cassandra_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

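How did I find this? I was about to try compiling the connector from source with a newer driver version. First, I tried to reproduce the error with the unchanged sources. So I compiled it without changing anything, put the jar into the Flink lib folder, and hooray, it worked! I then suspected that the jar from Maven contained something different. I copied it into the lib folder and, to my surprise, it worked too.

My problem is solved, but the root cause is still a mystery.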
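Since the only difference between the failing and the working setup is where the connector jar lives, one way to investigate further is to print where the relevant classes are actually loaded from in each setup. The following is only a diagnostic sketch, not a confirmed explanation of the error: the class ClassOrigin and its describe method are hypothetical helpers of mine, and the class names in main are just the ones I would look at first. To be meaningful in Flink it would have to be called from inside the job (for example from an operator's open method), on the assumption that the task thread's context class loader is the one used to load user code.

```java
import java.net.URL;
import java.security.CodeSource;

public class ClassOrigin {

    /** Describes which location a class is loaded from and which class loader defined it. */
    static String describe(String className) {
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        if (context == null) {
            context = ClassOrigin.class.getClassLoader();
        }
        try {
            // initialize=false: locate the class without running its static initializers.
            Class<?> c = Class.forName(className, false, context);
            CodeSource source = c.getProtectionDomain().getCodeSource();
            URL location = (source == null) ? null : source.getLocation();
            ClassLoader loader = c.getClassLoader();
            return className
                    + " <- " + (location == null ? "(bootstrap or unknown)" : location.toString())
                    + " via " + (loader == null ? "bootstrap loader" : loader.getClass().getName());
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " <- not loadable here: " + e;
        }
    }

    public static void main(String[] args) {
        // Classes of interest for this problem; adjust as needed.
        String[] names = {
            "com.datastax.driver.core.Cluster",
            "org.apache.flink.streaming.connectors.cassandra.CassandraSink",
            "io.netty.channel.Channel"
        };
        for (String name : names) {
            System.out.println(describe(name));
        }
    }
}
```

Comparing the output with the connector bundled in the job jar against the output with the connector in the lib folder would show whether the driver and Netty classes come from a different jar or a different class loader in the two cases.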

My last attempt was to check whether any packages conflict with the Cassandra connector, so I ran dependency:tree -Dverbose and found one conflict, with org.apache.flink:flink-metrics-dropwizard, about metrics-core:

[INFO] +- org.apache.flink:flink-connector-cassandra_2.12:jar:1.10.0:provided
[INFO] |  +- (io.dropwizard.metrics:metrics-core:jar:3.1.2:provided - omitted for conflict with 3.1.5)
[INFO] |  \- (org.apache.flink:force-shading:jar:1.10.0:provided - omitted for duplicate)

I removed this dependency from the project, but the error persists unless the connector is marked as provided and also placed in the lib folder.
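dependency:tree only shows conflicts that Maven knows about at build time. A complementary runtime check is to ask a class loader for every location from which a given class file is visible; more than one hit means several copies are on the classpath. This is a rough sketch under my own assumptions: ClasspathCopies and locate are hypothetical names, and I have not verified that duplicate copies are what causes the error here.

```java
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

public class ClasspathCopies {

    /** Returns every location from which the class file of className is visible to the loader. */
    static List<URL> locate(String className, ClassLoader loader) throws IOException {
        String resource = className.replace('.', '/') + ".class";
        return Collections.list(loader.getResources(resource));
    }

    public static void main(String[] args) throws IOException {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClassLoader.getSystemClassLoader();
        }
        // Pass fully qualified class names as arguments, e.g. com.datastax.driver.core.Cluster
        for (String name : args) {
            List<URL> copies = locate(name, loader);
            System.out.println(name + ": " + copies.size() + " location(s)");
            for (URL url : copies) {
                System.out.println("  " + url);
            }
        }
    }
}
```

Run from inside the job with names such as com.datastax.driver.core.Cluster or io.netty.channel.Channel, this would list each jar that provides the class, which dependency:tree cannot show for jars sitting in the Flink lib folder.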