我们有一个 Kafka 消费者,它将读取消息并执行这些操作,然后使用以下脚本再次发布到 Kafka 主题
生产者配置:
{
"bootstrap.servers": "localhost:9092"
}
Run Code Online (Sandbox Code Playgroud)
我还没有配置任何其他配置,如 queue.buffering.max.messages
queue.buffering.max.ms
batch.num.messages
我假设这些都将成为配置中的默认值
queue.buffering.max.messages : 100000
queue.buffering.max.ms : 0
batch.num.messages : 10000
Run Code Online (Sandbox Code Playgroud)
我的理解:当内部队列达到 queue.buffering.max.ms 或 batch.num.messages 中的任何一个时,消息将在单独的线程中发布到 Kafka。在我的配置中 queue.buffering.max.ms 是 0,所以当我调用 generate() 时每条消息都会被发布。如果我错了,请纠正我。
我的制作人片段:
def send(topic, message):
p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
p.flush()
Run Code Online (Sandbox Code Playgroud)
从这篇文章我了解到,在每条消息后使用刷新,生产者将成为同步生产者。如果我使用上面的脚本,发布到 Kafka 需要大约 45 毫秒
如果我将上面的代码段更改为
def send(topic, message):
p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
p.poll(0)
Run Code Online (Sandbox Code Playgroud)
有什么性能会提高吗?你能澄清一下我的理解吗?
谢谢
我正在使用带有红宝石发条的t2.medium机器的opswork.
每30秒我们就会触发一份工作.
SPAWNED BG JOB FOR TASK: 20 for SC: 20 with Sidekiq id: 3afda705eaf597edb4bcc257
log rotation inter-process lock failed. No such file or directory @ rb_sysopen - log/sql.log
log rotation inter-process lock failed. No such file or directory @ rb_sysopen - log/sql.log
log rotation inter-process lock failed. No such file or directory @ rb_sysopen - log/sql.log
log rotation inter-process lock failed. No such file or directory @ rb_sysopen - log/sql.log
log rotation inter-process lock failed. No such file or …
Run Code Online (Sandbox Code Playgroud) 我正在尝试将消息发布到Slack https://api.slack.com/methods/chat.postMessage/test
我正在添加附件 [{"pretext": "pre-hello", "text": "text-world"}]
但是在松弛的通道用户界面中,我看到关闭图标,如果单击关闭图标,我可以删除附件
如何禁用关闭图标,以使任何人都无法删除附件?
我正在尝试阅读来自Kafka的消息,因此我编写了简单的消费者来阅读来自Kafka的消息.
While True:
message = consumer.poll(timeout=1.0)
# i am doing something with messages
Run Code Online (Sandbox Code Playgroud)
在上面的代码输出消息类型是消息对象.我如何获得一组消息?
有可能吗?
注意:基本的消费者配置不多.