标签: kafka-python

Zookeeper -Kafka:ConnectException - 连接被拒绝

我正在尝试在ubuntu EC2机器上设置 3 个 Kafka 代理。但我ConnectException在开始时就得到了zookeepersecurity group我的实例中的所有端口ec2都已打开。

下面是堆栈跟踪:

[2016-03-03 07:37:12,040] ERROR Exception while listening (org.apache.zookeeper.server.quorum.QuorumCnxManager)
java.net.BindException: Cannot assign requested address
    at java.net.PlainSocketImpl.socketBind(Native Method)
    at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:376)
    at java.net.ServerSocket.bind(ServerSocket.java:376)
    at java.net.ServerSocket.bind(ServerSocket.java:330)
at org.apache.zookeeper.server.quorum.QuorumCnxManager$Listener.run(QuorumCnxManager.java:507)
...
...
...

[2016-03-03 07:23:46,093] WARN Cannot open channel to 2 at election address /52.36.XXX.181:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
    java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)
    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)
    at …
Run Code Online (Sandbox Code Playgroud)

amazon-ec2 amazon-web-services apache-kafka ubuntu-14.04 kafka-python

2
推荐指数
1
解决办法
3763
查看次数

使用 Python 读取特定的 Kafka 主题

我的主题有 3 个分区,我尝试使用以下代码从每个特定分区读取内容

from kafka import KafkaConsumer, TopicPartition

brokers = 'localhost:9092'
topic = 'b3'

m = KafkaConsumer(topic, bootstrap_servers=['localhost:9092'])
par = TopicPartition(topic=topic, partition=1)
m.assign(par)
Run Code Online (Sandbox Code Playgroud)

但我收到此错误:

    raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
kafka.errors.IllegalStateError: IllegalStateError: You must choose only one way to configure your consumer: (1) subscribe to specific topics by name, (2) subscribe to topics matching a regex pattern, (3) assign itself specific topic-partitions.
Run Code Online (Sandbox Code Playgroud)

有人可以帮我解决这个问题吗?

python apache-kafka kafka-python

2
推荐指数
1
解决办法
5137
查看次数

Kafka python 消费者在并行线程中运行

我是 python 和 kafka 的新手。我有一个脚本应该启动三个 kafka 消费者,等待来自这些消费者的消息并执行一些其他操作。此时我什至不知道我是否朝着正确的方向前进,所以任何帮助将不胜感激。

