我正在做Python Kafka使用者(试图在http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html中使用kafka.consumer.SimpleConsumer或kafka.consumer.simple.SimpleConsumer ).当我运行以下代码时,即使消耗了所有消息,它也会一直运行.我希望消费者在消费所有消息时都会停止.怎么做?另外我不知道如何使用stop()函数(在基类kafka.consumer.base.Consumer中).
UPDATE
我使用信号处理程序来调用consumer.stop().一些错误消息被打印到屏幕上.但程序仍停留在for循环中.当新消息进入时,消费者消费并打印它们.我也尝试过client.close().但结果相同.
我需要一些方法来优雅地停止for循环.
client = KafkaClient("localhost:9092")
consumer = SimpleConsumer(client, "test-group", "test")
consumer.seek(0, 2)# (0,2) and (0,0)
for message in consumer:
print "Offset:", message.offset
print "Value:", message.message.value
Run Code Online (Sandbox Code Playgroud)
欢迎任何帮助.谢谢.
我正在使用下面的代码来阅读主题的消息。我面临两个问题。每当我启动消费者时,它正在读取队列中的所有消息?如何只阅读未读邮件?
from kafka import KafkaConsumer
consumer = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers=['localhost:9092'])
for message in consumer:
consumer.commit()
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
Run Code Online (Sandbox Code Playgroud) 我正在尝试编写一个简单的pyspark作业,它将从kafka代理主题接收数据,对该数据进行一些转换,并将转换后的数据放在不同的kafka代理主题上.
我有以下代码,它从kafka主题读取数据,但没有影响运行sendkafka函数:
from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient
def sendkafka(messages):
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
for message in messages:
yield producer.send_messages('spark.out', message)
def main():
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 5)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
parsed = kvs.map(lambda (key, value): json.loads(value))
parsed.pprint()
sentRDD = kvs.mapPartitions(sendkafka)
sentRDD.count()
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
main() …Run Code Online (Sandbox Code Playgroud) 我对卡夫卡很新,所以请原谅我这个问题是微不足道的.我有一个非常简单的设置,用于时序测试,如下所示:
机器A - >写入主题1(代理) - >机器B从主题1读取机器B - >将消息写入主题2(代理) - >机器A从主题2读取
现在我在无限循环中发送大约1400字节的消息,很快就填满了我的小经纪人的空间.我正在尝试为log.retention.ms,log.retention.bytes,log.segment.bytes和log.segment.delete.delay.ms设置不同的值.首先,我将所有值设置为允许的最小值,但似乎这降低了性能,然后我将它们设置为我的代理在完全填满之前可以采取的最大值,但是当删除发生时性能再次下降.是否有最佳实践来设置这些值以获得绝对最小延迟?
谢谢您的帮助!
apache-kafka kafka-consumer-api kafka-python kafka-producer-api
我正在使用kafka-python访问 Kafka。我尝试创建一个 Kafka Producer:
kafka_producer = KafkaProducer(bootstrap_servers=['kafka:9092'])
Run Code Online (Sandbox Code Playgroud)
但这会因kafka.errors.NoBrokersAvailable: NoBrokersAvailable异常而失败。
我发现我需要向api_versionKafkaProducer添加参数:
kafka_producer = KafkaProducer(bootstrap_servers=['kafka:9092'],
api_version=(0, 10, 1))
Run Code Online (Sandbox Code Playgroud)
此命令有效。
问题是:如何确定 的值api_version?
kafka-broker-api-versions.sh --bootstrap-server localhost:9092给了我一些东西,但我不确定是否有我可以使用的号码。我尝试了随机值api_version=(20, 2, 1),它也有效。
我们使用 schema 注册表来存储 schema,消息被序列化到 avro 并推送到 kafka 主题。
想知道,当从消费者读取数据时,如何找到 avro 记录被序列化的 schema id。我们需要此架构 ID 来跟踪是否将新列添加到表中的更改。如果添加或删除新列,架构注册表中将生成新的架构 id,以及如何在消费者中获取该 id。
consumer = KafkaConsumer(bootstrap_servers = conf['BOOTSTRAP_SERVERS'],
auto_offset_reset = conf['AUTO_OFFSET'],
enable_auto_commit = conf['AUTO_COMMIT'],
auto_commit_interval_ms = conf['AUTO_COMMIT_INTERVAL']
)
consumer.subscribe(conf['KAFKA_TOPICS'])
for message in consumer:
print(message.key)
Run Code Online (Sandbox Code Playgroud)
从上面的代码中,message.key 打印该特定记录的键,以及我们如何找到消费者用来反序列化记录的相应模式 ID。
curl -X GET http://localhost:8081/subjects/helpkit_internal.helpkit_support.agents-value/versions/2
{"subject":"helpkit_internal.helpkit_support.agents-value","version":2,"id":33,"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"helpkit_internal.helpkit_support.agents\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"user_id\"
Run Code Online (Sandbox Code Playgroud)
我们想要从消费者那里获取 id 值"id":33
请就此提出建议。
python apache-kafka kafka-consumer-api kafka-python confluent-schema-registry
我正在尝试使用 python 2.7 使用 kafka-python 2.0.1 生成 Kafka 主题的消息(由于一些与工作场所相关的限制,无法使用 Python 3)
我在单独的环境中创建了一个类,并编译了该包并安装在虚拟环境中:
import json
from kafka import KafkaProducer
class KafkaSender(object):
def __init__(self):
self.producer = self.get_kafka_producer()
def get_kafka_producer(self):
return KafkaProducer(
bootstrap_servers=['locahost:9092'],
value_serializer=lambda x: json.dumps(x),
request_timeout_ms=2000,
)
def send(self, data):
self.producer.send("topicname", value=data)
Run Code Online (Sandbox Code Playgroud)
我的驱动程序代码是这样的:
from mypackage import KafkaSender
# driver code
data = {"a":"b"}
kafka_sender = KafkaSender()
kafka_sender.send(data)
Run Code Online (Sandbox Code Playgroud)
场景1:
我运行这段代码,它运行得很好,没有错误,但消息没有推送到主题。我已经确认这一点,因为该主题中的偏移或滞后没有增加。此外,消费者端没有记录任何内容。
场景 2:
从方法中注释/删除 Kafka 生产者的初始化__init__。
我将发送行从 更改为
self.producer.send("topicname", value=data) ie self.get_kafka_producer().send("topicname", value=data),不是提前(在类初始化期间)而是在将消息发送到主题之前创建 kafka 生产者。当我运行代码时,它运行得很好。该消息已发布到该主题。
我使用场景 1 的目的是创建一个 Kafka 生产者一次并使用它多次,而不是每次我想发送消息时都创建 Kafka …
我正在尝试通过 Kafka 发送一个非常简单的 JSON 对象,并使用 Python 和 kafka-python 从另一端读出它。但是,我一直看到以下错误:
2017-04-07 10:28:52,030.30.9998989105:kafka.future:8228:ERROR:10620:Error processing callback
Traceback (most recent call last):
File "C:\Anaconda2\lib\site-packages\kafka\future.py", line 79, in _call_backs
f(value)
File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 760, in _handle_fetch_response
unpacked = list(self._unpack_message_set(tp, messages))
File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 539, in _unpack_message_set
tp.topic, msg.value)
File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 570, in _deserialize
return f(bytes_)
File "C:\Users\myUser\workspace\PythonKafkaTest\src\example.py", line 55, in <lambda>
value_deserializer=lambda m: json.loads(m).decode('utf-8'))
File "C:\Anaconda2\lib\json\__init__.py", line 339, in loads
return _default_decoder.decode(s)
File "C:\Anaconda2\lib\json\decoder.py", line 364, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end()) …Run Code Online (Sandbox Code Playgroud) 从confluent-kafka-python repo 中的AvroProducer示例来看,键/值模式似乎是从文件中加载的。也就是说,从这段代码:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
value_schema = avro.load('ValueSchema.avsc')
key_schema = avro.load('KeySchema.avsc')
value = {"name": "Value"}
key = {"name": "Key"}
avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema.registry.url': 'http://schem_registry_host:port'}, default_key_schema=key_schema, default_value_schema=value_schema)
avroProducer.produce(topic='my_topic', value=value, key=key)
Run Code Online (Sandbox Code Playgroud)
看来这些文件ValueSchema.avsc和KeySchema.avsc是独立于 Avro Schema Registry 加载的。
这是正确的吗?引用 Avro 架构注册表的 URL,然后从磁盘加载键/值的架构有什么意义?
请说清楚。
我在 Python 中的库 AIOKafka 遇到了错误(版本在最后)。基本上,我收到一条失败的心跳消息,然后无法执行偏移量的提交。这是日志:
Heartbeat failed for group my-group-dag-kafka because it is rebalancing
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 1)for group my-group-dag-kafka.
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error …Run Code Online (Sandbox Code Playgroud)