我正在通过这里提到的示例:https: //github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java
我以json字符串的形式生成输入数据.
对于主题 - PageViews
{"industry":"eng","user":"bob","page":"index.html"}
Run Code Online (Sandbox Code Playgroud)
对于主题 - UserProfiles
{"experience":"some","region":"europe"}
Run Code Online (Sandbox Code Playgroud)
我的班级看起来像:
import com.google.gson.Gson;
import com.sohi.examples.dto.PageViews;
import com.sohi.examples.dto.UserProfiles;
import com.sohi.examples.dto.ViewsByRegion;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import java.io.IOException;
import java.util.Properties;
public class PageViewRegionLambdaExample {
public static void main(String[] args) throws IOException {
final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "pageview-region-lambda-example");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());//NOSONAR
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());//NOSONAR
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, …Run Code Online (Sandbox Code Playgroud) 我有一个拓扑(见下文),它读取了一个非常大的主题(每天超过十亿条消息).这个Kafka Streams应用程序的内存使用率非常高,我正在寻找一些关于如何减少州商店足迹的建议(详情如下).注意:我并不是想把山羊放到国营商店,我只是觉得可能有办法改善我的拓扑结构 - 见下文.
// stream receives 1 billion+ messages per day
stream
.flatMap((key, msg) -> rekeyMessages(msg))
.groupBy((key, value) -> key)
.reduce(new MyReducer(), MY_REDUCED_STORE)
.toStream()
.to(OUTPUT_TOPIC);
// stream the compacted topic as a KTable
KTable<String, String> rekeyedTable = builder.table(OUTPUT_TOPIC, REKEYED_STORE);
// aggregation 1
rekeyedTable.groupBy(...).aggregate(...)
// aggreation 2
rekeyedTable.groupBy(...).aggregate(...)
// etc
Run Code Online (Sandbox Code Playgroud)
更具体地说,我想知道OUTPUT_TOPIC作为KTable 流式传输是否导致状态存储(REKEYED_STORE)大于它需要在本地存储.对于具有大量唯一键的changelog主题,将它们作为a KStream和窗口化聚合进行流式传输会更好吗?或者这不会像我想的那样减少占用空间(例如,只有一部分记录 - 窗口中的那些记录将存在于本地状态存储中).
无论如何,我总是可以启动这个应用程序的更多实例,但我想让每个实例尽可能高效.这是我的问题:
任何帮助将不胜感激!
我有这个简单的KTable定义生成一个Store:
KTable<String, JsonNode> table = kStreamBuilder.<String, JsonNode>table(ORDERS_TOPIC, ORDERS_STORE);
table.print();
Run Code Online (Sandbox Code Playgroud)
我将消息发布到ORDERS_TOPIC,但商店直到每30秒才真正更新.这是有关提交消息的日志,因为已经过了30000ms时间:
2017-07-25 23:53:15.465 DEBUG 17540 --- [ StreamThread-1] o.a.k.c.consumer.internals.Fetcher : Sending fetch for partitions [orders-0] to broker EXPRF026.SUMINISTRADOR:9092 (id: 0 rack: null)
2017-07-25 23:53:15.567 INFO 17540 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
2017-07-25 23:53:15.567 INFO 17540 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-07-25 23:53:15.567 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.processor.internals.StreamTask : task [0_0] Committing its …Run Code Online (Sandbox Code Playgroud) 我正在尝试实现一个Transformer类
public class StreamSorterByTimeStampWithDelayTransformer < V >
implements Transformer< Long, V, KeyValue< Long, V > >
Run Code Online (Sandbox Code Playgroud)
该类的构造函数为每个实例创建一个StateStore,因此:
this.state_store_name = "state_store_" + this.hashCode();
KeyValueBytesStoreSupplier key_value_store_supplier = Stores.persistentKeyValueStore( state_store_name );
StoreBuilder< KeyValueStore< String, V > > key_value_store_builder =
Stores.keyValueStoreBuilder( key_value_store_supplier, Serdes.String(), v_instance.getSerde() );
stream_builder.addStateStore( key_value_store_builder );
Run Code Online (Sandbox Code Playgroud)
Transformer init方法引用StateStore:
public void init(ProcessorContext context) {
this.context = context;
this.key_value_store = (KeyValueStore< String, V >) context.getStateStore( state_store_name );
// schedule a punctuate() method every "poll_ms" (wall clock time)
this.context.schedule( this.poll_ms, PunctuationType.WALL_CLOCK_TIME,
(timestamp) -> pushOutOldEnoughEntries() …Run Code Online (Sandbox Code Playgroud) 所以我最近开始阅读Kafka,我对Kafka Connect和Kafka Streams之间的区别感到有些困惑.根据定义,Kafka Streams可以从Kafka主题收集数据,处理它并将输出推送到另一个Kafka主题.而Kafka Connect将大型数据集移入和移出Kafka.
我的问题是为什么我们需要Kafka Connect几乎可以读取数据,处理数据并将其推送到主题?为什么要增加一个组件 如果有人可以解释差异,那将是很棒的,在此先感谢:)
我有一个Kafka stream应用程序,该应用程序在传入状态下运行,并且需要在写入下一个主题之前存储该状态。只有在本地存储中更新状态后,才应执行写操作。
这样的事情。
stream.map(this::getAndUpdateState)
.map(this::processStateAndEvent)
.to("topicname");
Run Code Online (Sandbox Code Playgroud)
这样getAndUpdateState()我就能做到
state = store.get(key); // or new if null
state = updateState(state, event); // update changes to state
store.put(key, state); // write back the state
return state;
Run Code Online (Sandbox Code Playgroud)
如何在kafka商店上实现该简单的get()和put()操作?我已经尝试使用KeyValueStore但是它遇到了问题,因为我必须将其添加为源处理器和接收器处理器。
另外,也可以使用KTable或其他一些概念来获取和放入kafka。
我正在尝试使用Kafka Streams,并且创建了以下拓扑:
KStream<String, HistoryEvent> eventStream = builder.stream(applicationTopicName, Consumed.with(Serdes.String(),
historyEventSerde));
eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
.groupByKey()
.reduce((e1, e2) -> e2, Materialized.as(streamByKeyStoreName));
Run Code Online (Sandbox Code Playgroud)
稍后,我像这样启动流:
private void startKafkaStreams(KafkaStreams streams) {
CompletableFuture<KafkaStreams.State> stateFuture = new CompletableFuture<>();
streams.setStateListener((newState, oldState) -> {
if(stateFuture.isDone()) {
return;
}
if(newState == KafkaStreams.State.RUNNING || newState == KafkaStreams.State.ERROR) {
stateFuture.complete(newState);
}
});
streams.start();
try {
KafkaStreams.State finalState = stateFuture.get();
if(finalState != KafkaStreams.State.RUNNING) {
// ...
}
} catch (InterruptedException ex) {
// ...
} catch(ExecutionException ex) {
// ...
}
} …Run Code Online (Sandbox Code Playgroud) 我们正在构建一个高吞吐量,低延迟的流处理应用程序。我们将Apache Kafka用作消息传递平台和数据库。
似乎Kafka Streams和Alpakka Kafka框架有很多共同之处,但是Kafka Streams似乎比Kafka更“原生”,而Alpakka允许我们使用Akka框架的功能。
这两个框架之间的主要区别是什么?
我从针对由Kafka主题定义的KTable的KSQL查询中获得意外结果。KTABLE是“交易”,并且由压缩主题“ localhost.dbo.TradeHistory”支持。它应该包含由TradeId键控的股票交易的最新信息。主题的键是TradeId。每笔交易都有一个AccountId,我正在尝试构造一个查询以获取按帐户分组的交易金额的总和。
ksql> create table Trades(TradeId int, AccountId int, Spn int, Amount double) with (KAFKA_TOPIC = 'localhost.dbo.TradeHistory', VALUE_FORMAT = 'JSON', KEY = 'TradeId');
...
ksql> describe extended Trades;
Name : TRADES
Type : TABLE
Key field : TRADEID
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : JSON
Kafka topic : localhost.dbo.TradeHistory (partitions: 1, replication: 1)
Field | Type
---------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
TRADEID | INTEGER
ACCOUNTID | …Run Code Online (Sandbox Code Playgroud) 此作为参考,轮廓的流更新存储在KTable对象。
我正在考虑存储很少更新的数据更新。因此,如果一个实例崩溃并且另一个实例将再次从头开始构建那些数据,则它们可能再也无法获取这些数据了。因为它们再也不会流式传输,或者说的很简单,所以很少。