class MainClass():
    def do_something_before(self):
        # something is done here

    def start_consumer(self):
        consumer1_thread = threading.Thread(target=self.cons1, args=())
        consumer2_thread = threading.Thread(target=self.cons2, args=())
        consumer1_thread.daemon = True
        consumer2_thread.daemon = True
        consumer1_thread.start()
        consumer2_thread.start()

    def cons1(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest')
        consumer.subscribe(['my-topic'])
        for message in consumer:
            print(message.value)

    def cons2(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest')
        consumer.subscribe(['my2-topic'])
        for message in consumer:
            print(message.value)

    def keep_working(self):
        # something is done here

if __name__ == 'main':
    g = MainClass()
    g.do_something_before()
    g.keep_working()
Run Code Online (Sandbox Code Playgroud)

python apache-kafka kafka-python

2
推荐指数
1
解决办法
1万
查看次数

kafka-python使用者从偏移量开始读取(自动)

我正在尝试使用kafka-python构建一个应用程序,其中消费者从一系列主题中读取数据.非常重要的是,消费者永远不会两次读取相同的消息,但也永远不会错过消息.

一切似乎工作正常,除非我关闭消费者(例如失败)并尝试从偏移开始阅读.我只能读取主题中的所有消息(创建双读)或仅侦听新消息(并且错过在故障期间发出的消息).暂停消费者时我没有遇到这个问题.

我创建了一个孤立的模拟,以试图解决问题.

这里是通用生产者:

from time import sleep
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

x=0 # set manually to avoid duplicates 

for e in range(1000):
    if e <= x:
        pass
    else:
        data = dumps(
            {
            'number' : e
        }
        ).encode('utf-8')

        producer.send('numtest', value=data)
        print(e, ' send.')

        sleep(5)
Run Code Online (Sandbox Code Playgroud)

和消费者.如果auto_offset_reset设置为'earliest',则将再次读取所有消息.如果auto_offset_reset设置为'latest',则不会读取停机期间的消息.

from kafka import KafkaConsumer
from pymongo import MongoClient
from json import loads

## Retrieve …
Run Code Online (Sandbox Code Playgroud)

offset python-3.x apache-kafka kafka-consumer-api kafka-python

2
推荐指数
1
解决办法
5434
查看次数

如果Kafka中不存在主题,如何使用kafka-python动态创建主题

我对 Python 相当陌生,刚刚开始使用 Kafka。我正在使用名为python-kafka 的库与 Kafka 进行通信。现在我有一个要求,我需要动态创建主题,但是如果它确实存在,我不需要创建它。

通过阅读文档,我发现我可以使用KafkaAdminClient创建和删除主题,但是我没有找到任何方法来检查主题是否存在。

python apache-kafka kafka-python

2
推荐指数
1
解决办法
5263
查看次数

如何使用 kafka-python 以编程方式创建主题?

我刚开始使用 Kafka 并且对 Python 相当陌生。我正在使用这个名为的库kafka-python与我的 Kafka 代理进行通信。现在我需要从我的代码动态创建一个主题,从文档中我看到的是我可以调用create_topics()方法来这样做,但是我不确定,我将如何获得此类的实例。我无法从文档中理解这一点。

有人可以帮我弄这个吗?

python apache-kafka kafka-python kafka-topic

2
推荐指数
1
解决办法
2038
查看次数

处理kafka消息需要很长时间

我有一个 Python 进程(或者更确切地说,在消费者组中并行运行的一组进程),它根据来自某个主题的 Kafka 消息输入来处理数据。通常每条消息都会很快得到处理,但有时,根据消息的内容,可能需要很长时间(几分钟)。在这种情况下,Kafka代理会断开客户端与组的连接并启动重新平衡。我可以设置session_timeout_ms一个非常大的值,但大约需要 10 分钟以上,这意味着如果客户端挂掉,集群将在 10 分钟内无法正确重新平衡。这似乎是一个坏主意。此外,大多数消息(大约 98%)都很快,因此为 1-2% 的消息支付这样的惩罚似乎很浪费。OTOH,大消息足够频繁,足以导致大量重新平衡并消耗大量性能(因为当组重新平衡时,什么也没有完成,然后“死”客户端再次重新加入并导致另一次重新平衡)。

那么,我想知道是否还有其他方法来处理需要很长时间才能处理的消息?有没有办法手动启动心跳来告诉代理“没关系,我还活着,我只是在处理消息”?我认为 Python 客户端(我使用的kafka-python 1.4.7)应该为我做这件事,但它似乎没有发生。此外,该 API 似乎根本没有单独的“心跳”功能。据我了解,调用poll()实际上会给我下一条消息——而我什至还没有完成当前的消息,并且还会弄乱 Kafka 消费者的迭代器 API,这在 Python 中使用起来相当方便。

如果很重要的话,如果我没记错的话,Kafka 集群是 Confluence,版本 2.3。

apache-kafka kafka-consumer-api kafka-python

2
推荐指数
1
解决办法
9135
查看次数

如何将 Kafka 消费者连接到 Django 应用程序?我应该为消费者使用新线程、新进程还是新 docker 容器?

我有 Django 应用程序,它应该使用 Kafka 消息并使用我的处理程序和现有模型处理它。我使用https://kafka-python.readthedocs.io/en/master/usage.html库。

将 KafkaConsumer 连接到 Django 应用程序的正确方法是什么?我应该使用新的守护线程吗?还是新流程?或者可能是一个单独的 docker 容器?在哪里放置代码(新的 Django 应用程序?)以及如何在 Django 应用程序准备就绪时自动启动它。以及如何更新它动态监听的主题:我应该杀死旧消费者并每次在新线程中启动新消费者吗?

django python-3.x docker microservices kafka-python

2
推荐指数
1
解决办法
2612
查看次数

如何连接 Kafka python 以接受 jaas 的用户名和密码,就像在 Java 中完成的那样?

使用现有的 Java 示例,我尝试使用 python-kafka 和 confluence_kafka 库编写一个与生成器等效的 python 版本。如何使用类似于下面 Java 中的信息在 python 中配置 sasl.jass.config?

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

...
Properties props = new Properties();
...
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"<Kafka_Username>\" password=\"<Kafka_Password>\";");

Producer<String, String> producer = new KafkaProducer<>(props);
Run Code Online (Sandbox Code Playgroud)

kafka-python confluent-platform confluent-kafka-python

2
推荐指数
1
解决办法
4483
查看次数

从kafka获取特定时间段的结果

这是我的代码,它使用kafka-python.

now = datetime.now()
month_ago = now - relativedelta(month=1)
topic = 'some_topic_name'
consumer = KafkaConsumer(topic, bootstrap_servers=PROD_KAFKA_SERVER,
                         security_protocol=PROTOCOL,
                         group_id=GROUP_ID,
                         enable_auto_commit=False,
                         sasl_mechanism=SASL_MECHANISM, sasl_plain_username=SASL_USERNAME,
                         sasl_plain_password=SASL_PASSWORD)


for msg in consumer:
    print(msg)
Run Code Online (Sandbox Code Playgroud)

我想从循环之间nowmonth_ago循环中的主题中获取结果。我怎样才能做到这一点?

谢谢你的帮助!

python consumer apache-kafka kafka-python

2
推荐指数
1
解决办法
4444
查看次数