mir*_*iro · 3 · Tags: java, json, apache-kafka, apache-flink
I'm trying to read JSON from a Kafka topic with the following code:
public class FlinkMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // parse user parameters
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        DataStream messageStream = env.addSource(
                new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"),
                        new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties()));
        messageStream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Kafka and Flink says: " + value;
            }
        });
        env.execute();
    }
}
The problems are:
1) The program fails to run because of:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(FlinkMain.java:23)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
The problem is at line: `messageStream.map(....`
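The exception attributes the failure to type erasure. As a JDK-only illustration of what that means (not Flink's own code), the sketch below uses a hypothetical `MyMapFunction` interface in place of Flink's `MapFunction`. Generic arguments are gone from ordinary objects at runtime, but an anonymous class like the one in the question records them in its class file, where reflection can read them back; a lambda's generated class does not. This shows roughly where a type extractor can and cannot recover IN/OUT from; it does not by itself diagnose the job above.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {
    // Hypothetical stand-in for Flink's MapFunction<IN, OUT>; not the Flink interface.
    interface MyMapFunction<IN, OUT> {
        OUT map(IN value) throws Exception;
    }

    // Returns the generic type arguments recorded for MyMapFunction in the
    // function's class file, or an empty array if none survive.
    static Type[] declaredTypeArguments(Object function) {
        for (Type t : function.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) t;
                if (p.getRawType() == MyMapFunction.class) {
                    return p.getActualTypeArguments();
                }
            }
        }
        return new Type[0];
    }

    public static void main(String[] args) {
        // At runtime both lists share one class: the type argument is erased.
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        System.out.println(strings.getClass() == ints.getClass()); // true

        // An anonymous class keeps its parameterized superinterface in the class file.
        MyMapFunction<String, String> anon = new MyMapFunction<String, String>() {
            @Override
            public String map(String value) {
                return "Kafka and Flink says: " + value;
            }
        };
        System.out.println(declaredTypeArguments(anon).length); // 2

        // A lambda's synthetic class exposes only the raw interface.
        MyMapFunction<String, String> lambda = value -> "Kafka and Flink says: " + value;
        System.out.println(declaredTypeArguments(lambda).length); // 0
    }
}
```

One plausible reading, consistent with the later switch to `MapFunction<ObjectNode, String>`, is that the function's declared input type `String` does not match what `JSONKeyValueDeserializationSchema` produces (`ObjectNode`), so type extraction fails and Flink reports its generic type-erasure message.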
2) Perhaps the problem above is related to the fact that DataStream has no type parameter. But if I try:
DataStream<String> messageStream = env.addSource(...
the code no longer compiles, failing with `cannot resolve constructor FlinkKafkaConsumer09 ...`
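The compile error in point 2 follows from how Java infers the consumer's type parameter: `FlinkKafkaConsumer09<T>` takes a schema typed on the same `T`, and `JSONKeyValueDeserializationSchema` produces `ObjectNode`, so `T` cannot be `String`. Below is a JDK-only sketch of the same inference; `Schema`, `Node`, `NodeSchema` and `Source` are hypothetical types standing in for the Flink and Jackson ones, not their real APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class InferenceDemo {
    // Hypothetical schema: turns raw bytes into an element of type T.
    interface Schema<T> {
        T deserialize(byte[] bytes);
    }

    // Hypothetical element type standing in for Jackson's ObjectNode.
    static final class Node {
        final String raw;
        Node(String raw) { this.raw = raw; }
        @Override
        public String toString() { return "Node(" + raw + ")"; }
    }

    // Produces Node, the way JSONKeyValueDeserializationSchema produces ObjectNode.
    static final class NodeSchema implements Schema<Node> {
        @Override
        public Node deserialize(byte[] bytes) {
            return new Node(new String(bytes, StandardCharsets.UTF_8));
        }
    }

    // Hypothetical source: T is fixed by the schema passed to the constructor.
    static final class Source<T> {
        private final Schema<T> schema;
        Source(Schema<T> schema) { this.schema = schema; }

        List<T> read(List<byte[]> records) {
            List<T> out = new ArrayList<>();
            for (byte[] r : records) {
                out.add(schema.deserialize(r));
            }
            return out;
        }
    }

    public static void main(String[] args) {
        // Compiles: the diamond infers T = Node from NodeSchema.
        Source<Node> ok = new Source<>(new NodeSchema());
        // Would NOT compile: no Source<String> constructor accepts a Schema<Node>.
        // Source<String> bad = new Source<>(new NodeSchema());
        byte[] record = "{\"name\":\"John Doe\"}".getBytes(StandardCharsets.UTF_8);
        List<Node> nodes = ok.read(Collections.singletonList(record));
        System.out.println(nodes.get(0)); // Node({"name":"John Doe"})
    }
}
```

In the real code, the corresponding change is to declare the stream with the type the schema produces, i.e. `DataStream<ObjectNode>`.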
pom.xml (the relevant part):
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
        <version>1.1.1</version>
    </dependency>
</dependencies>
I've been searching, without success, for Flink code that uses a JSON DeserializationSchema. All I found was a unit test at this link.
Does anyone know the right way to do this?
Thanks
I followed Vishnu viswanath's answer, but JSONKeyValueDeserializationSchema throws an exception in the JSON parsing step, even for simple JSON such as {"name":"John Doe"}.
The code that throws is:
DataStream<ObjectNode> messageStream = env.addSource(
        new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"),
                new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties()));
messageStream.rebalance().map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode node) throws Exception {
        return "Kafka and Flink says: " + node.get(0);
    }
}).print();
Output:
09/05/2016 11:16:02 Job execution switched to status FAILED.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:822)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:790)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2215)
at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:52)
at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:38)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227)
at java.lang.Thread.run(Thread.java:745)
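The trace ends with Jackson's `JsonFactory.createParser` throwing a `NullPointerException` inside `ObjectMapper.readValue`, called from `JSONKeyValueDeserializationSchema.deserialize`. That is what you would expect if one of the byte arrays handed to it was `null`, and Kafka records produced without a key carry a `null` key; this is an inference from the trace, not something the thread confirms. It would also fit the fact that `JSONDeserializationSchema`, which only reads the message value, works. Below is a JDK-only sketch of a null-safe key/value deserializer; `NullKeyDemo.deserialize(byte[], byte[])` is hypothetical and decodes text instead of parsing JSON.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class NullKeyDemo {
    // Hypothetical key/value deserializer; it decodes bytes to text instead of
    // parsing JSON, so it runs on the JDK alone.
    static Map<String, String> deserialize(byte[] messageKey, byte[] message) {
        Map<String, String> node = new LinkedHashMap<>();
        // Records sent without a key arrive with messageKey == null, so decode
        // the key only when present instead of passing null to a parser.
        if (messageKey != null) {
            node.put("key", new String(messageKey, StandardCharsets.UTF_8));
        }
        if (message != null) {
            node.put("value", new String(message, StandardCharsets.UTF_8));
        }
        return node;
    }

    public static void main(String[] args) {
        byte[] value = "{\"name\":\"John Doe\"}".getBytes(StandardCharsets.UTF_8);
        // A keyless record no longer fails; the result simply has no "key" entry.
        System.out.println(deserialize(null, value)); // {value={"name":"John Doe"}}
    }
}
```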
I succeeded using a different deserialization schema, JSONDeserializationSchema:
DataStream<ObjectNode> messageStream = env.addSource(
        new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"),
                new JSONDeserializationSchema(), parameterTool.getProperties()));
messageStream.rebalance().map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode value) throws Exception {
        return "Kafka and Flink says: " + value.get("key").asText();
    }
}).print();