标签: apache-kafka-streams

Kafka 在有状态处理中验证消息

我有一个应用程序,多个用户可以发送 REST 操作来修改共享对象的状态。当一个对象被修改时,将会发生多个操作(数据库、审计、日志记录......)。

并非所有操作都有效,例如您无法在删除对象后对其进行修改。

使用 Kafka 我正在考虑以下架构:

  1. 剩余操作在 Kafka 主题中排队。
  2. 对同一对象的操作将针对同一分区。因此该对象的所有操作都将按顺序并由消费者处理
  3. 消费者正在监听分区并使用内存数据库验证操作
  4. 如果操作有效,则发送到“有效操作主题”,否则发送到“无效操作主题”
  5. 其他消费者(db、log、audit)正在监听“有效操作主题”

我不太确定第三点。我不喜欢保留所有对象的状态的想法。(我有数十亿个对象,即使一个对象的大小可以达到 10mb,我需要存储来验证其状态的也只是几 KB...

然而,这是一种常见的模式吗?否则如何验证某些操作的有效性?

另外,您将使用什么作为内存数据库?当然它必须具有高可用性、容错性并支持事务(读和写)。

streaming in-memory-database apache-kafka kafka-consumer-api apache-kafka-streams

2
推荐指数
1
解决办法
1161
查看次数

在 Kafka Stream API 中获取类转换异常

我正在将输入数据生成为 json string 。

对于主题 - myinput

{"time":"2017-11-28T09:42:26.776Z","name":"Lane1","oclass"
     :"myClass","id":112553,"Scope":"198S"}
Run Code Online (Sandbox Code Playgroud)

我的班级看起来像这样:

public class App {
    static public class CountryMessage {

        public String time;
        public String Scope;
        public String id;
        public String oclass;
        public String name; 
    }

    private static final String APP_ID = "countries-streaming-analysis-app";

    public static void main(String[] args) {
        System.out.println("Kafka Streams Demonstration");


        StreamsConfig config = new StreamsConfig(getProperties());
        final Serde < String > stringSerde = Serdes.String();
        final Serde < Long > longSerde = Serdes.Long();

        Map < String, Object > serdeProps = new …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-kafka-streams

2
推荐指数
2
解决办法
9543
查看次数

将数据从 kafka 流式传输到 oracle db 的最佳方式是什么

我正在尝试找到一种解决方案,将数据从 Kafka 直接流式传输到 Oracle。最有效的解决方案是什么?

apache-kafka apache-kafka-streams spring-kafka apache-kafka-connect

2
推荐指数
1
解决办法
9170
查看次数

编写 Kafka Streams 来持久保存到数据库中是个好方法吗?

只是想知道在使用 Kafka Streams Builder 编写持久层时可能会遇到什么样的问题,或者有什么使用 Kafka Connect JDBC (Sink) 的建议吗?

apache-kafka apache-kafka-streams apache-kafka-connect

2
推荐指数
1
解决办法
1326
查看次数

当源主题分区计数更新时,如何更新内部变更日志主题分区?

我有一个应用程序,其中使用 Kstream-Kstream 连接和 Ktream-Ktable 连接。我已将输入源主题分区计数从 4 更新为 16,并且应用程序因以下错误而停止。

Could not create internal topics: Existing internal topic application-test-processor-KSTREAM-JOINTHIS-0000000009-store-changelog has invalid partitions. Expected: 16 Actual: 4. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing. Retry #3

当源主题分区计数更新时,如何更新内部变更日志主题分区计数?

注意:我们使用的kafka版本:0.10.2.1

我从此链接查看了应用程序重置工具:https://docs.confluence.io/current/streams/developer-guide/app-reset-tool.html ,但它没有说明如何更新更改日志分区。

提前致谢。

apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams

2
推荐指数
1
解决办法
4091
查看次数

Kafka Streams:ConsumerRebalanceListener 实现

您能否告知以下类需要如何在流配置中注册?

public class MyConsumerRebalanceListener implements ConsumerRebalanceListener {

  static final Logger oLogger = Logger.getLogger(StreamConsumerRebalanceListener.class);

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    for (TopicPartition p : partitions) {
        oLogger.info(p + " partitions has been assigned to the stream instance");
    }

  }

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    for (TopicPartition p : partitions) {
        oLogger.warn(p + " partitions has been removed from the stream instance");
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

2
推荐指数
1
解决办法
1290
查看次数

间歇性警告 ConsumerCoordinator 我们收到了与我们当前订阅不匹配的分配

为什么我在启动我的 Kafka Streams 应用程序时会间歇性地点击以下 WARN 消息?

它不仅仅是一个警告,因为它淹没了应用程序日志并且 Kafka Streams 应用程序没有启动。

通常,当我重新部署它时,它就会起作用。

[my-app-0 my-app] 2020-03-25 14:00:12.931 WARN 1 --- [-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=my-app-b8f0b2a0-271b-4499-85bd-9e22d4a8b4b1-StreamThread-1-consumer, groupId=my-app] We received an assignment [topic-one-0, topic-two-0] that doesn't match our current subscription Subscribe(topic-two); it is likely that the subscription has changed since we joined the group. Will try re-join the group with current subscription

在我重新启动应用程序后,上面的警告消失了,我得到了一个不同的警告,但至少应用程序可以工作!

[my-app-0 my-app] 2020-03-25 14:05:54.300 WARN 1 --- [-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=my-app-b0f22dc1-479b-4f7c-a862-b20f70eedc35-StreamThread-1-consumer, groupId=my-app] The following subscribed topics are not assigned to …

apache-kafka apache-kafka-streams

2
推荐指数
1
解决办法
1309
查看次数

kafka 流中 statestore 和 changelog 主题的目的?

我有一个 kafka 流应用程序,它在其中使用 stateStore(由 RocksDB 支持)。

流线程所做的就是从 kafka 主题中获取数据并将数据放入状态存储。(还有其他线程从 statestore 读取数据并进行业务逻辑处理)。

我观察到它因为 stateStore 创建了一个新的 kafka 主题“changelog”。

但我没有明白“变更日志”kafka 主题的目的是什么?

  • 为什么需要它(变更日志)?
  • statestore 和“changelog”kafka 主题之间的关系是什么?
  • 谁把数据放到这个话题里?(“变更日志”)

apache-kafka apache-kafka-streams

2
推荐指数
2
解决办法
2083
查看次数

java.lang.ClassNotFoundException: org.apache.kafka.common.metrics.MetricsContext

现在我正在学习卡夫卡。根据视频课程,我正在尝试使用 Kafka Streams 构建 Twitter 推文过滤器:

public class StreamsFilterTweet {

    public static void main(String[] args) {

        // create properties
        Properties properties = new Properties();
        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "demo-kafka-streams");
        properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
        properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

        // create a topology
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // input topic
        KStream<String,String> inputTopic = streamsBuilder.stream("twitter_tweets");
        KStream<String,String> filteredStream = inputTopic.filter(
                (k,jsonValue)->extractUserFollowersInTweets(jsonValue)>10000
                        // filter for tweets which has a user of over 10000 followers
        );
        filteredStream.to("important_tweets");

        // build topology
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);

        // start our streams …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-kafka-streams

2
推荐指数
1
解决办法
1634
查看次数

外行来说什么是流处理和 Kafka 流?

要了解什么是kafka-streams我应该知道什么是stream-processing。当我开始在网上阅读它们时,我无法掌握整体情况,因为它是一个永无止境的新概念链接树。
谁能stream-processing用一个简单的现实世界的例子来解释什么?
以及如何将其kafka-streams与生产者消费者架构联系起来?

谢谢你。

stream-processing apache-kafka apache-kafka-streams

2
推荐指数
1
解决办法
146
查看次数