Jay*_*Tee 5 java apache-camel apache-kafka
我最近注意到Camel现在有自己的Kafka组件所以我决定给它一个旋转.
我决定尝试一个很好的简单文件 - > kafka主题如下......
<route>
<from uri="file:///tmp/input" />
<setHeader headerName="kafka.PARTITION_KEY">
<constant>Test</constant>
</setHeader>
<to uri="kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1" />
</route>
Run Code Online (Sandbox Code Playgroud)
这看起来很简单,然而,在运行这个我得到...
java.lang.ClassCastException: java.lang.String cannot be cast to [B
at kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34)
at org.apache.camel.component.kafka.KafkaProducer.process(KafkaProducer.java:78)
Run Code Online (Sandbox Code Playgroud)
在检查Camel代码时,它执行以下操作......
String msg = exchange.getIn().getBody(String.class);
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, partitionKey.toString(), msg);
producer.send(data);
Run Code Online (Sandbox Code Playgroud)
显然,这是一个序列化问题,我只是不确定是否有解决方法,或者这本身就是现有实现的错误?(或者希望只是我的误解)
有什么建议?谢谢,J
Jay*_*Tee 11
啊,没关系,我们去...希望这有助于其他人,你必须在选项中设置序列化器.
<route>
<from uri="file:///tmp/input" />
<setHeader headerName="kafka.PARTITION_KEY">
<constant>Test</constant>
</setHeader>
<to uri="kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1&serializerClass=kafka.serializer.StringEncoder" />
</route>
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
7684 次 |
| 最近记录: |