标签: kafka-python

如何在程序中停止Python Kafka Consumer?

我正在做Python Kafka使用者(试图在http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html中使用kafka.consumer.SimpleConsumer或kafka.consumer.simple.SimpleConsumer ).当我运行以下代码时,即使消耗了所有消息,它也会一直运行.我希望消费者在消费所有消息时都会停止.怎么做?另外我不知道如何使用stop()函数(在基类kafka.consumer.base.Consumer中).

UPDATE

我使用信号处理程序来调用consumer.stop().一些错误消息被打印到屏幕上.但程序仍停留在for循环中.当新消息进入时,消费者消费并打印它们.我也尝试过client.close().但结果相同.

我需要一些方法来优雅地停止for循环.

        client = KafkaClient("localhost:9092")
        consumer = SimpleConsumer(client, "test-group", "test")

        consumer.seek(0, 2)# (0,2) and (0,0)

        for message in consumer:
            print "Offset:", message.offset
            print "Value:", message.message.value
Run Code Online (Sandbox Code Playgroud)

欢迎任何帮助.谢谢.

python apache-kafka kafka-consumer-api kafka-python

7
推荐指数
2
解决办法
5059
查看次数

Kafka python使用者启动时阅读所有消息

我正在使用下面的代码来阅读主题的消息。我面临两个问题。每当我启动消费者时,它正在读取队列中的所有消息?如何只阅读未读邮件?

from kafka import KafkaConsumer


consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    consumer.commit() 
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))
Run Code Online (Sandbox Code Playgroud)

python apache-kafka kafka-python

7
推荐指数
1
解决办法
4687
查看次数

如何正确使用pyspark向kafka经纪人发送数据?

我正在尝试编写一个简单的pyspark作业,它将从kafka代理主题接收数据,对该数据进行一些转换,并将转换后的数据放在不同的kafka代理主题上.

我有以下代码,它从kafka主题读取数据,但没有影响运行sendkafka函数:

from pyspark import SparkConf, SparkContext

from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient

def sendkafka(messages):
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)
    for message in messages:
        yield producer.send_messages('spark.out', message)

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 5)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    parsed = kvs.map(lambda (key, value): json.loads(value))
    parsed.pprint()

    sentRDD = kvs.mapPartitions(sendkafka)
    sentRDD.count()

    ssc.start()
    ssc.awaitTermination()
if __name__ == "__main__":

   main() …
Run Code Online (Sandbox Code Playgroud)

python-2.7 spark-streaming pyspark kafka-python

7
推荐指数
1
解决办法
7904
查看次数

Kafka最佳保留和删除政策

我对卡夫卡很新,所以请原谅我这个问题是微不足道的.我有一个非常简单的设置,用于时序测试,如下所示:

机器A - >写入主题1(代理) - >机器B从主题1读取机器B - >将消息写入主题2(代理) - >机器A从主题2读取

现在我在无限循环中发送大约1400字节的消息,很快就填满了我的小经纪人的空间.我正在尝试为log.retention.ms,log.retention.bytes,log.segment.bytes和log.segment.delete.delay.ms设置不同的值.首先,我将所有值设置为允许的最小值,但似乎这降低了性能,然后我将它们设置为我的代理在完全填满之前可以采取的最大值,但是当删除发生时性能再次下降.是否有最佳实践来设置这些值以获得绝对最小延迟?

谢谢您的帮助!

apache-kafka kafka-consumer-api kafka-python kafka-producer-api

7
推荐指数
1
解决办法
1万
查看次数

如何确定Kafka的API版本?

我正在使用kafka-python访问 Kafka。我尝试创建一个 Kafka Producer:

kafka_producer = KafkaProducer(bootstrap_servers=['kafka:9092'])
Run Code Online (Sandbox Code Playgroud)

但这会因kafka.errors.NoBrokersAvailable: NoBrokersAvailable异常而失败。

我发现我需要向api_versionKafkaProducer添加参数:

kafka_producer = KafkaProducer(bootstrap_servers=['kafka:9092'],
                               api_version=(0, 10, 1))
Run Code Online (Sandbox Code Playgroud)

此命令有效。

问题是:如何确定 的值api_version

kafka-broker-api-versions.sh --bootstrap-server localhost:9092给了我一些东西,但我不确定是否有我可以使用的号码。我尝试了随机值api_version=(20, 2, 1),它也有效。

apache-kafka kafka-python kafka-producer-api

7
推荐指数
1
解决办法
6086
查看次数

从 kafka 消费者读取数据时,如何从用于 avro 记录的模式注册表中查找模式 id

我们使用 schema 注册表来存储 schema,消息被序列化到 avro 并推送到 kafka 主题。

想知道,当从消费者读取数据时,如何找到 avro 记录被序列化的 schema id。我们需要此架构 ID 来跟踪是否将新列添加到表中的更改。如果添加或删除新列,架构注册表中将生成新的架构 id,以及如何在消费者中获取该 id。

consumer = KafkaConsumer(bootstrap_servers = conf['BOOTSTRAP_SERVERS'],
                        auto_offset_reset = conf['AUTO_OFFSET'],
                        enable_auto_commit = conf['AUTO_COMMIT'],
                        auto_commit_interval_ms = conf['AUTO_COMMIT_INTERVAL']
                        )
consumer.subscribe(conf['KAFKA_TOPICS'])

for message in consumer:
    print(message.key)
