标签: kafka-python

kafka-python消费者没有收到消息

我无法KafaConsumer从头开始读取,或从任何其他显式偏移读取.

为同一主题的使用者运行命令行工具,我确实看到带有该--from-beginning选项的消息,否则它会挂起

$ ./kafka-console-consumer.sh --zookeeper {localhost:port} --topic {topic_name} --from-beginning
Run Code Online (Sandbox Code Playgroud)

如果我通过python运行它,它会挂起,我怀疑是由不正确的消费者配置引起的

consumer = KafkaConsumer(topic_name,
                     bootstrap_servers=['localhost:9092'],
                     group_id=None,
                     auto_commit_enable=False,
                     auto_offset_reset='smallest')

print "Consuming messages from the given topic"
for message in consumer:
    print "Message", message
    if message is not None:
        print message.offset, message.value

print "Quit"
Run Code Online (Sandbox Code Playgroud)

输出:

使用来自给定主题的消息(之后挂起)

我使用kafka-python 0.9.5并且代理运行kafka 8.2.不确定究竟是什么问题.

按照dpkp的建议设置_group_id = None_以模拟控制台使用者的行为.

python apache-kafka kafka-consumer-api kafka-python

3
推荐指数
4
解决办法
5411
查看次数

Kafka 10-具有身份验证和授权的Python客户端

我有一个启用了SASL_SSL(身份验证(JAAS)和授权)的Kafka10集群。可以使用Java客户端和以下道具通过SASL进行连接。

ssl.keystore.location="client_keystore.jks"
ssl.keystore.password="password"
ssl.truststore.location="clienttruststore"
ssl.truststore.password="password" 
Run Code Online (Sandbox Code Playgroud)

and passing the JAAS conf file thru the JVM params.

-Djava.security.auth.login.config=/path/to/client_jaas.conf
Run Code Online (Sandbox Code Playgroud)

Is there anyway to achieve the same thing with the python client?

python apache-kafka kafka-python

3
推荐指数
1
解决办法
4084
查看次数

在 alpine 容器中使用 confluence-kafka python 客户端

我正在尝试运行一个与 kafka 通信的简单 python 应用程序。我正在寻找一个高山容器。这是我当前的 dockerfile(它不是最佳的......只是想让事情暂时正常工作)。

FROM python:3.6-alpine
MAINTAINER Ashic Mahtab (ashic@live.com)

RUN mkdir -p /usr/src/app
WORKDIR /usr/src/app

RUN echo "http://dl-cdn.alpinelinux.org/alpine/edge/community" >> /etc/apk/repositories
RUN apk update && apk --no-cache add librdkafka


COPY requirements.txt /usr/src/app/
RUN pip install --no-cache-dir -r requirements.txt

COPY api /usr/src/app/api
COPY static /usr/src/app/static

CMD ["python", "api/index.py"]
Run Code Online (Sandbox Code Playgroud)

需求文件中有 confluence-kafka 。构建失败

OK: 8784 distinct packages available
fetch http://dl-cdn.alpinelinux.org/alpine/v3.4/main/x86_64/APKINDEX.tar.gz
fetch http://dl-cdn.alpinelinux.org/alpine/v3.4/community/x86_64/APKINDEX.tar.gz
fetch http://dl-cdn.alpinelinux.org/alpine/edge/community/x86_64/APKINDEX.tar.gz
ERROR: unsatisfiable constraints:
  so:libcrypto.so.41 (missing):
    required by:
                 librdkafka-0.9.4-r1[so:libcrypto.so.41]
                 librdkafka-0.9.4-r1[so:libcrypto.so.41]
                 librdkafka-0.9.4-r1[so:libcrypto.so.41]
  so:libssl.so.43 (missing):
    required by:
                 librdkafka-0.9.4-r1[so:libssl.so.43]
                 librdkafka-0.9.4-r1[so:libssl.so.43]
                 librdkafka-0.9.4-r1[so:libssl.so.43] …
Run Code Online (Sandbox Code Playgroud)

apache-kafka docker kafka-python alpine-linux confluent-platform

3
推荐指数
1
解决办法
6624
查看次数

Kafka producer.send 从不发送消息

我使用 Kafka 2.12 和 kafka-python 模块作为 Kafka 客户端。我正在尝试测试一个简单的生产者:

class Producer(Process):
daemon = True
def run(self):
    producer = KafkaProducer(bootstrap_servers='kafka:9092')
    print("Sending messages...")
    producer.send('topic', json.dumps(message).encode('utf-8'))
Run Code Online (Sandbox Code Playgroud)

当这个过程被实例化时,消费者永远不会收到消息

如果我刷新生产者并更改 linger_ms 参数(使其同步),则消息将由消费者发送和读取:

class Producer(Process):
daemon = True
def run(self):
    producer = KafkaProducer(bootstrap_servers='kafka:9092', linger_ms=10)
    print("Sending messages...")
    producer.send('topic', json.dumps(message).encode('utf-8'))
    producer.flush()
Run Code Online (Sandbox Code Playgroud)

在之前的 Kafka 版本中,有参数 queue.buffering.max.ms 来指定生产者将等待多长时间才能将消息发送到队列中,但在最新版本(kafka-python 1.3.3)中不存在。我如何在较新的 Kafka 版本中指定它以保持我的通信异步?

谢谢!

python apache-kafka kafka-python

3
推荐指数
2
解决办法
5637
查看次数

kafka 消费者搜索不起作用:断言错误:未分配的分区

