我正在运行kafka 2.9.1-0.8.2.1.我在主kafka目录中包含了libs /目录中提供的jar.现在我尝试按照此处给出的内容运行java生成器示例https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example.现在producer.send方法似乎接受了这种论点Seq<KeyedMessage<String, String>>.在该示例中,KeyedMessage的对象不会转换为任何内容.当我尝试做同样的事情时,我得到了不兼容的类型编译器错误.
这是代码
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;
import kafka.producer.Producer;
import scala.collection.Seq;
public class KakfaProducer {
public static void main(String [] args) {
Properties prop = new Properties();
prop.put("metadata.broker.list", "localhost:9092");
prop.put("serializer.class","kafka.serializer.StringEncoder");
//prop.put("partitioner.class", "example.producer.SimplePartitioner");
ProducerConfig producerConfig = new ProducerConfig(prop);
Producer<String,String> producer = new <String,String>Producer(producerConfig);
String topic = "test";
KeyedMessage<String,String> message = new <String,String>KeyedMessage(topic, "Hello Test message");
producer.send(message);
producer.close();
}
}
Run Code Online (Sandbox Code Playgroud)
那个评论的代码给了我类def def not found.我试图在网上看很多,但它没有帮助.
libs /目录中有两种jar.一个是kafka-client,另一个是kafka和版本号.我包括错误的罐子吗?我需要使用哪一个?
小智 8
对于第一个问题,不是导入scala API,而是导入Java.所以,而不是使用:
import kafka.producer.Producer;
Run Code Online (Sandbox Code Playgroud)
请用:
import kafka.javaapi.producer.Producer;
Run Code Online (Sandbox Code Playgroud)
SimplePartitioner代码可以在下面找到.将其添加到相应的目录:
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class SimplePartitioner implements Partitioner {
public SimplePartitioner (VerifiableProperties props) {
}
public int partition(Object key, int numPartitions) {
int partition = 0;
String stringKey = (String) key;
int offset = stringKey.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt( stringKey.substring(offset+1)) % numPartitions;
}
return partition;
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
20587 次 |
| 最近记录: |