我想弄清楚是否需要使用 KafkaConsumer.commitSync(Map offsets) 打开手动提交
当我使用记录时,我需要确保它们被我的服务处理,我不能失去它们,但是如果我的服务抛出一堆错误或崩溃,我不想移动偏移量,直到我知道它们已被处理。
commitSync 看起来像我需要调用的方法吗?
如果我没有使用 commitSync,而是使用自动偏移提交,当服务在尝试处理一些 Kafka 事件时崩溃时,我是否有可能丢失事件。换句话说,当使用自动偏移提交时,偏移量会在什么时候提交?
尝试在 Google Cloud (GCP) Pub/Sub 与 Manager Kafka Service 之间做出选择。
在最新的更新中,Pub/Sub 添加了对重放之前处理过的消息的支持,这是一个值得欢迎的变化。
我在他们的文档中找不到的一个功能是,我们是否可以拥有类似于 Kafka 消费者组的功能,即拥有一组订阅者,每个订阅者处理来自同一主题的数据,并且能够从头开始重新处理数据订阅者(消费群体)而其他人则不受其影响。例如:
假设您有一个主题 StockTicks
你有两个消费者群体
CG1:有两个消费者
CG2:有另外两个消费者
在 Kafka 中,我可以独立读取这些组之间的消息,但我可以使用 Pub/Sub 做同样的事情吗?
而且Kafka允许你从头开始重放消息,我可以对Pub/Sub做同样的事情吗?如果我不能重放CG创建之前发布的消息,我可以,但是我可以重放CG创建之后提交的消息吗? CG/订阅者已创建?
apache-kafka google-cloud-platform google-cloud-pubsub kafka-consumer-api
我已经使用kafka-python库编写了一个 python 脚本,它将消息写入和读取到kafka. 我写消息没有任何问题;kafka我可以使用控制台工具检索它们。但我无法使用我的 python 脚本读取它们。我的消费者有一个 for ,它冻结在迭代的第一行并且永远不会返回。这是我的代码:
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"my-topic",
bootstrap_servers="localhost:9092"),
value_deserializer=lambda v: json.dumps(v).encode("utf-8")
)
for msg in consumer:
print(type(msg))
Run Code Online (Sandbox Code Playgroud)
消费者被创建并完全订阅;我可以看到它my-topic列在其属性的主题列表中_client。
任何想法?
我正在使用 kafka 消费者组管理来处理我的消息。
我的消息的处理时间各不相同。因此,我将最大轮询间隔设置为 20 分钟,最大记录数为 20。除了上述两个之外,我还使用 5 个分区和 5 个具有默认配置值的消费者实例。
但我仍然间歇性地收到以下错误:
[Consumer clientId=consumer-3, groupId=amc_dashboard_analytics] Attempt to heartbeat failed since group is rebalancing
Run Code Online (Sandbox Code Playgroud)
我们的理解是,除非在达到消费者配置文档中写入的最大轮询间隔之前未调用轮询,否则不会发生重新平衡。但对我来说,重新平衡只发生在 20 分钟之前。
此外,在运行几个小时后,所有分配的消费者只是离开并说“尝试检测信号失败,因为组正在重新平衡”,并且不会再次加入(理想情况下应该再次加入)。
我在这里错过了什么吗?任何线索都会有帮助。
我在应用程序中有事务性和普通的生产者,它们正在写入主题 kafka-topic ,如下所示。
事务性 Kafka Producer 的配置
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.RETRIES_CONFIG, 5);
/*The amount of time to wait before attempting to retry a failed request to a given topic partition.
* This avoids repeatedly sending requests in a tight loop under some failure scenarios.*/
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3);
/*"The configuration controls the …Run Code Online (Sandbox Code Playgroud) transactions apache-kafka kafka-consumer-api spring-kafka kafka-transactions-api
如果我创建一个新集群,其中包含 1 个代理、1 个主题、1 个分区,复制因子为 3,那么会发生什么?它会在该单个代理下创建 3 个副本(分区)?如果是,如何选举领导人?
我有以下 Kafka 消费者,如果将 分配group_id给 None,它会很好地工作 - 它收到了所有历史消息和我新测试的消息。
consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
auto_offset_reset=auto_offset_reset,
enable_auto_commit=enable_auto_commit,
group_id=group_id,
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
for m in consumer:
Run Code Online (Sandbox Code Playgroud)
group_id但是,如果我将其设置为某个值,它不会收到任何内容。我尝试运行测试生产者来发送新消息,但没有收到任何消息。
消费者控制台确实显示以下消息:
2020-11-07 00:56:01 INFO ThreadPoolExecutor-0_0 base.py(重新)加入组 my_group 2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 base.py 成功加入第 497 代组 my_group 2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 subscription_state.py 更新的分区分配:[] 2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 Consumer.py 为组 my_group 设置新分配的分区 set()
我正在使用 spring-kafka 模板编写 kafka 消费者。当我实例化消费者时,Spring kafka 接受如下参数。
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
Run Code Online (Sandbox Code Playgroud)
我阅读了文档,看起来还有很多其他参数也可以作为消费者配置传递。有趣的是,每个参数都有一个默认值。我的问题是
任何指示或答案都会对澄清我的疑问有很大帮助。
我想在scala中将字符串arry / list转换为util.Collection [String]对象。我尝试了多种方法,但没有解决。
import org.apache.kafka.clients.consumer.KafkaConsumer
object KafkaConsumerApp {
def main(args: Array[String]): Unit = {
val prop:Properties = new Properties()
prop.put("bootstrap.servers","192.168.1.100:9092,192.168.1.141:9092,192.168.1.113:9092,192.168.1.118:9092")
prop.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
prop.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer(prop)
val topics = List[String] ("my_topic_partition","my_topic_partition")
val a = Collections.singletonList(topics)
consumer.subscribe(a)
}
}
Run Code Online (Sandbox Code Playgroud)
Consumer.subscribe(a)返回编译时错误
Error:(24, 14) overloaded method value subscribe with alternatives:
(x$1: java.util.regex.Pattern)Unit <and>
(x$1: java.util.Collection[String])Unit
cannot be applied to (java.util.List[List[String]])
consumer.subscribe(a)
Run Code Online (Sandbox Code Playgroud) Can Apache Kafka send message from server broker to client side (JS) directly?
Or I should use socket pipe?
apache-kafka ×10
kafka-python ×2
python ×2
spring-kafka ×2
javascript ×1
scala ×1
transactions ×1