标签: kafka-consumer-api

Kafka,commitSync 成功

我想弄清楚是否需要使用 KafkaConsumer.commitSync(Map offsets) 打开手动提交

当我使用记录时,我需要确保它们被我的服务处理,我不能失去它们,但是如果我的服务抛出一堆错误或崩溃,我不想移动偏移量,直到我知道它们已被处理。

commitSync 看起来像我需要调用的方法吗?

如果我没有使用 commitSync,而是使用自动偏移提交,当服务在尝试处理一些 Kafka 事件时崩溃时,我是否有可能丢失事件。换句话说,当使用自动偏移提交时,偏移量会在什么时候提交?

apache-kafka kafka-consumer-api

1
推荐指数
1
解决办法
1221
查看次数

Google Cloud (GCP) Pub/Sub 是否支持与 Kafka 中的 ConsumerGroups 类似的功能

尝试在 Google Cloud (GCP) Pub/Sub 与 Manager Kafka Service 之间做出选择。

在最新的更新中,Pub/Sub 添加了对重放之前处理过的消息的支持,这是一个值得欢迎的变化。

我在他们的文档中找不到的一个功能是,我们是否可以拥有类似于 Kafka 消费者组的功能,即拥有一组订阅者,每个订阅者处理来自同一主题的数据,并且能够从头开始重新处理数据订阅者(消费群体)而其他人则不受其影响。例如:

假设您有一个主题 StockTicks

你有两个消费者群体

CG1:有两个消费者
CG2:有另外两个消费者

在 Kafka 中,我可以独立读取这些组之间的消息,但我可以使用 Pub/Sub 做同样的事情吗?

而且Kafka允许你从头开始重放消息,我可以对Pub/Sub做同样的事情吗?如果我不能重放CG创建之前发布的消息,我可以,但是我可以重放CG创建之后提交的消息吗? CG/订阅者已创建?

apache-kafka google-cloud-platform google-cloud-pubsub kafka-consumer-api

1
推荐指数
1
解决办法
3893
查看次数

使用 kafka-python 检索主题中的消息

我已经使用kafka-python库编写了一个 python 脚本,它将消息写入和读取到kafka. 我写消息没有任何问题;kafka我可以使用控制台工具检索它们。但我无法使用我的 python 脚本读取它们。我的消费者有一个 for ,它冻结在迭代的第一行并且永远不会返回。这是我的代码:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "my-topic",
    bootstrap_servers="localhost:9092"),
    value_deserializer=lambda v: json.dumps(v).encode("utf-8")
)

for msg in consumer:
    print(type(msg))
Run Code Online (Sandbox Code Playgroud)

消费者被创建并完全订阅;我可以看到它my-topic列在其属性的主题列表中_client

任何想法?

python apache-kafka kafka-consumer-api kafka-python

1
推荐指数
1
解决办法
9556
查看次数

Kafka消费者组再平衡

我正在使用 kafka 消费者组管理来处理我的消息。

我的消息的处理时间各不相同。因此,我将最大轮询间隔设置为 20 分钟,最大记录数为 20。除了上述两个之外,我还使用 5 个分区和 5 个具有默认配置值的消费者实例。

但我仍然间歇性地收到以下错误:

[Consumer clientId=consumer-3, groupId=amc_dashboard_analytics] Attempt to heartbeat failed since group is rebalancing
Run Code Online (Sandbox Code Playgroud)

我们的理解是,除非在达到消费者配置文档中写入的最大轮询间隔之前未调用轮询,否则不会发生重新平衡。但对我来说,重新平衡只发生在 20 分钟之前。

此外,在运行几个小时后,所有分配的消费者只是离开并说“尝试检测信号失败,因为组正在重新平衡”,并且不会再次加入(理想情况下应该再次加入)。

我在这里错过了什么吗?任何线索都会有帮助。

apache-kafka kafka-consumer-api

1
推荐指数
1
解决办法
2万
查看次数

Kafka 事务性读已提交消费者

我在应用程序中有事务性和普通的生产者,它们正在写入主题 kafka-topic ,如下所示。

事务性 Kafka Producer 的配置