Run Code Online (Sandbox Code Playgroud)

从上面的代码中,message.key 打印该特定记录的键,以及我们如何找到消费者用来反序列化记录的相应模式 ID。

curl -X GET http://localhost:8081/subjects/helpkit_internal.helpkit_support.agents-value/versions/2

{"subject":"helpkit_internal.helpkit_support.agents-value","version":2,"id":33,"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"helpkit_internal.helpkit_support.agents\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"user_id\"
Run Code Online (Sandbox Code Playgroud)

我们想要从消费者那里获取 id 值"id":33

请就此提出建议。

python apache-kafka kafka-consumer-api kafka-python confluent-schema-registry

7
推荐指数
1
解决办法
5155
查看次数

kafka-python:以 0 vs inf 秒超时关闭 kafka 生产者

我正在尝试使用 python 2.7 使用 kafka-python 2.0.1 生成 Kafka 主题的消息(由于一些与工作场所相关的限制,无法使用 Python 3)

我在单独的环境中创建了一个类,并编译了该包并安装在虚拟环境中:

import json
from kafka import KafkaProducer


class KafkaSender(object):
    def __init__(self):
        self.producer = self.get_kafka_producer()

    def get_kafka_producer(self):
        return KafkaProducer(
            bootstrap_servers=['locahost:9092'],
            value_serializer=lambda x: json.dumps(x),
            request_timeout_ms=2000,
        )

    def send(self, data):
        self.producer.send("topicname", value=data)
Run Code Online (Sandbox Code Playgroud)

我的驱动程序代码是这样的:

from mypackage import KafkaSender

# driver code
data = {"a":"b"}
kafka_sender = KafkaSender()
kafka_sender.send(data)
Run Code Online (Sandbox Code Playgroud)

场景1:
我运行这段代码,它运行得很好,没有错误,但消息没有推送到主题。我已经确认这一点,因为该主题中的偏移或滞后没有增加。此外,消费者端没有记录任何内容。

场景 2:
从方法中注释/删除 Kafka 生产者的初始化__init__
我将发送行从 更改为 self.producer.send("topicname", value=data) ie self.get_kafka_producer().send("topicname", value=data),不是提前(在类初始化期间)而是在将消息发送到主题之前创建 kafka 生产者。当我运行代码时,它运行得很好。该消息已发布到该主题。

我使用场景 1 的目的是创建一个 Kafka 生产者一次并使用它多次,而不是每次我想发送消息时都创建 Kafka …

python-2.7 kafka-python kafka-producer-api

7
推荐指数
0
解决办法
4994
查看次数

无法使用 Kafka-Python 的反序列化器从 Kafka 消费 JSON 消息

我正在尝试通过 Kafka 发送一个非常简单的 JSON 对象,并使用 Python 和 kafka-python 从另一端读出它。但是,我一直看到以下错误:

2017-04-07 10:28:52,030.30.9998989105:kafka.future:8228:ERROR:10620:Error processing callback
Traceback (most recent call last):
  File "C:\Anaconda2\lib\site-packages\kafka\future.py", line 79, in _call_backs
    f(value)
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 760, in _handle_fetch_response
    unpacked = list(self._unpack_message_set(tp, messages))
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 539, in _unpack_message_set
    tp.topic, msg.value)
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 570, in _deserialize
    return f(bytes_)
  File "C:\Users\myUser\workspace\PythonKafkaTest\src\example.py", line 55, in <lambda>
    value_deserializer=lambda m: json.loads(m).decode('utf-8'))
  File "C:\Anaconda2\lib\json\__init__.py", line 339, in loads
    return _default_decoder.decode(s)
  File "C:\Anaconda2\lib\json\decoder.py", line 364, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end()) …
Run Code Online (Sandbox Code Playgroud)

python json kafka-python

6
推荐指数
2
解决办法
7921
查看次数

对于 AvroProducer 到 Kafka,“键”和“值”的 avro 模式在哪里?

confluent-kafka-python repo 中AvroProducer示例来看,键/值模式似乎是从文件中加载的。也就是说,从这段代码:

from confluent_kafka import avro 
from confluent_kafka.avro import AvroProducer

value_schema = avro.load('ValueSchema.avsc')
key_schema = avro.load('KeySchema.avsc')
value = {"name": "Value"}
key = {"name": "Key"}

avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema.registry.url': 'http://schem_registry_host:port'}, default_key_schema=key_schema, default_value_schema=value_schema)
avroProducer.produce(topic='my_topic', value=value, key=key)
Run Code Online (Sandbox Code Playgroud)

看来这些文件ValueSchema.avscKeySchema.avsc是独立于 Avro Schema Registry 加载的。

这是正确的吗?引用 Avro 架构注册表的 URL,然后从磁盘加载键/值的架构有什么意义?

请说清楚。

apache-kafka kafka-python kafka-producer-api

6
推荐指数
1
解决办法
4070
查看次数

使用消息时 AIOKafka 库中的 UnknownMemberId 错误

我在 Python 中的库 AIOKafka 遇到了错误(版本在最后)。基本上,我收到一条失败的心跳消息,然后无法执行偏移量的提交。这是日志:

Heartbeat failed for group my-group-dag-kafka because it is rebalancing
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 1)for group my-group-dag-kafka.
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error …
Run Code Online (Sandbox Code Playgroud)

python aio apache-kafka kafka-python

6
推荐指数
0
解决办法
633
查看次数