标签: kafka-python

kafka-python中的多处理

我一直在使用python-kaka模块从kafka经纪人那里消费.我希望同时使用'x'个分区的相同主题.文档包含:

# Use multiple consumers in parallel w/ 0.9 kafka brokers
# typically you would run each on a different server / process / CPU
 consumer1 = KafkaConsumer('my-topic',
                      group_id='my-group',
                      bootstrap_servers='my.server.com')
  consumer2 = KafkaConsumer('my-topic',
                      group_id='my-group',
                      bootstrap_servers='my.server.com')
Run Code Online (Sandbox Code Playgroud)

这是否意味着我可以为我生成的每个进程创建一个单独的使用者?此外,consumer1和consumer2消费的消息是否会重叠?

谢谢

apache-kafka kafka-consumer-api kafka-python

6
推荐指数
1
解决办法
2488
查看次数

Python:模拟 Kafka 进行集成测试

我对集成测试有点陌生。我有两个使用 Kafka 相互传递消息的服务。但是,对于我的集成测试,我不一定想让 Kafka 运行来运行我的测试。有没有标准的方法来模拟 Kafka?或者这是我需要自己创建的东西,一些 MockKafka 队列和应用程序中适当的补丁?此外,这是否违反了集成测试应该做的事情?我对此的看法是,我没有测试 Kafka 的任何功能,为了集成测试,应该模拟出那里。

python integration-testing apache-kafka kafka-python

6
推荐指数
1
解决办法
1655
查看次数

无法使用 Kafka-Python 的反序列化器从 Kafka 消费 JSON 消息

我正在尝试通过 Kafka 发送一个非常简单的 JSON 对象,并使用 Python 和 kafka-python 从另一端读出它。但是,我一直看到以下错误:

2017-04-07 10:28:52,030.30.9998989105:kafka.future:8228:ERROR:10620:Error processing callback
Traceback (most recent call last):
  File "C:\Anaconda2\lib\site-packages\kafka\future.py", line 79, in _call_backs
    f(value)
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 760, in _handle_fetch_response
    unpacked = list(self._unpack_message_set(tp, messages))
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 539, in _unpack_message_set
    tp.topic, msg.value)
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 570, in _deserialize
    return f(bytes_)
  File "C:\Users\myUser\workspace\PythonKafkaTest\src\example.py", line 55, in <lambda>
    value_deserializer=lambda m: json.loads(m).decode('utf-8'))
  File "C:\Anaconda2\lib\json\__init__.py", line 339, in loads
    return _default_decoder.decode(s)
  File "C:\Anaconda2\lib\json\decoder.py", line 364, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end()) …
Run Code Online (Sandbox Code Playgroud)

python json kafka-python

6
推荐指数
2
解决办法
7921
查看次数

对于 AvroProducer 到 Kafka,“键”和“值”的 avro 模式在哪里?

confluent-kafka-python repo 中AvroProducer示例来看,键/值模式似乎是从文件中加载的。也就是说,从这段代码:

from confluent_kafka import avro 
from confluent_kafka.avro import AvroProducer

value_schema = avro.load('ValueSchema.avsc')
key_schema = avro.load('KeySchema.avsc')
value = {"name": "Value"}
key = {"name": "Key"}

avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema.registry.url': 'http://schem_registry_host:port'}, default_key_schema=key_schema, default_value_schema=value_schema)
avroProducer.produce(topic='my_topic', value=value, key=key)
Run Code Online (Sandbox Code Playgroud)

看来这些文件ValueSchema.avscKeySchema.avsc是独立于 Avro Schema Registry 加载的。

这是正确的吗?引用 Avro 架构注册表的 URL,然后从磁盘加载键/值的架构有什么意义?

请说清楚。

apache-kafka kafka-python kafka-producer-api

6
推荐指数
1
解决办法
4070
查看次数

如何从 confluence_python AVRO 消费者获取最新的偏移值

我对 confluence_kafka 还很陌生,但我已经获得了一些使用 kafka-python 的经验。我想做的是改变开始消费消息的偏移量。这就是为什么我想构建一个能够返回到以前的消息的消费者客户端,以便返回将填充仪表板的数据。说使用kafka-python包我可以使用seek_to_endhttps://github.com/dpkp/kafka-python/blob/c0fddbd24269d4333e3b6630a23e86ffe33dfcb6/kafka/consumer/group.py#L788)方法来获取位置值最新的提交。这样我就可以使用该seek方法减去值并返回到之前的消息(https://github.com/dpkp/kafka-python/blob/c0fddbd24269d4333e3b6630a23e86ffe33dfcb6/kafka/consumer/group.py#L738

另一方面,conflient_kafka似乎没有类似的功能,到目前为止我发现的是使用变量OFFSET_END,其值为-1,并且它不会返回最新和最大的偏移数值一。我也可以使用“seek”函数,但我需要一种方法来获取最新偏移量的数值,而不是-1.

我的 avro 消费者看起来像

from confluent_kafka.avro import AvroConsumer

if __name__ == '__main__':
     c = AvroConsumer({"bootstrap.servers": "locahost:29092", "group.id":"mygroup",'schema.registry.url': 'http://localhost:8081',
                  'enable.auto.commit': True,'default.topic.config': {'auto.offset.reset': 'smallest'}})

def my_assign (consumer, partitions):
    for p in partitions:
        p.offset = confluent_kafka.OFFSET_END
        print("offset=",p.offset)
    print('assign', partitions)
    print('position:',consumer.position(partitions))
    consumer.assign(partitions)

c.subscribe(["mytopic"],on_assign=my_assign)

