标签: librdkafka

无法安装 confluence-kafka:“致命错误:librdkafka/rdkafka.h:没有这样的文件或目录”

我在项目中使用 confluence-kafka Python 客户端。我正在尝试使用此客户端创建 Docker 映像。

我面临以下错误:-

#11 8.015 [pipenv.exceptions.InstallError]:       In file included from /tmp/pip-install-so_whhii/confluent-kafka_9d9553bf46cf489bb25fcb2ac7698747/src/confluent_kafka/src/Admin.c:17:
#11 8.015 [pipenv.exceptions.InstallError]:       /tmp/pip-install-so_whhii/confluent-kafka_9d9553bf46cf489bb25fcb2ac7698747/src/confluent_kafka/src/confluent_kafka.h:23:10: fatal error: librdkafka/rdkafka.h: No such file or directory
#11 8.015 [pipenv.exceptions.InstallError]:          23 | #include <librdkafka/rdkafka.h>
#11 8.015 [pipenv.exceptions.InstallError]:             |          ^~~~~~~~~~~~~~~~~~~~~~
#11 8.015 [pipenv.exceptions.InstallError]:       compilation terminated.
#11 8.015 [pipenv.exceptions.InstallError]:       error: command '/usr/bin/gcc' failed with exit code 1
#11 8.016 [pipenv.exceptions.InstallError]:       [end of output]
Run Code Online (Sandbox Code Playgroud)

根据我的搜索,它与 librdkafka 的 Apple M1 版本相关。

python apache-kafka docker librdkafka confluent-kafka-python

23
推荐指数
4
解决办法
2万
查看次数

超过 max.poll.interval.ms 后,Kafka 消费者卡住了

当消费者在 5 分钟内(默认值 max.poll.interval.ms 300000ms)没有收到消息时,消费者会停止而不退出程序。消费者进程挂起,不再消费任何消息。

记录以下错误消息

MAXPOLL|rdkafka#consumer-1| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 255ms (adjust max.poll.interval.ms for long-running message processing): leaving group
Run Code Online (Sandbox Code Playgroud)

我看到在 confluent-kafka-go 中ErrMaxPollExceeded定义了它,但无法找到它在哪里被提出。

如果出现任何此类错误,为什么程序不退出?

  • 卡夫卡 v1.1.0
  • librdkafka v1.0.0
  • confluent-kafka-go(主)

用于 kafka.Consumer 的配置

{
    "bootstrap.servers":    "private.kafka.host",
    "group.id":             "foo.bar",
    "auto.offset.reset":    "earliest",
    "enable.auto.commit":   false,
}
Run Code Online (Sandbox Code Playgroud)

go apache-kafka librdkafka confluent-platform

10
推荐指数
1
解决办法
3941
查看次数

Kafka异步和Kafka同步acks=0之间的区别?

根据我的阅读,acks=0 的 Kafka 基本上只是将消息推送到生产者缓冲区中。它不等待任何类型的确认。有了这个,我想知道它与异步生产者有什么不同?acks 如何影响异步生产者?

apache-kafka ruby-kafka librdkafka

10
推荐指数
1
解决办法
7868
查看次数

Python librdkafka生产者针对本机Apache Kafka生产者执行

我正在针对Python的confluent-kafka使用本地Java实现对Apache Kafka Producer进行测试,以查看哪个具有最大吞吐量。

我正在使用docker-compose部署一个包含3个Kafka代理和3个zookeeper实例的Kafka集群。我的docker撰写文件:https : //paste.fedoraproject.org/paste/bn7rr2~YRuIihZ06O3Q6vw/raw

这是一个非常简单的代码,其中包含Python confluent-kafka的大多数默认选项,并且在Java生产者中进行了一些配置更改,以匹配confluent-kafka的配置。

Python代码:

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'kafka-1:19092,kafka-2:29092,kafka-3:39092', 'linger.ms': 300, "max.in.flight.requests.per.connection": 1000000, "queue.buffering.max.kbytes": 1048576, "message.max.bytes": 1000000,
    'default.topic.config': {'acks': "all"}})

ss = '0123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357'

