小编Apo*_*llo的帖子

卡夫卡消费者的多个话题

我有一个主题列表(现在是10个),其大小可以在未来增加.我知道我们可以生成多个线程(每个主题)来从每个主题中使用,但在我的情况下,如果主题数量增加,那么从主题中消耗的线程数量会增加,这是我不想要的,因为主题不是过于频繁地获取数据,因此线程将是理想的.

有没有办法让一个消费者从所有主题中消费?如果是,那我们怎样才能实现呢?卡夫卡还将如何维持这种抵消?请提出答案.

java multithreading apache-kafka kafka-consumer-api

18
推荐指数
2
解决办法
2万
查看次数

为kafka主题配置ACL

我有一个不安全的kafka实例,有2个代理,一切运行正常,直到我决定为主题配置ACL,在ACL配置后,我的消费者停止从Kafka轮询数据,并且在获取具有相关ID的元数据时我不断收到警告错误,我的代理属性看起来像下面:-

listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
Run Code Online (Sandbox Code Playgroud)

我的客户端配置如下所示: -

bootstrap.servers=localhost:9092
topic.name=topic-name
group.id=topic-group
Run Code Online (Sandbox Code Playgroud)

我用下面的命令来配置ACL

bin\windows\kafka-acls.bat  --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* Read --allow-host localhost  --consumer --topic topic-name --group topic-group
Run Code Online (Sandbox Code Playgroud)

在我启动消费者后完成所有上述配置后,它停止接收消息.有人可以指出我在误解的地方.提前致谢.

java apache-kafka kafka-consumer-api

7
推荐指数
1
解决办法
1311
查看次数

Kafka Java消费者被标记为群体死亡

我正在使用Java使用者来使用来自主题(kafka版本0.10.0.1)的消息,如果我在docker容器之外运行它们,它可以正常工作.但是,当我在docker容器中执行它们时,那​​些组被标记为dead with message

Marking the coordinator local.kafka.com:9092 (id: 2147483647 rack: null) dead for group my-group
Run Code Online (Sandbox Code Playgroud)

我的消费者配置如下: -

metadata.max.age.ms = 300000
partition.assignment.strategy =[org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [192.168.115.128:9092, 192.168.115.128:9093]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = consumer-1
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-consumer-api

7
推荐指数
1
解决办法
4375
查看次数

春天的单身豆

我知道这个问题可能听起来很幼稚,但我对Web应用程序中bean的范围感到困惑.我知道,对于每个请求,容器会产生一个新线程,类似于Spring Web应用程序,每个请求都会生成一个新线程,那么为什么建议我将控制器定义为单例服务,不应该是范围这些bean是原型,因为每个请求即线程都有自己的控制器实例,服务可以使用.

请赐教.

java spring multithreading

5
推荐指数
2
解决办法
310
查看次数

Ansible docker_container etc_hosts with variable key

我有一个ansible脚本,我通过它生成一个docker容器并向它添加几个主机条目,因为etc_hosts将key作为主机名和相应的IP地址.在我的情况下,我需要让主机名和IP地址由某个变量驱动,例如

docker_container:
name: image-name
image: image-to-be-pulled
state: started
restart_policy: always
etc_hosts:
  "{{ domain_1 }}": "{{ domain_1_ip }}"
  domain_2 : "{{ domain_2_ip }}"
  domain_3 : "{{ domain_3_ip }}"  
Run Code Online (Sandbox Code Playgroud)

当我进行上述配置时,它会将主机文件的条目作为

xx.xx.xx.xxx {{ domain_1 }}
Run Code Online (Sandbox Code Playgroud)

理想情况下,主机文件应该包含针对IP的主机名,有人可以建议我如何实现这一点.提前致谢

ansible docker ansible-playbook ansible-2.x

3
推荐指数
1
解决办法
3335
查看次数