我一直在使用python-kaka模块从kafka经纪人那里消费.我希望同时使用'x'个分区的相同主题.文档包含:
# Use multiple consumers in parallel w/ 0.9 kafka brokers
# typically you would run each on a different server / process / CPU
consumer1 = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers='my.server.com')
consumer2 = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers='my.server.com')
Run Code Online (Sandbox Code Playgroud)
这是否意味着我可以为我生成的每个进程创建一个单独的使用者?此外,consumer1和consumer2消费的消息是否会重叠?
谢谢
我对集成测试有点陌生。我有两个使用 Kafka 相互传递消息的服务。但是,对于我的集成测试,我不一定想让 Kafka 运行来运行我的测试。有没有标准的方法来模拟 Kafka?或者这是我需要自己创建的东西,一些 MockKafka 队列和应用程序中适当的补丁?此外,这是否违反了集成测试应该做的事情?我对此的看法是,我没有测试 Kafka 的任何功能,为了集成测试,应该模拟出那里。
我正在尝试通过 Kafka 发送一个非常简单的 JSON 对象,并使用 Python 和 kafka-python 从另一端读出它。但是,我一直看到以下错误:
2017-04-07 10:28:52,030.30.9998989105:kafka.future:8228:ERROR:10620:Error processing callback
Traceback (most recent call last):
File "C:\Anaconda2\lib\site-packages\kafka\future.py", line 79, in _call_backs
f(value)
File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 760, in _handle_fetch_response
unpacked = list(self._unpack_message_set(tp, messages))
File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 539, in _unpack_message_set
tp.topic, msg.value)
File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 570, in _deserialize
return f(bytes_)
File "C:\Users\myUser\workspace\PythonKafkaTest\src\example.py", line 55, in <lambda>
value_deserializer=lambda m: json.loads(m).decode('utf-8'))
File "C:\Anaconda2\lib\json\__init__.py", line 339, in loads
return _default_decoder.decode(s)
File "C:\Anaconda2\lib\json\decoder.py", line 364, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end()) …Run Code Online (Sandbox Code Playgroud) 从confluent-kafka-python repo 中的AvroProducer示例来看,键/值模式似乎是从文件中加载的。也就是说,从这段代码:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
value_schema = avro.load('ValueSchema.avsc')
key_schema = avro.load('KeySchema.avsc')
value = {"name": "Value"}
key = {"name": "Key"}
avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema.registry.url': 'http://schem_registry_host:port'}, default_key_schema=key_schema, default_value_schema=value_schema)
avroProducer.produce(topic='my_topic', value=value, key=key)
Run Code Online (Sandbox Code Playgroud)
看来这些文件ValueSchema.avsc和KeySchema.avsc是独立于 Avro Schema Registry 加载的。
这是正确的吗?引用 Avro 架构注册表的 URL,然后从磁盘加载键/值的架构有什么意义?
请说清楚。
我对 confluence_kafka 还很陌生,但我已经获得了一些使用 kafka-python 的经验。我想做的是改变开始消费消息的偏移量。这就是为什么我想构建一个能够返回到以前的消息的消费者客户端,以便返回将填充仪表板的数据。说使用kafka-python包我可以使用seek_to_end (https://github.com/dpkp/kafka-python/blob/c0fddbd24269d4333e3b6630a23e86ffe33dfcb6/kafka/consumer/group.py#L788)方法来获取位置值最新的提交。这样我就可以使用该seek方法减去值并返回到之前的消息(https://github.com/dpkp/kafka-python/blob/c0fddbd24269d4333e3b6630a23e86ffe33dfcb6/kafka/consumer/group.py#L738)
另一方面,conflient_kafka似乎没有类似的功能,到目前为止我发现的是使用变量OFFSET_END,其值为-1,并且它不会返回最新和最大的偏移数值一。我也可以使用“seek”函数,但我需要一种方法来获取最新偏移量的数值,而不是-1.
我的 avro 消费者看起来像
from confluent_kafka.avro import AvroConsumer
if __name__ == '__main__':
c = AvroConsumer({"bootstrap.servers": "locahost:29092", "group.id":"mygroup",'schema.registry.url': 'http://localhost:8081',
'enable.auto.commit': True,'default.topic.config': {'auto.offset.reset': 'smallest'}})
def my_assign (consumer, partitions):
for p in partitions:
p.offset = confluent_kafka.OFFSET_END
print("offset=",p.offset)
print('assign', partitions)
print('position:',consumer.position(partitions))
consumer.assign(partitions)
c.subscribe(["mytopic"],on_assign=my_assign)
while True:
m = c.poll(1)
if m is None:
continue
if m.error() is None:
print('Received message', m.value(),m.offset())
c.close()
Run Code Online (Sandbox Code Playgroud)
产生以下结果:
offset= -1
assign [TopicPartition{topic=mytopic,partition=0,offset=-1,error=None}]
position: [TopicPartition{topic=mytopic,partition=0,offset=-1001,error=None}]
Run Code Online (Sandbox Code Playgroud)
并等待下一条消息。我想知道是否有人可以帮助我。谢谢
我使用的是 Kafka ( kafka-python) 版本 3.0.0-1.3.0.0.p0.40。我需要在 Python 中为“模拟”主题配置使用者。当我不指示 group_id 时,即 group_id = None ,它可以正常接收消息。但是,如果我指示 group_id,它不会收到任何消息。
这是我的 Python 代码:
consumer = KafkaConsumer(bootstrap_servers='XXX.XXX.XXX.XXX:9092',
group_id = 'myTestGroupID', enable_auto_commit = True)
consumer.subscribe(['simulation'])
# not using assign method here as auto_commit is enabled
# partitions = [TopicPartition('simulation',num) for num in range(0,9)]
# consumer.assign([TopicPartition('simulation', partitions[0])])
while not self.stop_event.is_set():
for message in consumer:
print(message)
Run Code Online (Sandbox Code Playgroud)
我尝试在消费者属性文件中搜索 group_id 的一些默认值,我发现了一个 cloudera_mirrormaker 但没有任何改变。我需要使用多个消费者,因此拥有一个 group_id 并且它们共享相同的 group_id 很重要。在许多来源中,我发现 group_id 可以是任何字符串......
当我在控制台中运行该主题的使用者时,它可以工作并接收消息
./kafka-console-consumer.sh --bootstrap-server XXX.XXX.XXX.XXX:9092 --topic simulation --from-beginning --consumer-property group.id=myTestGroupID --partition 0 …Run Code Online (Sandbox Code Playgroud) python consumer apache-kafka kafka-consumer-api kafka-python
使用kafka-python包构建 KafkaProducer 时出现此错误:
[ERROR] UnrecognizedBrokerVersion: UnrecognizedBrokerVersion
Traceback (most recent call last):
File "/var/lang/lib/python3.7/imp.py", line 234, in load_module
return load_source(name, filename, file)
File "/var/lang/lib/python3.7/imp.py", line 171, in load_source
module = _load(spec)
File "<frozen importlib._bootstrap>", line 696, in _load
File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/var/task/kafka/producer/kafka.py", line 381, in __init__
**self.config)
File "/var/task/kafka/client_async.py", line 240, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/var/task/kafka/client_async.py", line 908, …Run Code Online (Sandbox Code Playgroud) 我在 Python 中的库 AIOKafka 遇到了错误(版本在最后)。基本上,我收到一条失败的心跳消息,然后无法执行偏移量的提交。这是日志:
Heartbeat failed for group my-group-dag-kafka because it is rebalancing
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 1)for group my-group-dag-kafka.
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error …Run Code Online (Sandbox Code Playgroud) 我正在使用confluent-kafka-python,当我尝试连接到已关闭的代理时,发现它无限挂起。我似乎无法应用在文档中找到的任何超时设置:
from confluent_kafka import Consumer
conf = {'bootstrap.servers': f"{self.host}:{self.port}",
'group.id': "foo",
'auto.offset.reset': 'smallest',
'socket.timeout.ms':'2000', 'socket.max.fails':2,
'metadata.request.timeout.ms': 5000,
'reconnect.backoff.max.ms':'5000',
'api.version.request.timeout.ms':'5000',
#api.version.fallback.ms
'session.timeout.ms':'2000',
#heartbeat.interval.ms
'coordinator.query.interval.ms':'1000',
#max.poll.interval.ms
#auto.commit.interval.ms,
"debug":"generic, broker, topic, metadata",
}
try:
self.consumer = Consumer(conf)
Run Code Online (Sandbox Code Playgroud)
我在日志中得到:
%7|1584702589.065|CONNECT|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: broker in state TRY_CONNECT connecting
%7|1584702589.065|STATE|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1584702589.065|BROADCAST|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: Broadcasting state change
%7|1584702589.065|CONNECT|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Connecting to ipv4#x.x.x.x:6667 (plaintext) with socket 11
%7|1584702589.065|CONNECT|rdkafka#consumer-1| [thrd:app]: Cluster connection already in progress: application metadata request
%7|1584702589.066|CONNECT|rdkafka#consumer-1| …Run Code Online (Sandbox Code Playgroud) apache-kafka kafka-consumer-api kafka-python confluent-platform
正如标题中所说,我想在我的主题中获得一些记录,但我找不到使用 kafka-python 库的解决方案。有人有什么主意吗 ?