Zel*_*iax 12 java stream apache-kafka apache-kafka-streams
我一直在查看我正在研究的Java应用程序的很多Kafka文档.我已经尝试进入Java 8中引入的lambda语法,但是我在这方面有点粗略,并且不会觉得它应该是我现在使用的那样.
我有一个Kafka/Zookeeper服务运行没有任何麻烦,我想要做的是编写一个小的示例程序,基于输入将写出来,但不做一个wordcount,因为已经有这么多的例子.
至于样本数据,我将得到一串以下结构:
This a sample string containing some keywords such as GPS, GEO and maybe a little bit of ACC.
Run Code Online (Sandbox Code Playgroud)
我希望能够提取3个字母的关键字并用a打印出来System.out.println.如何获取包含输入的字符串变量?我知道如何应用正则表达式,甚至只是搜索字符串来获取关键字.
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app_id");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "0:0:0:0:0:0:0:1:9092");
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "0:0:0:0:0:0:0:1:2181");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
final Serde<String> stringSerde = Serdes.String();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
//How do I assign the input from in-stream to the following variable?
String variable = ?
}
Run Code Online (Sandbox Code Playgroud)
我有动物园管理员,卡夫卡,生产者和消费者运行所有连接到同一主题所以我想基本上看到相同的String出现在所有实例(生产者,消费者和流).
Mat*_*Sax 24
如果您使用Kafka Streams,则需要在数据流上应用函数/运算符.在您的情况下,您创建一个KStream对象,因此,您想要应用运算符source.
根据您想要做的事情,有些运算符可以独立地为流中的每个记录应用一个函数(例如map()),或者将函数应用于多个记录的其他运算符(例如aggregateByKey()).您应该查看文档:http://docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-streams-dsl和示例https://github.com/confluentinc/examples/树/卡夫卡0.10.0.0-CP-3.0.0 /卡夫卡流
因此,您从未使用Kafka Streams创建局部变量(如上图所示),而是将所有内容嵌入到链接在一起的运算符/函数中.
例如,如果要将所有输入记录打印到stdout,则可以这样做
KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream");
source.foreach(new ForeachAction<String, String>() {
void apply(String key, String value) {
System.out.println(key + ": " + value);
}
});
Run Code Online (Sandbox Code Playgroud)
因此,在您启动应用程序之后streams.start(),它将使用您输入主题的记录消费者,并且对于您的主题的每个记录,apply(...)完成调用,这将在stdout上打印记录.
当然,使用一种更原生的方式将流打印到控制台source.print()(内部基本上foreach()与已经给出的操作符相同ForeachAction).
对于将字符串分配给局部变量的示例,您需要将代码放入apply(...)并执行正则表达式等等,以"提取3个字母的关键字".
然而,表达这一点的最佳方式是通过flatMapValues()和print()(即source.flatMapValues(...).print())的组合.flatMapValues()为每个输入记录调用(在您的情况下,我假设键将是,null因此您可以忽略它).在您的flatMapValue函数中,您应用正则表达式,并且对于每个匹配,您将匹配添加到最终返回的值列表中.
source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
ArrayList<String> keywords = new ArrayList<String>();
// apply regex to value and for each match add it to keywords
return keywords;
}
}
Run Code Online (Sandbox Code Playgroud)
输出flatMapValues将KStream再次出现,包含每个找到的关键字的记录(即,输出流是您返回的所有列表中的"联合" ValueMapper#apply()).最后,您只需将结果打印到控制台即可print().(当然,您也可以使用单个foreach而不是flatMapValue+,print但这将不那么模块化.)
| 归档时间: |
|
| 查看次数: |
12415 次 |
| 最近记录: |