我目前正在编写一个Samza脚本,它将从Kafka主题中获取数据并将数据输出到另一个Kafka主题.我写了一个非常基本的StreamTask但是在执行时我遇到了一个错误.
错误如下:
Exception in thread "main" org.apache.samza.SamzaException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 193 ms.
at org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.send(CoordinatorStreamSystemProducer.java:112)
at org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.writeConfig(CoordinatorStreamSystemProducer.java:129)
at org.apache.samza.job.JobRunner.run(JobRunner.scala:79)
at org.apache.samza.job.JobRunner$.main(JobRunner.scala:48)
at org.apache.samza.job.JobRunner.main(JobRunner.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 193 ms
Run Code Online (Sandbox Code Playgroud)
我不完全确定如何配置或让脚本编写所需的Kafka元数据.下面是我的StreamTask代码和属性文件.在属性文件中,我添加了元数据部分,以查看这是否有助于此后的过程,但无济于事.这是正确的方向还是我完全错过了什么?
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
/*
* Take all messages received and send them to
* a Kafka topic called "words"
*/
public class TestStreamTask implements StreamTask{
private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka" , "words"); …
Run Code Online (Sandbox Code Playgroud)