标签: confluent-kafka-python

无法安装 confluence-kafka:“致命错误:librdkafka/rdkafka.h:没有这样的文件或目录”

我在项目中使用 confluence-kafka Python 客户端。我正在尝试使用此客户端创建 Docker 映像。

我面临以下错误:-

#11 8.015 [pipenv.exceptions.InstallError]:       In file included from /tmp/pip-install-so_whhii/confluent-kafka_9d9553bf46cf489bb25fcb2ac7698747/src/confluent_kafka/src/Admin.c:17:
#11 8.015 [pipenv.exceptions.InstallError]:       /tmp/pip-install-so_whhii/confluent-kafka_9d9553bf46cf489bb25fcb2ac7698747/src/confluent_kafka/src/confluent_kafka.h:23:10: fatal error: librdkafka/rdkafka.h: No such file or directory
#11 8.015 [pipenv.exceptions.InstallError]:          23 | #include <librdkafka/rdkafka.h>
#11 8.015 [pipenv.exceptions.InstallError]:             |          ^~~~~~~~~~~~~~~~~~~~~~
#11 8.015 [pipenv.exceptions.InstallError]:       compilation terminated.
#11 8.015 [pipenv.exceptions.InstallError]:       error: command '/usr/bin/gcc' failed with exit code 1
#11 8.016 [pipenv.exceptions.InstallError]:       [end of output]
Run Code Online (Sandbox Code Playgroud)

根据我的搜索,它与 librdkafka 的 Apple M1 版本相关。

python apache-kafka docker librdkafka confluent-kafka-python

23
推荐指数
4
解决办法
2万
查看次数

合流的 kakfa 生产者 KafkaError{code=_MSG_TIMED_OUT,val=-192,str="本地:消息超时"}

我是 Kafka 新手,使用融合的 kafka 并尝试使用带有 'sasl.mechanism': 'PLAIN','security.protocol': 'SASL_SSL' 的 python 生产者代码从 AWS EC2 实例将消息写入现有的 kafka 主题。我尝试过此链接中的生产者示例。出现以下错误。如果有任何可以指导我什么会导致这个问题将会有很大的帮助。

 Failed to deliver message: KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Run Code Online (Sandbox Code Playgroud)

confluent-kafka-python

9
推荐指数
0
解决办法
8639
查看次数

将 confluence-kafka python 库与 AWS lambda 一起使用时出错

我尝试使用confluence-kafka python 库通过 lambda 函数管理我的集群,但该函数失败并出现错误:

"Unable to import module 'Test': No module named 'confluent_kafka.cimpl'"
Run Code Online (Sandbox Code Playgroud)

我的要求.txt

requests
confluent-kafka
Run Code Online (Sandbox Code Playgroud)

为了创建 zip 文件,我将代码移动到虚拟环境的 site-packages 位置并压缩所有内容。

Python代码:

import confluent_kafka.admin
import requests
def lambda_handler(event, context):
    print("Hello World")
Run Code Online (Sandbox Code Playgroud)

我使用的是 macOS 10.X。在 Linux 上,我注意到 pip install 创建了一个单独的 confluence_kafka.libs,它不会在 mac 上创建

python amazon-web-services aws-lambda confluent-kafka-python

8
推荐指数
1
解决办法
5316
查看次数

Ubuntu 18.04 python3.7 confluence-kafka 没有名为“confluence_kafka.cimpl”的模块

我的虚拟机在 ubuntu 18.04 上运行。我在上面安装了:

  • python3.7
  • confluence-kafka 当运行 python 脚本时,我遇到了以下消息错误:
    from .cimpl import (Consumer, #noqa
    
    ModuleNotFoundError: no module named 'confluent_kafka.cimpl'
Run Code Online (Sandbox Code Playgroud)

我尝试以多种方式运行/安装/删除,但仍然出现此错误。

python confluent-kafka-python

8
推荐指数
1
解决办法
9617
查看次数

使用 Python 获取 Confluence Kafka 主题的最新消息

到目前为止,这是我尝试过的:

from confluent_kafka import Consumer

c = Consumer({... several security/server settings skipped...
              'auto.offset.reset': 'beginning',
              'group.id': 'my-group'})

c.subscribe(['my.topic'])
msg = poll(30.0)  # msg is of None type.
Run Code Online (Sandbox Code Playgroud)

msg几乎总是最终成为None这样。我认为问题可能是'my-group'已经消耗了所有消息'my.topic'......但我不在乎消息是否已经被消耗 - 我仍然需要最新的消息。具体来说,我需要最新消息的时间戳。

我又尝试了一些,从这里看来,该主题中可能有 25 条消息,但我不知道如何获取它们:

a = c.assignment()
print(a)  # Outputs [TopicPartition{topic=my.topic,partition=0,offset=-1001,error=None}]
offsets = c.get_watermark_offsets(a[0])
print(offsets)  # Outputs: (25, 25)
Run Code Online (Sandbox Code Playgroud)

如果因为该主题从未写入任何内容而没有消息,我该如何确定?如果是这样,我如何确定该主题存在了多长时间?我正在编写一个脚本,自动删除过去 X 天内未写入的任何主题(最初为 14 个 - 可能会随着时间的推移进行调整。)

python apache-kafka kafka-consumer-api confluent-platform confluent-kafka-python

6
推荐指数
1
解决办法
6289
查看次数

Kafka 消费者断开连接 - 已断开连接(处于状态 UP 684744 毫秒后)

我的卡夫卡消费者有问题。当我在本地主机中运行我的消费者时,他运行正确,但是当我在另一个环境中运行我的消费者时,他显示此错误

%6|1629836068.968|rdkafka#consumer-1| [thrd:ssl://kafka-events:11101/12101]:ssl://kafka-events:11101/12101:已断开连接(处于状态 UP 684744 毫秒后)

我不知道这个错误的根本原因是什么。

我的 kafka 消费者是用 python 3 开发的,具有汇合的 kafka DeserializingConsumer。

python-3.x apache-kafka confluent-kafka-python

5
推荐指数
0
解决办法
1007
查看次数

如何使用 confluence-kafka-python 消费最近 N 天的消息?

这个问题类似于Python KafkaConsumer start 消费来自时间戳的消息,除了我想知道如何在 Confluence 的官方 Python Kafka 客户端中执行此操作。

我研究了Consumer.offsets_for_times函数,但我对它接受TopicPartition.offset字段中的时间戳感到困惑。

a 如何offset等同于时间戳?

python apache-kafka confluent-kafka-python

4
推荐指数
1
解决办法
3480
查看次数

如何连接 Kafka python 以接受 jaas 的用户名和密码,就像在 Java 中完成的那样?

使用现有的 Java 示例,我尝试使用 python-kafka 和 confluence_kafka 库编写一个与生成器等效的 python 版本。如何使用类似于下面 Java 中的信息在 python 中配置 sasl.jass.config?

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

...
Properties props = new Properties();
...
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"<Kafka_Username>\" password=\"<Kafka_Password>\";");

Producer<String, String> producer = new KafkaProducer<>(props);
Run Code Online (Sandbox Code Playgroud)

kafka-python confluent-platform confluent-kafka-python

2
推荐指数
1
解决办法
4483
查看次数