con当我尝试从我的主题接收消息时,下面定义的 kafka 消费者工作得非常好;但是,当我尝试使用seek方法或其任何变体更改偏移量时,这给我带来了麻烦。即seek_to_beginningseek_to_end

from kafka import KafkaConsumer, TopicPartition

con = KafkaConsumer(my_topic, bootstrap_servers = my_bootstrapservers, group_id = my_groupid)
p = con.partitions_for_topic(my_topic)
my_partition = p.pop()
tp = TopicPartition(topic = my_topic, partition = my_partition)
print ('*** tp: ', tp)
con.seek_to_beginning(tp)
Run Code Online (Sandbox Code Playgroud)

它生成以下输出和以下错误:

*** tp:  TopicPartition(topic='mytopic', partition=0)
File "/myhome/anaconda3/lib/python3.6/site-packages/kafka/consumer/group.py", line 735, in seek_to_beginning
assert p in self._subscription.assigned_partitions(), 'Unassigned partition'
AssertionError: Unassigned partition
Run Code Online (Sandbox Code Playgroud)

“未分配的分区”错误对我来说似乎无效,因为我只是从使用者本身获取分区和 TopicPartition 并将其传递回搜索方法,因此它确实被分配了。有任何想法吗?

seek kafka-consumer-api kafka-python

3
推荐指数
1
解决办法
2552
查看次数

Python kafka:有没有办法在发布新消息之前阻止消费者关注 kafka 主题?

我有一个消费者订阅了一个测试主题,其中生产者线程定期发布。我希望能够阻塞消费者线程,直到出现新消息 - 然后处理该消息并再次开始等待。我最接近的是:

consumer = KafkaConsumer(topic_name, auto_offset_reset='latest',
                         bootstrap_servers=[localhost_],
                         api_version=(0, 10), consumer_timeout_ms=1000)
while True:
    print(consumer.poll(timeout_ms=5000))
Run Code Online (Sandbox Code Playgroud)

有没有更惯用的方法(或者这种方法有什么我看不到的严重问题)?

卡夫卡新手,因此非常欢迎对此设计的一般建议。完整(运行)示例:

import time
from threading import Thread

import kafka
from kafka import KafkaProducer, KafkaConsumer

print('python-kafka:', kafka.__version__)

def publish_message(producer_instance, topic_name, key, value):
    try:
        key_bytes = bytes(str(key), encoding='utf-8')
        value_bytes = bytes(str(value), encoding='utf-8')
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        producer_instance.flush()
    except Exception as ex:
        print('Exception in publishing message\n', ex)

localhost_ = 'localhost:9092'

def kafka_producer():
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=[localhost_],
                                  api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka') …
Run Code Online (Sandbox Code Playgroud)

python python-3.x apache-kafka kafka-python

3
推荐指数
1
解决办法
6636
查看次数

Kafka-python 如何消费json消息

我是 Python 新手,从 Kafka 开始。我有一个需要发送和使用 json 消息的要求。为此,我使用kafka-python与 Kafka 进行通信。

#Producer.py
from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers='localhost:9092',value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('offering_new', {"dataObjectID": "test1"})

#Consumer.py
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',auto_offset_reset='earliest', value_deserializer=lambda m: json.loads(m.decode('utf-8')))
consumer.subscribe(['offering_new'])
for message in consumer :
    print(message)
Run Code Online (Sandbox Code Playgroud)

但是,我在消费者上遇到以下异常:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1111, in __next__
    return next(self._iterator)
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1082, in _message_generator
    for msg in self._fetcher:
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 482, …
Run Code Online (Sandbox Code Playgroud)

python python-3.x apache-kafka kafka-python

3
推荐指数
1
解决办法
8032
查看次数

运行python kafka生产者时“self.async”上的SyntaxError

$ docker run 34a24a8a745e
Traceback (most recent call last):
  File "//producer.py", line 1, in <module>
    from kafka.producer import KafkaProducer
  File "/usr/local/lib/python3.9/site-packages/kafka/__init__.py", line 23, in <module>
    from kafka.producer import KafkaProducer
  File "/usr/local/lib/python3.9/site-packages/kafka/producer/__init__.py", line 4, in <module>
    from .simple import SimpleProducer
  File "/usr/local/lib/python3.9/site-packages/kafka/producer/simple.py", line 54
    return '<SimpleProducer batch=%s>' % self.async
                                              ^
SyntaxError: invalid syntax

Run Code Online (Sandbox Code Playgroud)

我曾看到过一个相同的早期错误,但我正在使用 from kafka import KafkaProducer

我的新 python 脚本使用from kafka.producer import KafkaProducer但我仍然收到 self.async 错误

python python-3.x kafka-python

3
推荐指数
1
解决办法
1612
查看次数

pip install confluence-kafka 在 mac 中出现错误

当我尝试时 pip install confluent-kafka出现以下错误


#include <librdkafka/rdkafka.h>
             ^~~~~~~~~~~~~~~~~~~~~~
    1 error generated.
    error: command '/usr/bin/gcc' failed with exit code 1
Run Code Online (Sandbox Code Playgroud)

我正在使用 python 版本 3.9 和 macOs Monterey

python django macos kafka-python

3
推荐指数
1
解决办法
5420
查看次数

Python KafkaConsumer 从时间戳开始消费消息

我打算跳过主题的开头,只读取从某个时间戳到结尾的消息。关于如何实现这一目标的任何提示?

python-3.x apache-kafka kafka-consumer-api kafka-python

2
推荐指数
1
解决办法
3328
查看次数