我在项目中使用 confluence-kafka Python 客户端。我正在尝试使用此客户端创建 Docker 映像。
我面临以下错误:-
#11 8.015 [pipenv.exceptions.InstallError]: In file included from /tmp/pip-install-so_whhii/confluent-kafka_9d9553bf46cf489bb25fcb2ac7698747/src/confluent_kafka/src/Admin.c:17:
#11 8.015 [pipenv.exceptions.InstallError]: /tmp/pip-install-so_whhii/confluent-kafka_9d9553bf46cf489bb25fcb2ac7698747/src/confluent_kafka/src/confluent_kafka.h:23:10: fatal error: librdkafka/rdkafka.h: No such file or directory
#11 8.015 [pipenv.exceptions.InstallError]: 23 | #include <librdkafka/rdkafka.h>
#11 8.015 [pipenv.exceptions.InstallError]: | ^~~~~~~~~~~~~~~~~~~~~~
#11 8.015 [pipenv.exceptions.InstallError]: compilation terminated.
#11 8.015 [pipenv.exceptions.InstallError]: error: command '/usr/bin/gcc' failed with exit code 1
#11 8.016 [pipenv.exceptions.InstallError]: [end of output]
Run Code Online (Sandbox Code Playgroud)
根据我的搜索,它与 librdkafka 的 Apple M1 版本相关。
python apache-kafka docker librdkafka confluent-kafka-python
当消费者在 5 分钟内(默认值 max.poll.interval.ms 300000ms)没有收到消息时,消费者会停止而不退出程序。消费者进程挂起,不再消费任何消息。
记录以下错误消息
MAXPOLL|rdkafka#consumer-1| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 255ms (adjust max.poll.interval.ms for long-running message processing): leaving group
Run Code Online (Sandbox Code Playgroud)
我看到在 confluent-kafka-go 中ErrMaxPollExceeded
定义了它,但无法找到它在哪里被提出。
如果出现任何此类错误,为什么程序不退出?
用于 kafka.Consumer 的配置
{
"bootstrap.servers": "private.kafka.host",
"group.id": "foo.bar",
"auto.offset.reset": "earliest",
"enable.auto.commit": false,
}
Run Code Online (Sandbox Code Playgroud) 根据我的阅读,acks=0 的 Kafka 基本上只是将消息推送到生产者缓冲区中。它不等待任何类型的确认。有了这个,我想知道它与异步生产者有什么不同?acks 如何影响异步生产者?
我正在针对Python的confluent-kafka使用本地Java实现对Apache Kafka Producer进行测试,以查看哪个具有最大吞吐量。
我正在使用docker-compose部署一个包含3个Kafka代理和3个zookeeper实例的Kafka集群。我的docker撰写文件:https : //paste.fedoraproject.org/paste/bn7rr2~YRuIihZ06O3Q6vw/raw
这是一个非常简单的代码,其中包含Python confluent-kafka的大多数默认选项,并且在Java生产者中进行了一些配置更改,以匹配confluent-kafka的配置。
Python代码:
from confluent_kafka import Producer
producer = Producer({'bootstrap.servers': 'kafka-1:19092,kafka-2:29092,kafka-3:39092', 'linger.ms': 300, "max.in.flight.requests.per.connection": 1000000, "queue.buffering.max.kbytes": 1048576, "message.max.bytes": 1000000,
'default.topic.config': {'acks': "all"}})
ss = '0123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357'
def f():
import time
start = time.time()
for i in xrange(1000000):
try:
producer.produce('test-topic', ss)
except Exception:
producer.poll(1)
try:
producer.produce('test-topic', ss)
except Exception:
producer.flush(30)
producer.produce('test-topic', ss)
producer.poll(0)
producer.flush(30)
print(time.time() - start)
if __name__ == '__main__':
f()
Run Code Online (Sandbox Code Playgroud)
Java实现。配置与librdkafka中的config相同。按照Edenhill的建议更改了linger.ms和回调。
package com.amit.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.nio.charset.Charset;
import java.util.Properties; …
Run Code Online (Sandbox Code Playgroud) 默认情况下,Kafka Consumer 会定期提交当前偏移量,除非通过禁用enable.auto.commit
. 根据文档,您将负责自己提交偏移量。所以当我想要手动控制时,这似乎是要走的路,但是文档还提到了存储的偏移量,如果你想要手动控制,你应该禁用enable.auto.offset.store
和使用rd_kafka_offsets_store()
并保持自动提交不变。
有人可以解释为什么首选后一种方法吗?禁用自动提交应该具有完全相同的效果吗?
我在我的 go 二进制文件中使用 confluent-kafka-go 库,这个库需要与 librdkafka 链接。我项目中的其他目标使用 librdkakfa,所以我使用 rules_foreign_cc 的 cmake_external 规则生成了静态 librdkafka.a 和 librdkafka++.a:
//this is my "third_party/kafka/BUILD" file:
load("@rules_foreign_cc//tools/build_defs:cmake.bzl", "cmake_external")
cmake_external(
name = "librdkafka",
cache_entries = {
"RDKAFKA_BUILD_STATIC": "ON",
"WITH_ZSTD": "OFF",
"WITH_SSL": "OFF",
"WITH_SASL": "OFF",
"ENABLE_LZ4_EXT": "OFF",
"WITH_LIBDL": "OFF",
},
lib_source = "@kafka//:all",
static_libraries = [
"librdkafka++.a",
"librdkafka.a",
],
visibility = ["//visibility:public"],
)
Run Code Online (Sandbox Code Playgroud)
生成 librdkafka 库和头文件就好了:
$ bazel build //third_party/kafka:librdkafka
INFO: Analysed target //third_party/kafka:librdkafka (0 packages loaded, 0 targets configured).
INFO: Found 1 target...
Target //third_party/kafka:librdkafka up-to-date:
bazel-genfiles/third_party/kafka/librdkafka/include …
Run Code Online (Sandbox Code Playgroud) 我使用 librdkafka 作为客户端使用者,并且我已将代理和客户端配置为支持 SSL,用于代理:
listeners = PLAINTEXT://172.20.54.9:9092,SSL://172.20.54.9:9093
ssl.keystore.location=E:/project_files/Project/kafka_2.11-2.1.0/config/kafka.server.keystore.jks
ssl.keystore.password=ismail
ssl.key.password=ismail
ssl.truststore.location=E:/project_files/Project/kafka_2.11-2.1.0/config/kafka.server.truststore.jks
ssl.truststore.password=password
Run Code Online (Sandbox Code Playgroud)
在客户端我使用这个配置:
rd_kafka_conf_set(conf, "metadata.broker.list", "172.20.54.9:9093",
NULL, 0);
rd_kafka_conf_set(conf, "security.protocol", "ssl",
NULL, 0);
rd_kafka_conf_set(conf, "ssl.ca.location", "/usr/bin/NetSens/CARoot.pem",
NULL, 0);
rd_kafka_conf_set(conf, "ssl.certificate.location", "/usr/bin/NetSens/certificate.pem",
NULL, 0);
rd_kafka_conf_set(conf, "ssl.key.location", "/usr/bin/NetSens/key.pem",
NULL, 0);
rd_kafka_conf_set(conf, "ssl.key.password", "password",
NULL, 0);
Run Code Online (Sandbox Code Playgroud)
我收到此错误:
1559309856.897 RDKAFKA-3-ERROR: rdkafka#consumer-1: [thrd:ssl://172.20.54.9:9093/bootstrap]: ssl://172.20.54.9:9093/bootstrap: SSL handshake failed: ../ssl/record/ssl3_record.c:252: error:1408F10B:SSL routines:ssl3_get_record:wrong version number: (after 7ms in state CONNECT)
Run Code Online (Sandbox Code Playgroud)
有关更多信息,我还有另一个使用相同证书和密钥的 python kafka 客户端,它工作正常。我将不胜感激任何帮助。
现在,我想实施node-rdkafka
到我们的服务中,但我多次遇到此错误Broker: Unknown member
。github 上的同一问题是https://github.com/confluenceinc/confluence-kafka-dotnet/issues/1464。他们说我们的消费者使用相同的组 ID 来重试或延迟。但我没有发现我的代码有任何重试和延迟。或https://github.com/confluenceinc/confluence-kafka-python/issues/1004,但我重新检查了所有消费者组 ID,它是唯一的。
生产者的配置node-rdkafka
如下:
this.producer = new Producer({
"client.id": this.cliendID,
"metadata.broker.list": this.brokerList,
'compression.codec': "lz4",
'retry.backoff.ms': 200,
'socket.keepalive.enable': true,
'queue.buffering.max.messages': 100000,
'queue.buffering.max.ms': 1000,
'batch.num.messages': 1000000,
"transaction.timeout.ms": 2000,
"enable.idempotence": false,
"max.in.flight.requests.per.connection": 1,
"debug": this.debug,
'dr_cb': true,
"retries": 0,
"log_cb": (_: any) => console.log(`log_cb =>`, _),
"sasl.username": this.saslUsername,
"sasl.password": this.saslPassword,
"sasl.mechanism": this.saslMechanism,
"security.protocol": this.securityProtocol
}, {
"acks": -1
})
Run Code Online (Sandbox Code Playgroud)
Consumer的配置node-rdkafka
如下:
this.consumer = new KafkaConsumer({
'group.id': this.groupID,
'metadata.broker.list': …
Run Code Online (Sandbox Code Playgroud) 我正在运行 rdkafka_simple_ Producer.c 来向 Kafka 集群生成消息。我有一个主题和 30 个分区。使用默认的循环分区器。当生产者正在工作并向 Kafka 生成消息时,我向 Kafka 添加更多分区
kafka/bin/kafka-topics.sh --alter --zookeeper server2:2181 --topic demotest --partitions 40
Run Code Online (Sandbox Code Playgroud)
我希望制作人能够意识到这一变化并最终开始制作所有 40 个主题。然而,最后我只看到数据被生成到原来的30个分区。
在测试中,制作人运行了 2 分钟。
我是否需要在 simple_ Producer 中添加任何函数调用,或者它是我需要考虑的 Kafka 参数吗?
提前致谢!
我正在尝试使用 Kafka 发送 ~10Mb 的消息。我知道它的默认大小是 1Mb,但这是一个硬性限制吗?librdkafka 可以支持 >10Mb 吗?如何设置?
librdkafka ×10
apache-kafka ×9
bazel ×1
c ×1
docker ×1
go ×1
kafkajs ×1
python ×1
ruby-kafka ×1
ssl ×1