I am producing the input data as a JSON string.
For the topic myinput:
{"time":"2017-11-28T09:42:26.776Z","name":"Lane1","oclass":"myClass","id":112553,"Scope":"198S"}
My class looks like this:
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class App {

    static public class CountryMessage {
        public String time;
        public String Scope;
        public String id;
        public String oclass;
        public String name;
    }

    private static final String APP_ID = "countries-streaming-analysis-app";

    public static void main(String[] args) {
        System.out.println("Kafka Streams Demonstration");
        StreamsConfig config = new StreamsConfig(getProperties());

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        Map<String, Object> serdeProps = new HashMap<>();
        final Serializer<CountryMessage> countryMessageSerializer = new JsonPOJOSerializer<>();
        serdeProps.put("JsonPOJOClass", CountryMessage.class);
        countryMessageSerializer.configure(serdeProps, false);

        final Deserializer<CountryMessage> countryMessageDeserializer = new JsonPOJODeserializer<>();
        serdeProps.put("JsonPOJOClass", CountryMessage.class);
        countryMessageDeserializer.configure(serdeProps, false);

        final Serde<CountryMessage> countryMessageSerde =
            Serdes.serdeFrom(countryMessageSerializer, countryMessageDeserializer);

        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KStream<String, CountryMessage> countriesStream =
            kStreamBuilder.stream(stringSerde, countryMessageSerde, "vanitopic");

        KGroupedStream<String, CountryMessage> countries =
            countriesStream.selectKey((k, traffic) -> traffic.Scope).groupByKey();

        KTable<Windowed<String>, Long> aggregatedStream =
            countries.count(TimeWindows.of(60 * 1000L), "UserCountStore");

        System.out.println("Starting Kafka Streams Countries Example");
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
        kafkaStreams.start();
        System.out.println("Now started CountriesStreams Example");
    }

    private static Properties getProperties() {
        Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.106.9.235:9092,10.106.9.235:9093,10.106.9.235:9094");
        settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "10.106.9.235:2181");
        //settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        //settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return settings;
    }
}
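The JsonPOJOSerializer and JsonPOJODeserializer used above are not shown in the question; they are presumably modeled on the Jackson-backed example classes that ship with the Kafka Streams examples. A minimal sketch of the serializer half, assuming Jackson is on the classpath (the deserializer mirrors it, reading the target class from the "JsonPOJOClass" config entry):

```java
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical sketch of a generic JSON serializer for POJOs.
public class JsonPOJOSerializer<T> implements Serializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
        // nothing to configure for serialization
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        try {
            // write the POJO's public fields/getters as JSON bytes
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}
```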
I am getting the following ClassCastException:
Exception in thread "countries-streaming-analysis-app-f7f95119-4401-4a6e-8060-5a138ffaddb2-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=vanitopic, partition=0, offset=2036
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.cisco.streams.countries.App$CountryMessage). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:91)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
    ... 3 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:88)
    at org.apache.kafka.streams.processor.internals.
I need help understanding how and where to apply the custom Serdes I have created.
In your code,
KGroupedStream<String, CountryMessage> countries = countriesStream.selectKey((k, traffic) -> traffic.Scope).groupByKey();
the groupByKey() needs to have both Serdes set, because it will trigger a data repartitioning. Alternatively, you can set the default Serdes to the String and CountryMessage types.
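With the pre-1.0 KStreamBuilder API used in the question, the relevant groupByKey overload takes the key and value Serdes directly. A sketch, reusing the stringSerde and countryMessageSerde built in the question:

```java
// Pass the Serdes explicitly so that the internal repartition topic
// created by groupByKey() is written with compatible serializers
// instead of the default ByteArraySerializer.
KGroupedStream<String, CountryMessage> countries =
    countriesStream
        .selectKey((k, traffic) -> traffic.Scope)         // new key => repartitioning required
        .groupByKey(stringSerde, countryMessageSerde);    // Serdes for the repartition topic
```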
As mentioned in my comment, every operator that does not use the default Serdes from StreamsConfig needs to be given the correct Serdes.
Thus, the count() operation also needs the corresponding String and Long Serdes:
countries.count(TimeWindows.of(60 * 1000L), "UserCountStore");
All operators that might need Serdes have appropriate overloads. Just check all the overloads of all the operators you use.
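For comparison, on newer Kafka Streams versions (2.x and later), where the windowed count is expressed via windowedBy(), the store Serdes would instead be supplied through a Materialized parameter. A sketch, assuming the countries grouped stream from above:

```java
// Windowed count with explicit store Serdes (Kafka Streams 2.x DSL).
// Materialized names the state store and pins its key/value Serdes,
// so the window store does not fall back to the default byte-array Serdes.
KTable<Windowed<String>, Long> aggregated = countries
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
    .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("UserCountStore")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long()));
```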
Check out the docs for more details: https://docs.confluent.io/current/streams/developer-guide/dsl-api.html
Add the Serdes to groupByKey:
KGroupedStream<String, CountryMessage> countries = countriesStream
    .selectKey((k, traffic) -> traffic.Scope)
    .groupByKey(Grouped.with(Serdes.String(), new ObjectMapperSerde<>(CountryMessage.class)));
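The other option mentioned above is to set default Serdes once in the configuration, so individual operators do not need them. A sketch using the 1.0+ config keys; CountryMessageSerde here is a hypothetical Serde<CountryMessage> class with a no-argument constructor that wraps the JSON serializer/deserializer pair:

```java
// Assumption: CountryMessageSerde is a Serde<CountryMessage> with a
// no-arg constructor (e.g. extending Serdes.WrapperSerde), so it can be
// instantiated by Kafka Streams from its class name.
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "countries-streaming-analysis-app");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.106.9.235:9092");
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CountryMessageSerde.class.getName());
```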