@Bean
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        /*The amount of time to wait before attempting to retry a failed request to a given topic partition. 
         * This avoids repeatedly sending requests in a tight loop under some failure scenarios.*/
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3);
        /*"The configuration controls the …
Run Code Online (Sandbox Code Playgroud)

transactions apache-kafka kafka-consumer-api spring-kafka kafka-transactions-api

1
推荐指数
1
解决办法
1433
查看次数

如果我在 kafka 中有一个复制因子为 3 的代理怎么办?

如果我创建一个新集群,其中包含 1 个代理、1 个主题、1 个分区,复制因子为 3,那么会发生什么?它会在该单个代理下创建 3 个副本(分区)?如果是,如何选举领导人?

apache-kafka kafka-consumer-api kafka-producer-api

1
推荐指数
1
解决办法
2121
查看次数

如果将group_id设置为None,Kafka消费者会收到消息,但如果不是None,它不会收到任何消息?

我有以下 Kafka 消费者,如果将 分配group_id给 None,它会很好地工作 - 它收到了所有历史消息和我新测试的消息。

consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset=auto_offset_reset,
        enable_auto_commit=enable_auto_commit,
        group_id=group_id,
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

for m in consumer:
Run Code Online (Sandbox Code Playgroud)

group_id但是,如果我将其设置为某个值,它不会收到任何内容。我尝试运行测试生产者来发送新消息,但没有收到任何消息。

消费者控制台确实显示以下消息:

2020-11-07 00:56:01 INFO ThreadPoolExecutor-0_0 base.py(重新)加入组 my_group
2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 base.py 成功加入第 497 代组 my_group
2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 subscription_state.py 更新的分区分配:[]
2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 Consumer.py 为组 my_group 设置新分配的分区 set()

python apache-kafka kafka-consumer-api kafka-python

1
推荐指数
1
解决办法
4504
查看次数

Spring Kafka Consumer Configs - 默认值和至少一次语义

我正在使用 spring-kafka 模板编写 kafka 消费者。当我实例化消费者时,Spring kafka 接受如下参数。

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
Run Code Online (Sandbox Code Playgroud)

我阅读了文档,看起来还有很多其他参数也可以作为消费者配置传递。有趣的是,每个参数都有一个默认值。我的问题是

  1. 这些是在什么基础上到达的?
  2. 是否真的需要改变这些值,如果是的话,这些值是什么
    (恕我直言,这是根据具体情况而定的。但仍然想听听专家的意见)
  3. 我们拥有的传递语义是至少一次。因此,对于这种(至少一次)传递语义,如果这些保持不变,它仍然会处理大量数据。

任何指示或答案都会对澄清我的疑问有很大帮助。

apache-kafka kafka-consumer-api spring-kafka

1
推荐指数
1
解决办法
3010
查看次数

Scala如何订阅多个kafka主题

我想在scala中将字符串arry / list转换为util.Collection [String]对象。我尝试了多种方法,但没有解决。

import org.apache.kafka.clients.consumer.KafkaConsumer


object KafkaConsumerApp {

  def main(args: Array[String]): Unit = {

    val prop:Properties = new Properties()
    prop.put("bootstrap.servers","192.168.1.100:9092,192.168.1.141:9092,192.168.1.113:9092,192.168.1.118:9092")
    prop.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
    prop.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer(prop)

    val topics = List[String] ("my_topic_partition","my_topic_partition")
    val a = Collections.singletonList(topics)

    consumer.subscribe(a)

  }
}
Run Code Online (Sandbox Code Playgroud)

Consumer.subscribe(a)返回编译时错误

Error:(24, 14) overloaded method value subscribe with alternatives:
  (x$1: java.util.regex.Pattern)Unit <and>
  (x$1: java.util.Collection[String])Unit
 cannot be applied to (java.util.List[List[String]])
    consumer.subscribe(a)
Run Code Online (Sandbox Code Playgroud)

scala apache-kafka kafka-consumer-api

0
推荐指数
1
解决办法
236
查看次数

How to get message Apache Kafka on client JS?

Can Apache Kafka send message from server broker to client side (JS) directly?

Or I should use socket pipe?

javascript apache-kafka kafka-consumer-api

0
推荐指数
1
解决办法
54
查看次数