Soh*_*ani 2 apache-flink flink-streaming
Using Apache Flink 1.3.2 and Cassandra 3.11, I wrote a simple program that writes data to Cassandra through the Apache Flink Cassandra connector. Here is the code:
final Collection<String> collection = new ArrayList<>(50);
for (int i = 1; i <= 50; ++i) {
    collection.add("element " + i);
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<UUID, String>> dataStream = env
        .fromCollection(collection)
        .map(new MapFunction<String, Tuple2<UUID, String>>() {
            final String mapped = " mapped ";

            @Override
            public Tuple2<UUID, String> map(String s) throws Exception {
                // "element 7" is split into ["element", "7"]
                String[] splitted = s.split("\\s+");
                return new Tuple2<>(
                        UUID.randomUUID(),
                        splitted[0] + mapped + splitted[1]
                );
            }
        });
dataStream.print();
CassandraSink.addSink(dataStream)
        .setQuery("INSERT INTO test.phases (id, text) values (?, ?);")
        .setHost("127.0.0.1")
        .build();
env.execute();
When I try to compile the same code against Apache Flink 1.4.2 (1.4.x), I get this error:
Error:(36, 22) java: cannot access org.apache.flink.streaming.api.scala.DataStream
class file for org.apache.flink.streaming.api.scala.DataStream not found
at these lines:
CassandraSink.addSink(dataStream)
        .setQuery("INSERT INTO test.phases (id, text) values (?, ?);")
        .setHost("127.0.0.1")
        .build();
I suspect that some dependency changed in Apache Flink 1.4.2 and that this is what causes the problem.
The code uses the following imports:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
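How can I fix this error in Apache Flink 1.4.2?
Update:
In Flink 1.3.2, the class org.apache.flink.streaming.api.scala.DataStream<T> appears in the Javadoc, but in version 1.4.2 there is no such class. See here.
I also tried the Cassandra connector code example from the Flink 1.4.2 documentation and got the same error, yet that example works with the Flink 1.3.2 dependencies!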
In addition to all your other dependencies, make sure you have the Flink Scala streaming dependency:
Maven:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.4.2</version>
</dependency>
Gradle:
dependencies {
    compile group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: '1.4.2'
    ..
}
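To check whether the Scala streaming artifact actually ends up on the classpath after the build file is changed, a small probe like the one below may help. It is only a sketch: `ClasspathCheck` and `isOnClasspath` are hypothetical names, not part of Flink, and the only thing taken from the question is the class name reported in the compiler error. It uses plain JDK reflection, so it reports on the runtime classpath only. The compile error itself goes away only once the build tool provides the dependency at compile scope.

```java
// Hypothetical helper, not part of Flink: reports whether a class can be loaded.
final class ClasspathCheck {

    // Class named in the compiler error from the question.
    static final String SCALA_DATA_STREAM =
            "org.apache.flink.streaming.api.scala.DataStream";

    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: locate the class without running its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Either the class is absent or something it needs could not be linked
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(SCALA_DATA_STREAM + " on classpath: "
                + isOnClasspath(SCALA_DATA_STREAM));
    }
}
```

Run it with the same classpath as the Flink job. If it reports `false`, the `flink-streaming-scala_2.11` artifact is probably not being resolved.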
I managed to get your example working with the following imports:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
and the following Maven dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-cassandra_2.11</artifactId>
        <version>1.4.2</version>
    </dependency>
</dependencies>