Tags: cassandra, datastax-java-driver, apache-flink, flink-streaming
I have two simple Flink streaming jobs that read from Kafka, apply some transformations, and write the results to a Cassandra sink. They read from different Kafka topics and save to different Cassandra tables.
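For context, each job is wired roughly like the sketch below; the topic name, Kafka properties, and record type are placeholders for illustration, while the actual ClusterBuilder and sink definitions are shown further down.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class UniqueCountJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka connection details are placeholders.
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "unique-count-job");

        // Each job reads from its own topic; "events-topic" stands in for the real name.
        DataStream<String> events = env.addSource(
                new FlinkKafkaConsumer<>("events-topic", new SimpleStringSchema(), kafkaProps));

        // ... transformations producing the dataRows stream ...
        // CassandraSink.addSink(dataRows) ...  (see the sink definition below)

        env.execute("Unique Count Job");
    }
}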
When I run either of the two jobs on its own, everything works fine: checkpoints are triggered and completed, and data is saved to Cassandra.
However, when I run both jobs (or run one of them twice), the second job fails at startup with the following exception:
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Error writing)).
I couldn't find much information about this error. It could be caused by any of the following:
I am also using packages that may conflict with the first one:
Finally, here is my cluster builder code:
ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        Cluster cluster = null;
        try {
            cluster = builder
                    .addContactPoint("localhost")
                    .withPort(9042)
                    .withClusterName("Test Cluster")
                    .withoutJMXReporting()
                    .withProtocolVersion(ProtocolVersion.V4)
                    .withoutMetrics()
                    .build();

            // register codecs from datastax extras.
            cluster.getConfiguration().getCodecRegistry()
                    .register(LocalTimeCodec.instance);
        } catch (ConfigurationException e) {
            e.printStackTrace();
        } catch (NoHostAvailableException nhae) {
            nhae.printStackTrace();
        }
        return cluster;
    }
};
I have tried different PoolingOptions and SocketOptions settings, without success.
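For illustration, the kind of tuning attempted looks roughly like the snippet below; the specific values are examples only, not the exact settings that were tried.

ClusterBuilder tunedBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        // Example values only; several combinations were tried.
        PoolingOptions poolingOptions = new PoolingOptions()
                .setConnectionsPerHost(HostDistance.LOCAL, 1, 2)
                .setMaxRequestsPerConnection(HostDistance.LOCAL, 1024);

        SocketOptions socketOptions = new SocketOptions()
                .setConnectTimeoutMillis(10_000)
                .setReadTimeoutMillis(20_000);

        return builder
                .addContactPoint("localhost")
                .withPort(9042)
                .withPoolingOptions(poolingOptions)
                .withSocketOptions(socketOptions)
                .build();
    }
};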
The Cassandra sink:
CassandraSink.addSink(dataRows)
        .setQuery("insert into table_name_(16 columns names) " +
                "values (16 placeholders);")
        .enableWriteAheadLog()
        .setClusterBuilder(builder)
        .setFailureHandler(new CassandraFailureHandler() {
            @Override
            public void onFailure(Throwable throwable) {
                LOG.error("A {} occurred.", "Cassandra Failure", throwable);
            }
        })
        .build()
        .setParallelism(1)
        .name("Cassandra Sink For Unique Count every N minutes.");
Full stack trace from the Flink job manager:
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Error writing))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.createSession(CassandraSinkBase.java:143)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:87)
at org.apache.flink.streaming.connectors.cassandra.AbstractCassandraTupleSink.open(AbstractCassandraTupleSink.java:49)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.base/java.lang.Thread.run(Thread.java:834)
Any help is appreciated.
Edit: I managed to narrow the problem down. The error goes away when I mark the flink-connector-cassandra dependency as provided and simply copy the jar from my local Maven repository (~/.m2/repository/org/apache/flink/flink-connector-cassandra_2.11/1.10.2/flink-connector-cassandra_2.11-1.10.2.jar) into the Flink lib folder. My problem is solved, but the root cause is still a mystery.
To fix the error, I marked the flink-connector-cassandra dependency as provided, then simply copied the jar from my local Maven repository (~/.m2/repository/org/apache/flink/flink-connector-cassandra_2.11/1.10.2/flink-connector-cassandra_2.11-1.10.2.jar) into the Flink lib folder and restarted Flink. Here is my new pom.xml entry:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-cassandra_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
How did I find this? I was about to try compiling the connector from source with a newer driver version. First, I wanted to reproduce the error with the unchanged sources, so I compiled the connector without any modifications, put the jar into the Flink lib folder, and hooray, it worked! Then I suspected that the jar from Maven was somehow different, so I copied that one into the lib folder instead, and to my surprise it worked as well.
My problem is solved, but the root cause remains a mystery.
My last attempt was to check whether any of my packages conflict with the Cassandra connector, so I ran mvn dependency:tree -Dverbose. It reported one conflict, about metrics-core, with org.apache.flink:flink-metrics-dropwizard:
[INFO] +- org.apache.flink:flink-connector-cassandra_2.12:jar:1.10.0:provided
[INFO] | +- (io.dropwizard.metrics:metrics-core:jar:3.1.2:provided - omitted for conflict with 3.1.5)
[INFO] | \- (org.apache.flink:force-shading:jar:1.10.0:provided - omitted for duplicate)
I removed this dependency from my project, but the error still occurs if the connector is not marked as provided and not placed in the lib folder.