I can't get KafkaConsumer to read from the beginning of a topic, or from any other explicit offset.

Running the command-line tool for a consumer of the same topic, I do see messages with the --from-beginning option; otherwise it hangs:
$ ./kafka-console-consumer.sh --zookeeper {localhost:port} --topic {topic_name} --from-beginning
If I run it through Python, it hangs, which I suspect is caused by an incorrect consumer configuration:
from kafka import KafkaConsumer

consumer = KafkaConsumer(topic_name,
                         bootstrap_servers=['localhost:9092'],
                         group_id=None,
                         auto_commit_enable=False,
                         auto_offset_reset='smallest')
print "Consuming messages from the given topic"
for message in consumer:
    print "Message", message
    if message is not None:
        print message.offset, message.value
print "Quit"
It prints "Consuming messages from the given topic" and then hangs.

I'm using kafka-python 0.9.5 and the broker runs Kafka 0.8.2; I'm not sure what exactly the problem is. group_id=None is set, as suggested by dpkp, to emulate the behaviour of the console consumer.
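For reference, with a recent kafka-python release the console consumer's --from-beginning behaviour can usually be reproduced like this (a sketch; topic and server names are placeholders, and it assumes a current client where the option is spelled auto_offset_reset='earliest'):

```python
def consume_from_beginning(topic, servers=['localhost:9092']):
    # Deferred import so the sketch can be read without a broker or the
    # kafka-python package; assumes a recent kafka-python release.
    from kafka import KafkaConsumer
    consumer = KafkaConsumer(topic,
                             bootstrap_servers=servers,
                             group_id=None,                 # no committed offsets
                             enable_auto_commit=False,
                             auto_offset_reset='earliest')  # 'smallest' in 0.9.x
    for message in consumer:
        print(message.offset, message.value)
```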
I have a Kafka 0.10 cluster with SASL_SSL enabled (authentication via JAAS, plus authorization). I can connect over SASL from a Java client with the following properties:
ssl.keystore.location="client_keystore.jks"
ssl.keystore.password="password"
ssl.truststore.location="clienttruststore"
ssl.truststore.password="password"
and by passing the JAAS conf file through the JVM params:
-Djava.security.auth.login.config=/path/to/client_jaas.conf
Is there any way to achieve the same thing with the Python client?
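With kafka-python this is possible, but the client takes PEM files rather than JKS keystores, so the keystore and truststore would first be exported to PEM (e.g. via keytool/openssl). A sketch, where the SASL mechanism, credentials, and file names are assumptions:

```python
def sasl_ssl_consumer(topic, servers):
    # Deferred import; assumes kafka-python is installed.
    from kafka import KafkaConsumer
    return KafkaConsumer(
        topic,
        bootstrap_servers=servers,
        security_protocol='SASL_SSL',
        sasl_mechanism='PLAIN',          # assumption: the JAAS config uses PLAIN
        sasl_plain_username='client',    # credentials from the JAAS file
        sasl_plain_password='password',
        ssl_cafile='ca.pem',             # truststore, exported to PEM
        ssl_certfile='client_cert.pem',  # keystore certificate, exported to PEM
        ssl_keyfile='client_key.pem',    # keystore private key, exported to PEM
    )
```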
I'm trying to run a simple Python app that talks to Kafka, and I'd like it in an Alpine container. This is my current Dockerfile (it's not optimal... I'm just trying to get things working for now):
FROM python:3.6-alpine
MAINTAINER Ashic Mahtab (ashic@live.com)
RUN mkdir -p /usr/src/app
WORKDIR /usr/src/app
RUN echo "http://dl-cdn.alpinelinux.org/alpine/edge/community" >> /etc/apk/repositories
RUN apk update && apk --no-cache add librdkafka
COPY requirements.txt /usr/src/app/
RUN pip install --no-cache-dir -r requirements.txt
COPY api /usr/src/app/api
COPY static /usr/src/app/static
CMD ["python", "api/index.py"]
The requirements file contains confluent-kafka. The build fails with:
OK: 8784 distinct packages available
fetch http://dl-cdn.alpinelinux.org/alpine/v3.4/main/x86_64/APKINDEX.tar.gz
fetch http://dl-cdn.alpinelinux.org/alpine/v3.4/community/x86_64/APKINDEX.tar.gz
fetch http://dl-cdn.alpinelinux.org/alpine/edge/community/x86_64/APKINDEX.tar.gz
ERROR: unsatisfiable constraints:
so:libcrypto.so.41 (missing):
  required by:
    librdkafka-0.9.4-r1[so:libcrypto.so.41]
    librdkafka-0.9.4-r1[so:libcrypto.so.41]
    librdkafka-0.9.4-r1[so:libcrypto.so.41]
so:libssl.so.43 (missing):
  required by:
    librdkafka-0.9.4-r1[so:libssl.so.43]
    librdkafka-0.9.4-r1[so:libssl.so.43]
    librdkafka-0.9.4-r1[so:libssl.so.43] …
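The constraint errors say that librdkafka from edge/community needs the newer libssl/libcrypto from edge/main, which the Dockerfile never adds. One way around it (a sketch; the repository pinning and package names are assumptions worth verifying against the current Alpine branches) is to pull both edge repositories, and to add librdkafka-dev plus a compiler so pip can build confluent-kafka:

```dockerfile
FROM python:3.6-alpine
RUN echo "http://dl-cdn.alpinelinux.org/alpine/edge/main" >> /etc/apk/repositories && \
    echo "http://dl-cdn.alpinelinux.org/alpine/edge/community" >> /etc/apk/repositories && \
    apk update && \
    apk add --no-cache librdkafka librdkafka-dev build-base
WORKDIR /usr/src/app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY api ./api
COPY static ./static
CMD ["python", "api/index.py"]
```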
I'm using Kafka 2.12 and the kafka-python module as the Kafka client. I'm trying to test a simple producer:
import json
from multiprocessing import Process  # Process's origin isn't shown in the question
from kafka import KafkaProducer

class Producer(Process):
    daemon = True

    def run(self):
        producer = KafkaProducer(bootstrap_servers='kafka:9092')
        print("Sending messages...")
        producer.send('topic', json.dumps(message).encode('utf-8'))
When this process is instantiated, the consumer never receives the messages.

If I flush the producer and add the linger_ms parameter (making the send effectively synchronous), the messages are sent and read by the consumer:
class Producer(Process):
    daemon = True

    def run(self):
        producer = KafkaProducer(bootstrap_servers='kafka:9092', linger_ms=10)
        print("Sending messages...")
        producer.send('topic', json.dumps(message).encode('utf-8'))
        producer.flush()
In earlier Kafka clients there was a queue.buffering.max.ms parameter to specify how long the producer should wait before sending queued messages, but it doesn't exist in the latest version (kafka-python 1.3.3). How can I specify it in the newer client while keeping my communication asynchronous? Thanks!
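In kafka-python, linger_ms is the direct analogue of the old queue.buffering.max.ms: send() is always asynchronous and returns a future, and linger_ms only controls how long the background thread waits to batch records. Rather than calling flush() (which blocks), delivery can be observed through callbacks. A sketch with illustrative names:

```python
def make_async_producer(servers='kafka:9092'):
    # Deferred import; assumes kafka-python is installed.
    from kafka import KafkaProducer
    # linger_ms plays the role of the old queue.buffering.max.ms.
    return KafkaProducer(bootstrap_servers=servers, linger_ms=10)

def send_async(producer, topic, payload_bytes):
    # send() returns immediately; the callbacks fire when the broker
    # acknowledges (or rejects) the record, so nothing blocks here.
    future = producer.send(topic, payload_bytes)
    future.add_callback(lambda md: print('delivered:', md.topic, md.partition, md.offset))
    future.add_errback(lambda exc: print('delivery failed:', exc))
    return future
```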
The Kafka consumer defined below works perfectly well when I receive messages from my topic; however, it gives me trouble when I try to change the offset with the seek method or any of its variants, i.e. seek_to_beginning and seek_to_end:
from kafka import KafkaConsumer, TopicPartition
con = KafkaConsumer(my_topic, bootstrap_servers = my_bootstrapservers, group_id = my_groupid)
p = con.partitions_for_topic(my_topic)
my_partition = p.pop()
tp = TopicPartition(topic = my_topic, partition = my_partition)
print ('*** tp: ', tp)
con.seek_to_beginning(tp)
It produces the following output and error:
*** tp:  TopicPartition(topic='mytopic', partition=0)

  File "/myhome/anaconda3/lib/python3.6/site-packages/kafka/consumer/group.py", line 735, in seek_to_beginning
    assert p in self._subscription.assigned_partitions(), 'Unassigned partition'
AssertionError: Unassigned partition
The "Unassigned partition" error seems invalid to me, since I just obtained the partition and the TopicPartition from the consumer itself and passed them back to the seek method, so the partition surely is assigned. Any ideas?
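The assertion fires because, with a group subscription, the consumer is only assigned partitions after the first poll(); constructing the TopicPartition yourself doesn't assign it. Two common ways out are calling poll() once before seeking, or skipping the group flow and assigning partitions manually with assign(). A sketch of the latter, with illustrative names:

```python
def seek_to_start(topic, servers):
    # Deferred import; assumes kafka-python is installed.
    from kafka import KafkaConsumer, TopicPartition
    consumer = KafkaConsumer(bootstrap_servers=servers)
    parts = [TopicPartition(topic, p)
             for p in consumer.partitions_for_topic(topic)]
    consumer.assign(parts)              # manual assignment: no group rebalance
    consumer.seek_to_beginning(*parts)  # partitions are now assigned, so this works
    return consumer
```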
I have a consumer subscribed to a test topic that a producer thread publishes to periodically. I'd like to be able to block the consumer thread until a new message appears, then process it and go back to waiting. The closest I've got is:
consumer = KafkaConsumer(topic_name, auto_offset_reset='latest',
                         bootstrap_servers=[localhost_],
                         api_version=(0, 10), consumer_timeout_ms=1000)
while True:
    print(consumer.poll(timeout_ms=5000))
Is there a more idiomatic way (or is there some serious problem with this approach that I'm not seeing)?
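poll() in a loop works, but for a single consumer thread simply iterating the KafkaConsumer is the more idiomatic blocking style: the iterator waits for the next record and, if consumer_timeout_ms is set, stops iterating when nothing arrives in time. A sketch with illustrative names:

```python
def consume_blocking(topic, servers='localhost:9092'):
    # Deferred import; assumes kafka-python is installed.
    from kafka import KafkaConsumer
    consumer = KafkaConsumer(topic,
                             bootstrap_servers=servers,
                             auto_offset_reset='latest')
    # The iterator blocks until a record arrives, then yields it.
    for record in consumer:
        print(record.offset, record.value)  # process each message here
```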
I'm new to Kafka, so general advice on this design is very welcome. Full (running) example:
import time
from threading import Thread
import kafka
from kafka import KafkaProducer, KafkaConsumer
print('python-kafka:', kafka.__version__)
def publish_message(producer_instance, topic_name, key, value):
    try:
        key_bytes = bytes(str(key), encoding='utf-8')
        value_bytes = bytes(str(value), encoding='utf-8')
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        producer_instance.flush()
    except Exception as ex:
        print('Exception in publishing message\n', ex)

localhost_ = 'localhost:9092'

def kafka_producer():
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=[localhost_],
                                  api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka') …

I'm new to Python and just getting started with Kafka. I have a requirement to send and consume JSON messages. For this I'm using kafka-python to communicate with Kafka.
#Producer.py
from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers='localhost:9092',value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('offering_new', {"dataObjectID": "test1"})
#Consumer.py
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',auto_offset_reset='earliest', value_deserializer=lambda m: json.loads(m.decode('utf-8')))
consumer.subscribe(['offering_new'])
for message in consumer:
    print(message)
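As a broker-free sanity check, the serializer/deserializer pair used above can be exercised in isolation:

```python
import json

# The same lambdas passed to KafkaProducer/KafkaConsumer above.
value_serializer = lambda v: json.dumps(v).encode('utf-8')
value_deserializer = lambda m: json.loads(m.decode('utf-8'))

wire = value_serializer({"dataObjectID": "test1"})
print(wire)                      # b'{"dataObjectID": "test1"}'
print(value_deserializer(wire))  # {'dataObjectID': 'test1'}
```

If the pair round-trips cleanly, a common culprit for a deserialization error with auto_offset_reset='earliest' is older, non-JSON messages already sitting in the topic.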
However, I get the following exception on the consumer:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1111, in __next__
    return next(self._iterator)
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1082, in _message_generator
    for msg in self._fetcher:
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 482, …

$ docker run 34a24a8a745e
Traceback (most recent call last):
  File "//producer.py", line 1, in <module>
    from kafka.producer import KafkaProducer
  File "/usr/local/lib/python3.9/site-packages/kafka/__init__.py", line 23, in <module>
    from kafka.producer import KafkaProducer
  File "/usr/local/lib/python3.9/site-packages/kafka/producer/__init__.py", line 4, in <module>
    from .simple import SimpleProducer
  File "/usr/local/lib/python3.9/site-packages/kafka/producer/simple.py", line 54
    return '<SimpleProducer batch=%s>' % self.async
                                              ^
SyntaxError: invalid syntax
I've seen an earlier question with the same error, but I'm using from kafka import KafkaProducer. My new Python script uses from kafka.producer import KafkaProducer, and I still get the self.async error.
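self.async comes from old kafka-python releases: async became a reserved word in Python 3.7, so any release still using it cannot even be imported on Python 3.9, no matter which import path the script uses. The fix is to upgrade the package; pinning a modern release in requirements.txt should be enough (the exact minimum version is an assumption; any recent release has the attribute renamed):

```
# requirements.txt
kafka-python>=2.0.0   # releases this new no longer use the reserved word `async`
```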
When I try pip install confluent-kafka, I get the following error:
#include <librdkafka/rdkafka.h>
^~~~~~~~~~~~~~~~~~~~~~
1 error generated.
error: command '/usr/bin/gcc' failed with exit code 1
I'm using Python 3.9 on macOS Monterey.
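The missing header means confluent-kafka's C extension is being built against librdkafka, which isn't installed. On macOS a common route (a sketch; the Homebrew paths are assumptions) is to install librdkafka with Homebrew and point the compiler at it before retrying, or simply to upgrade pip so a prebuilt wheel is picked instead of a source build:

```
# install the C library and headers
brew install librdkafka

# help the compiler find them when pip builds from source
export C_INCLUDE_PATH="$(brew --prefix)/include"
export LIBRARY_PATH="$(brew --prefix)/lib"

pip install confluent-kafka
```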
I'd like to skip past the beginning of a topic and only read messages from a certain timestamp up to the end. Any hints on how to achieve this?
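kafka-python exposes this through offsets_for_times(): map each partition to the target timestamp (in milliseconds), then seek to the returned offsets. A sketch with illustrative names:

```python
def seek_to_timestamp(topic, servers, ts_ms):
    # Deferred import; assumes kafka-python is installed.
    from kafka import KafkaConsumer, TopicPartition
    consumer = KafkaConsumer(bootstrap_servers=servers)
    parts = [TopicPartition(topic, p)
             for p in consumer.partitions_for_topic(topic)]
    consumer.assign(parts)
    # First offset whose timestamp is >= ts_ms, per partition.
    offsets = consumer.offsets_for_times({tp: ts_ms for tp in parts})
    for tp, offset_ts in offsets.items():
        if offset_ts is not None:  # None: no message at/after ts_ms
            consumer.seek(tp, offset_ts.offset)
    return consumer
```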