while True:
    m = c.poll(1)
    if m is None:
        continue

    if m.error() is None:
        print('Received message', m.value(),m.offset())
c.close()
Run Code Online (Sandbox Code Playgroud)

产生以下结果:

offset= -1
assign [TopicPartition{topic=mytopic,partition=0,offset=-1,error=None}]
position: [TopicPartition{topic=mytopic,partition=0,offset=-1001,error=None}]
Run Code Online (Sandbox Code Playgroud)

并等待下一条消息。我想知道是否有人可以帮助我。谢谢

python kafka-python confluent-platform

6
推荐指数
1
解决办法
7958
查看次数

在Python中指示group_id时,Kafka未收到消息

我使用的是 Kafka ( kafka-python) 版本 3.0.0-1.3.0.0.p0.40。我需要在 Python 中为“模拟”主题配置使用者。当我不指示 group_id 时,即 group_id = None ,它可以正常接收消息。但是,如果我指示 group_id,它不会收到任何消息。

这是我的 Python 代码:

consumer = KafkaConsumer(bootstrap_servers='XXX.XXX.XXX.XXX:9092',
                         group_id = 'myTestGroupID', enable_auto_commit = True)
consumer.subscribe(['simulation'])
# not using assign method here as auto_commit is enabled
# partitions = [TopicPartition('simulation',num) for num in range(0,9)]
# consumer.assign([TopicPartition('simulation', partitions[0])])

while not self.stop_event.is_set():
    for message in consumer:
        print(message)
Run Code Online (Sandbox Code Playgroud)

我尝试在消费者属性文件中搜索 group_id 的一些默认值,我发现了一个 cloudera_mirrormaker 但没有任何改变。我需要使用多个消费者,因此拥有一个 group_id 并且它们共享相同的 group_id 很重要。在许多来源中,我发现 group_id 可以是任何字符串......

当我在控制台中运行该主题的使用者时,它可以工作并接收消息

./kafka-console-consumer.sh --bootstrap-server XXX.XXX.XXX.XXX:9092 --topic simulation --from-beginning --consumer-property group.id=myTestGroupID  --partition 0 …
Run Code Online (Sandbox Code Playgroud)

python consumer apache-kafka kafka-consumer-api kafka-python

6
推荐指数
1
解决办法
4993
查看次数

kafka-python 引发 UnrecognizedBrokerVersion 错误

使用kafka-python包构建 KafkaProducer 时出现此错误:

[ERROR] UnrecognizedBrokerVersion: UnrecognizedBrokerVersion
Traceback (most recent call last):
  File "/var/lang/lib/python3.7/imp.py", line 234, in load_module
    return load_source(name, filename, file)
  File "/var/lang/lib/python3.7/imp.py", line 171, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 696, in _load
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/var/task/kafka/producer/kafka.py", line 381, in __init__
    **self.config)
  File "/var/task/kafka/client_async.py", line 240, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/var/task/kafka/client_async.py", line 908, …
Run Code Online (Sandbox Code Playgroud)

python apache-kafka kafka-python aws-msk

6
推荐指数
1
解决办法
1万
查看次数

使用消息时 AIOKafka 库中的 UnknownMemberId 错误

我在 Python 中的库 AIOKafka 遇到了错误(版本在最后)。基本上,我收到一条失败的心跳消息,然后无法执行偏移量的提交。这是日志:

Heartbeat failed for group my-group-dag-kafka because it is rebalancing
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 1)for group my-group-dag-kafka.
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error …
Run Code Online (Sandbox Code Playgroud)

python aio apache-kafka kafka-python

6
推荐指数
0
解决办法
633
查看次数

confluence-kafka-python:当 Broker 不可用时如何使初始连接超时?

我正在使用confluent-kafka-python,当我尝试连接到已关闭的代理时,发现它无限挂起。我似乎无法应用在文档中找到的任何超时设置:

from confluent_kafka import Consumer

conf = {'bootstrap.servers': f"{self.host}:{self.port}",
           'group.id': "foo",
           'auto.offset.reset': 'smallest',
           'socket.timeout.ms':'2000', 'socket.max.fails':2,
           'metadata.request.timeout.ms': 5000,
           'reconnect.backoff.max.ms':'5000',
           'api.version.request.timeout.ms':'5000',
           #api.version.fallback.ms
           'session.timeout.ms':'2000',
           #heartbeat.interval.ms
           'coordinator.query.interval.ms':'1000',
           #max.poll.interval.ms
           #auto.commit.interval.ms,
           "debug":"generic, broker, topic, metadata",

   }

try:
     self.consumer = Consumer(conf)
Run Code Online (Sandbox Code Playgroud)

我在日志中得到:

%7|1584702589.065|CONNECT|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: broker in state TRY_CONNECT connecting
%7|1584702589.065|STATE|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1584702589.065|BROADCAST|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: Broadcasting state change
%7|1584702589.065|CONNECT|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Connecting to ipv4#x.x.x.x:6667 (plaintext) with socket 11
%7|1584702589.065|CONNECT|rdkafka#consumer-1| [thrd:app]: Cluster connection already in progress: application metadata request
%7|1584702589.066|CONNECT|rdkafka#consumer-1| …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api kafka-python confluent-platform

6
推荐指数
0
解决办法
3770
查看次数

如何使用kafka-python计算主题中的记录(消息)数量

正如标题中所说,我想在我的主题中获得一些记录,但我找不到使用 kafka-python 库的解决方案。有人有什么主意吗 ?

python apache-kafka kafka-python

6
推荐指数
1
解决办法
1万
查看次数