标签: apache-kafka-streams

在卡夫卡流中左加入时获取类强制转换异常

我正在通过这里提到的示例:https: //github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java

我以json字符串的形式生成输入数据.

对于主题 - PageViews

{"industry":"eng","user":"bob","page":"index.html"}
Run Code Online (Sandbox Code Playgroud)

对于主题 - UserProfiles

{"experience":"some","region":"europe"}
Run Code Online (Sandbox Code Playgroud)

我的班级看起来像:

import com.google.gson.Gson;
import com.sohi.examples.dto.PageViews;
import com.sohi.examples.dto.UserProfiles;
import com.sohi.examples.dto.ViewsByRegion;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.io.IOException;
import java.util.Properties;


public class PageViewRegionLambdaExample {

    public static void main(String[] args) throws IOException {

        final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
        final Properties streamsConfiguration = new Properties();

        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "pageview-region-lambda-example");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());//NOSONAR
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());//NOSONAR
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
2128
查看次数

Kafka Streams - 减少大型国营商店的内存占用

我有一个拓扑(见下文),它读取了一个非常大的主题(每天超过十亿条消息).这个Kafka Streams应用程序的内存使用率非常高,我正在寻找一些关于如何减少州商店足迹的建议(详情如下).注意:我并不是想把山羊放到国营商店,我只是觉得可能有办法改善我的拓扑结构 - 见下文.

// stream receives 1 billion+ messages per day
stream
    .flatMap((key, msg) -> rekeyMessages(msg))
    .groupBy((key, value) -> key)
    .reduce(new MyReducer(), MY_REDUCED_STORE)
    .toStream()
    .to(OUTPUT_TOPIC);

// stream the compacted topic as a KTable
KTable<String, String> rekeyedTable = builder.table(OUTPUT_TOPIC, REKEYED_STORE);


// aggregation 1
rekeyedTable.groupBy(...).aggregate(...)

// aggreation 2
rekeyedTable.groupBy(...).aggregate(...)

// etc
Run Code Online (Sandbox Code Playgroud)

更具体地说,我想知道OUTPUT_TOPIC作为KTable 流式传输是否导致状态存储(REKEYED_STORE)大于它需要在本地存储.对于具有大量唯一键的changelog主题,将它们作为a KStream和窗口化聚合进行流式传输会更好吗?或者这不会像我想的那样减少占用空间(例如,只有一部分记录 - 窗口中的那些记录将存在于本地状态存储中).

无论如何,我总是可以启动这个应用程序的更多实例,但我想让每个实例尽可能高效.这是我的问题:

  • 是否有任何配置选项,一般策略等应该考虑具有此级别吞吐量的Kafka Streams应用程序?
  • 单个实例的内存密集程度应该有什么指导原则吗?即使您有一些武断的指南,与他人分享可能会有所帮助.我的一个实例目前正在使用15GB的内存 - 我不知道这是好/坏/无关紧要.

任何帮助将不胜感激!

apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
2134
查看次数

Kafka Streams - 解释为什么KTable及其相关商店每30秒才更新一次

我有这个简单的KTable定义生成一个Store:

KTable<String, JsonNode> table = kStreamBuilder.<String, JsonNode>table(ORDERS_TOPIC, ORDERS_STORE);
table.print();
Run Code Online (Sandbox Code Playgroud)

我将消息发布到ORDERS_TOPIC,但商店直到每30秒才真正更新.这是有关提交消息的日志,因为已经过了30000ms时间:

2017-07-25 23:53:15.465 DEBUG 17540 --- [ StreamThread-1] o.a.k.c.consumer.internals.Fetcher       : Sending fetch for partitions [orders-0] to broker EXPRF026.SUMINISTRADOR:9092 (id: 0 rack: null)
2017-07-25 23:53:15.567  INFO 17540 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
2017-07-25 23:53:15.567  INFO 17540 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-07-25 23:53:15.567 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.processor.internals.StreamTask   : task [0_0] Committing its …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
1971
查看次数

为什么我的Kafka Transformer的StateStore无法访问?

我正在尝试实现一个Transformer类

public class StreamSorterByTimeStampWithDelayTransformer < V > 
    implements Transformer< Long, V, KeyValue< Long, V > >
Run Code Online (Sandbox Code Playgroud)

该类的构造函数为每个实例创建一个StateStore,因此:

this.state_store_name = "state_store_" + this.hashCode();

KeyValueBytesStoreSupplier key_value_store_supplier = Stores.persistentKeyValueStore( state_store_name );
StoreBuilder< KeyValueStore< String, V > > key_value_store_builder = 
    Stores.keyValueStoreBuilder( key_value_store_supplier, Serdes.String(), v_instance.getSerde() );
stream_builder.addStateStore( key_value_store_builder );
Run Code Online (Sandbox Code Playgroud)

Transformer init方法引用StateStore:

public void init(ProcessorContext context) {
    this.context = context;
    this.key_value_store = (KeyValueStore< String, V >) context.getStateStore( state_store_name );
    // schedule a punctuate() method every "poll_ms" (wall clock time)
    this.context.schedule( this.poll_ms, PunctuationType.WALL_CLOCK_TIME, 
            (timestamp) -> pushOutOldEnoughEntries() …
Run Code Online (Sandbox Code Playgroud)

apache-kafka-streams

1
推荐指数
1
解决办法
716
查看次数

Kafka Connect和Streams

所以我最近开始阅读Kafka,我对Kafka Connect和Kafka Streams之间的区别感到有些困惑.根据定义,Kafka Streams可以从Kafka主题收集数据,处理它并将输出推送到另一个Kafka主题.而Kafka Connect将大型数据集移入和移出Kafka.

我的问题是为什么我们需要Kafka Connect几乎可以读取数据,处理数据并将其推送到主题?为什么要增加一个组件 如果有人可以解释差异,那将是很棒的,在此先感谢:)

apache-kafka apache-kafka-streams apache-kafka-connect

1
推荐指数
1
解决办法
403
查看次数

Kafka Streams:实现一个简单的KeyValueStore,我可以在其中放置和获取数据

我有一个Kafka stream应用程序,该应用程序在传入状态下运行,并且需要在写入下一个主题之前存储该状态。只有在本地存储中更新状态后,才应执行写操作。

这样的事情。

stream.map(this::getAndUpdateState)
          .map(this::processStateAndEvent)
          .to("topicname");
Run Code Online (Sandbox Code Playgroud)

这样getAndUpdateState()我就能做到

state = store.get(key); // or new if null
state = updateState(state, event);  // update changes to state
store.put(key, state);  // write back the state
return state;
Run Code Online (Sandbox Code Playgroud)

如何在kafka商店上实现该简单的get()和put()操作?我已经尝试使用KeyValueStore但是它遇到了问题,因为我必须将其添加为源处理器和接收器处理器。

另外,也可以使用KTable或其他一些概念来获取和放入kafka。

java apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
2054
查看次数

由于状态无效,无法为Kafka流打开商店

我正在尝试使用Kafka Streams,并且创建了以下拓扑:

    KStream<String, HistoryEvent> eventStream = builder.stream(applicationTopicName, Consumed.with(Serdes.String(),
            historyEventSerde));

    eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
            .groupByKey()
            .reduce((e1, e2) -> e2, Materialized.as(streamByKeyStoreName));
Run Code Online (Sandbox Code Playgroud)

稍后,我像这样启动流:

private void startKafkaStreams(KafkaStreams streams) {
    CompletableFuture<KafkaStreams.State> stateFuture = new CompletableFuture<>();
    streams.setStateListener((newState, oldState) -> {
        if(stateFuture.isDone()) {
            return;
        }

        if(newState == KafkaStreams.State.RUNNING || newState == KafkaStreams.State.ERROR) {
            stateFuture.complete(newState);
        }
    });

    streams.start();
    try {
        KafkaStreams.State finalState = stateFuture.get();
        if(finalState != KafkaStreams.State.RUNNING) {
            // ...
        }
    } catch (InterruptedException ex) {
        // ...
    } catch(ExecutionException ex) {
        // ...
    }
} …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
1049
查看次数

Alpakka卡夫卡vs卡夫卡流

我们正在构建一个高吞吐量,低延迟的流处理应用程序。我们将Apache Kafka用作消息传递平台和数据库。

似乎Kafka Streams和Alpakka Kafka框架有很多共同之处,但是Kafka Streams似乎比Kafka更“原生”,而Alpakka允许我们使用Akka框架的功能。

这两个框架之间的主要区别是什么?

scala akka apache-kafka apache-kafka-streams alpakka

1
推荐指数
1
解决办法
689
查看次数

KSQL查询以简单聚合返回意外值

我从针对由Kafka主题定义的KTable的KSQL查询中获得意外结果。KTABLE是“交易”,并且由压缩主题“ localhost.dbo.TradeHistory”支持。它应该包含由TradeId键控的股票交易的最新信息。主题的键是TradeId。每笔交易都有一个AccountId,我正在尝试构造一个查询以获取按帐户分组的交易金额的总和。

交易表的定义

ksql> create table Trades(TradeId int, AccountId int, Spn int, Amount double) with (KAFKA_TOPIC = 'localhost.dbo.TradeHistory', VALUE_FORMAT = 'JSON', KEY = 'TradeId');

...

ksql> describe extended Trades;

Name                 : TRADES
Type                 : TABLE
Key field            : TRADEID
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : localhost.dbo.TradeHistory (partitions: 1, replication: 1)

Field     | Type
---------------------------------------
ROWTIME   | BIGINT           (system)
ROWKEY    | VARCHAR(STRING)  (system)
TRADEID   | INTEGER
ACCOUNTID | …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams ksql

1
推荐指数
1
解决办法
81
查看次数

KTable中的数据存储多长时间?

作为参考,轮廓的流更新存储在KTable对象。

  1. 此数据将在KTable对象中存储多长时间?
  2. 假设我们运行应用程序的多个实例。而且,实例崩溃。KTable数据属于那个实例怎么样?它会被另一个实例“恢复”吗?

我正在考虑存储很少更新的数据更新。因此,如果一个实例崩溃并且另一个实例将再次从头开始构建那些数据,则它们可能再也无法获取这些数据了。因为它们再也不会流式传输,或者说的很简单,所以很少。

apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
37
查看次数