我正在使用Kafka 0.8测试版,我只是试图发送不同的对象,使用我自己的编码器序列化它们,并将它们发送到现有的代理配置.现在我试图让DefaultEncoder工作.
我有代理和一切设置并为StringEncoder工作,但我无法获得任何其他数据类型,包括纯字节[],由代理发送和接收.
我的制作人代码是:
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
public class ProducerTest {
public static void main(String[] args) {
long events = 5;
Random rnd = new Random();
rnd.setSeed(new Date().getTime());
Properties props = new Properties();
props.setProperty("metadata.broker.list", "localhost:9093,localhost:9094");
props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
props.setProperty("partitioner.class", "example.producer.SimplePartitioner");
props.setProperty("request.required.acks", "1");
props.setProperty("producer.type", "async");
props.setProperty("batch.num.messages", "4");
ProducerConfig config = new ProducerConfig(props);
Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(config);
for (long nEvents = 0; nEvents < events; nEvents++) {
byte[] a = "Hello".getBytes(); …Run Code Online (Sandbox Code Playgroud) apache-kafka ×1