我想在使用celery的代码中引入多处理.但目前我的队列实施是在卡夫卡.
目前芹菜网站仅提到这4家经纪人:http: //docs.celeryproject.org/en/master/getting-started/brokers/index.html#broker-overview
是否可以将Celery与Kafka集成,类似于下面提到的RabbitMQ:
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def add(x, y):
return x + y
Run Code Online (Sandbox Code Playgroud) 我有一个应用程序,用于从 1 个 Kafka 生产者生成的 URL 流中下载特定的 Web 内容。我创建了一个有 5 个分区的主题,并且有 5 个 kafka 消费者。但是,网页下载的超时时间为 60 秒。当其中一个 url 被下载时,服务器假定消息丢失并将数据重新发送给不同的消费者。
我已经尝试了中提到的所有内容
和
https://github.com/spring-projects/spring-kafka/issues/202
但我每次都会收到不同的错误。
是否可以将特定消费者与 kafka 中的分区联系起来?我正在为我的应用程序使用 kafka-python
我有一个使用 Kafka 1.0 作为队列的应用程序。Kafka 主题有 80 个分区和 80 个正在运行的消费者。(Kafka-python 消费者)。
通过运行命令:
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --describe
Run Code Online (Sandbox Code Playgroud)
我看到其中一个分区卡在一个偏移处,并且随着新记录的添加,延迟不断增加。
上述命令的输出如下所示:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST
118 mytopic 37 1924 2782 858 kafka-python-1.3.4-3da99d4d-63e8-4e72-967e-xxxxxxxxxxx/localhost
119 mytopic 38 2741 2742 1 kafka-python-1.3.4-40b44482-39fc-42d0-8f55-xxxxxxxxxxx/localhost
120 mytopic 39 2713 2713 0 kafka-python-1.3.4-4121d080-1d7c-4d6b-ac58-xxxxxxxxxxx/localhost
121 mytopic 40 2687 2688 1 kafka-python-1.3.4-43441f6e-fd35-448e-b791-xxxxxxxxxxx/localhost
Run Code Online (Sandbox Code Playgroud)
这是什么原因造成的?此外,使用 reset-offsets 命令重置偏移也是不可取的,因为可能不会定期手动监控此服务器。
客户端在 Linux m/c 中作为并行进程在后台运行:
consumer = KafkaConsumer('mytopic', group_id='mygroup', bootstrap_servers='localhost:9092',
session_timeout_ms=120000, heartbeat_interval_ms=100000, max_poll_records=1,
auto_commit_interval_ms=100000, request_timeout_ms=350000, max_partition_fetch_bytes=3*1024*1024,
value_deserializer=lambda m: json.loads(m.decode('ascii')))
for message in consumer:
msg …Run Code Online (Sandbox Code Playgroud)