def f():
    import time
    start = time.time()
    for i in xrange(1000000):
        try:
            producer.produce('test-topic', ss)
        except Exception:
            producer.poll(1)
            try:
                producer.produce('test-topic', ss)
            except Exception:
                producer.flush(30)
                producer.produce('test-topic', ss)
        producer.poll(0)
    producer.flush(30)
    print(time.time() - start)


if __name__ == '__main__':
    f()
Run Code Online (Sandbox Code Playgroud)

Java实现。配置与librdkafka中的config相同。按照Edenhill的建议更改了linger.ms和回调。

package com.amit.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.nio.charset.Charset;
import java.util.Properties; …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-producer-api confluent-kafka librdkafka

8
推荐指数
1
解决办法
455
查看次数

Kafka 偏移管理:enable.auto.commit 与 enable.auto.offset.store

默认情况下,Kafka Consumer 会定期提交当前偏移量,除非通过禁用enable.auto.commit. 根据文档,您将负责自己提交偏移量。所以当我想要手动控制时,这似乎是要走的路,但是文档还提到了存储的偏移量,如果你想要手动控制,你应该禁用enable.auto.offset.store和使用rd_kafka_offsets_store()并保持自动提交不变。

有人可以解释为什么首选后一种方法吗?禁用自动提交应该具有完全相同的效果吗?

apache-kafka librdkafka

8
推荐指数
1
解决办法
3302
查看次数

bazel rules_go:将二进制文件链接到工作区中另一个目标生成的静态 C++ 库(.a 文件)

我在我的 go 二进制文件中使用 confluent-kafka-go 库,这个库需要与 librdkafka 链接。我项目中的其他目标使用 librdkakfa,所以我使用 rules_foreign_cc 的 cmake_external 规则生成了静态 librdkafka.a 和 librdkafka++.a:

   //this is my "third_party/kafka/BUILD" file:

load("@rules_foreign_cc//tools/build_defs:cmake.bzl", "cmake_external")

    cmake_external(
        name = "librdkafka",
        cache_entries = {
            "RDKAFKA_BUILD_STATIC": "ON",
            "WITH_ZSTD": "OFF",
            "WITH_SSL": "OFF",
            "WITH_SASL": "OFF",
            "ENABLE_LZ4_EXT": "OFF",
            "WITH_LIBDL": "OFF",
        },
        lib_source = "@kafka//:all",
        static_libraries = [
            "librdkafka++.a",
            "librdkafka.a",
        ],
        visibility = ["//visibility:public"],
    ) 
Run Code Online (Sandbox Code Playgroud)

生成 librdkafka 库和头文件就好了:

 $ bazel build //third_party/kafka:librdkafka 
INFO: Analysed target //third_party/kafka:librdkafka (0 packages loaded, 0 targets configured).
INFO: Found 1 target...
Target //third_party/kafka:librdkafka up-to-date:
  bazel-genfiles/third_party/kafka/librdkafka/include …
Run Code Online (Sandbox Code Playgroud)

bazel librdkafka

6
推荐指数
1
解决办法
705
查看次数

librdkafka 消费者和 ssl 配置

我使用 librdkafka 作为客户端使用者,并且我已将代理和客户端配置为支持 SSL,用于代理:

listeners = PLAINTEXT://172.20.54.9:9092,SSL://172.20.54.9:9093
ssl.keystore.location=E:/project_files/Project/kafka_2.11-2.1.0/config/kafka.server.keystore.jks
ssl.keystore.password=ismail
ssl.key.password=ismail
ssl.truststore.location=E:/project_files/Project/kafka_2.11-2.1.0/config/kafka.server.truststore.jks
ssl.truststore.password=password
Run Code Online (Sandbox Code Playgroud)

在客户端我使用这个配置:

rd_kafka_conf_set(conf, "metadata.broker.list", "172.20.54.9:9093",
                      NULL, 0);
rd_kafka_conf_set(conf, "security.protocol", "ssl",
                      NULL, 0);
rd_kafka_conf_set(conf, "ssl.ca.location", "/usr/bin/NetSens/CARoot.pem",
                      NULL, 0);
rd_kafka_conf_set(conf, "ssl.certificate.location", "/usr/bin/NetSens/certificate.pem",
                      NULL, 0);
rd_kafka_conf_set(conf, "ssl.key.location", "/usr/bin/NetSens/key.pem",
                      NULL, 0);
rd_kafka_conf_set(conf, "ssl.key.password", "password",
                      NULL, 0);
Run Code Online (Sandbox Code Playgroud)

我收到此错误:

1559309856.897 RDKAFKA-3-ERROR: rdkafka#consumer-1: [thrd:ssl://172.20.54.9:9093/bootstrap]: ssl://172.20.54.9:9093/bootstrap: SSL handshake failed: ../ssl/record/ssl3_record.c:252: error:1408F10B:SSL routines:ssl3_get_record:wrong version number:  (after 7ms in state CONNECT)
Run Code Online (Sandbox Code Playgroud)

有关更多信息,我还有另一个使用相同证书和密钥的 python kafka 客户端,它工作正常。我将不胜感激任何帮助。

c ssl apache-kafka librdkafka apache-kafka-security

5
推荐指数
1
解决办法
3168
查看次数

LibrdKafkaError:经纪人:随机运行约 2 小时后未知成员

现在,我想实施node-rdkafka到我们的服务中,但我多次遇到此错误Broker: Unknown member。github 上的同一问题是https://github.com/confluenceinc/confluence-kafka-dotnet/issues/1464。他们说我们的消费者使用相同的组 ID 来重试或延迟。但我没有发现我的代码有任何重试和延迟。或https://github.com/confluenceinc/confluence-kafka-python/issues/1004,但我重新检查了所有消费者组 ID,它是唯一的。

生产者的配置node-rdkafka如下:

      this.producer = new Producer({
        "client.id": this.cliendID,
        "metadata.broker.list": this.brokerList,
        'compression.codec': "lz4",
        'retry.backoff.ms': 200,
        'socket.keepalive.enable': true,
        'queue.buffering.max.messages': 100000,
        'queue.buffering.max.ms': 1000,
        'batch.num.messages': 1000000,
        "transaction.timeout.ms": 2000,
        "enable.idempotence": false,
        "max.in.flight.requests.per.connection": 1,
        "debug": this.debug,
        'dr_cb': true,
        "retries": 0,
        "log_cb": (_: any) => console.log(`log_cb =>`, _),
        "sasl.username": this.saslUsername,
        "sasl.password": this.saslPassword,
        "sasl.mechanism": this.saslMechanism,
        "security.protocol": this.securityProtocol
      }, {
        "acks": -1
      })
Run Code Online (Sandbox Code Playgroud)

Consumer的配置node-rdkafka如下:

this.consumer = new KafkaConsumer({
        'group.id': this.groupID,
        'metadata.broker.list': …
Run Code Online (Sandbox Code Playgroud)

apache-kafka librdkafka kafkajs confluent-platform

5
推荐指数
0
解决办法
3607
查看次数

librdkafka生产者如何了解Kafka中的新主题分区

我正在运行 rdkafka_simple_ Producer.c 来向 Kafka 集群生成消息。我有一个主题和 30 个分区。使用默认的循环分区器。当生产者正在工作并向 Kafka 生成消息时,我向 Kafka 添加更多分区

kafka/bin/kafka-topics.sh --alter --zookeeper server2:2181 --topic demotest --partitions 40
Run Code Online (Sandbox Code Playgroud)

我希望制作人能够意识到这一变化并最终开始制作所有 40 个主题。然而,最后我只看到数据被生成到​​原来的30个分区。

在测试中,制作人运行了 2 分钟。

我是否需要在 simple_ Producer 中添加任何函数调用,或者它是我需要考虑的 Kafka 参数吗?

提前致谢!

apache-kafka kafka-producer-api librdkafka

3
推荐指数
1
解决办法
1395
查看次数

如何使用 librdkafka 设置 Kafka 消息的最大大小

我正在尝试使用 Kafka 发送 ~10Mb 的消息。我知道它的默认大小是 1Mb,但这是一个硬性限制吗?librdkafka 可以支持 >10Mb 吗?如何设置?

apache-kafka librdkafka

2
推荐指数
1
解决办法
2480
查看次数