标签: kafka-producer-api

Kafka:如何连接kafka-console-consumer来获取远程代理主题内容?

我在ec2上的一台机器上设置了一个kafka zookeeper和3个代理,端口为9092..9094,我正在尝试使用另一台机器上的主题内容.端口2181(zk),9092,9093和9094(服务器)对消费者机器开放.我甚至可以做一个bin/kafka-topics.sh --describe --zookeeper 172.X.X.X:2181 --topic remotetopic给我的东西

主题:remotetopic PartitionCount:1 ReplicationFactor:3配置:主题:remotetopic分区:0领导者:2个副本:2,0,1 Isr:2,0,1 Blockquote

但是当bin/kafka-console-consumer.sh --zookeeper 172.X.X.X:2181 --from-beginning --topic remotetopic我得到的时候

警告从代理[id:0,host:localhost,port:9092] 获取主题[Set(remotetopic)]的相关ID为0的主题元数据失败(kafka.client.ClientUtils $)java.nio.channels.ClosedChannelException

为什么消费者试图从localhost读取?是否有任何选项或命令行或默认文件从中读取; 我可以改变它吗?

任何帮助,将不胜感激!

amazon-ec2 apache-kafka kafka-consumer-api kafka-producer-api apache-zookeeper

6
推荐指数
3
解决办法
1万
查看次数

在kafka消费者中重试逻辑

我有一个用例,我从队列中消耗某些日志并使用该日志中的一些信息命中某些第三方API,以防第三方系统没有正确响应我希望为该特定日志实现重试逻辑.

我可以添加一个时间字段并将消息重新发送到同一队列,如果其时间字段有效(即小于当前时间),则此消息将再次消耗,如果不是,则再次将其推送到队列中.

但是这个逻辑会一次又一次地添加相同的日志,直到重试时间正确并且队列将不必要地增长.

是否有更好的方法在Kafka中实现重试逻辑?

apache-kafka kafka-consumer-api kafka-producer-api

6
推荐指数
2
解决办法
7270
查看次数

Kafka多个分区排序

我知道无法在Kafka中订购多个分区,并且只能为组内的单个使用者(对于单个分区)保证分区排序.然而,使用Kafka Streams 0.10现在可以实现这一目标吗?如果我们使用时间戳功能,以便每个分区中的每条消息都维护订单,那么在消费者方面,让我们说Kafka Streams 0.10现在可以吗?假设我们收到所有消息,我们可能不会根据消耗的时间戳对所有分区进行排序,并可能将它们转发到单独的主题以供消费?

目前我需要维护订购,但这意味着拥有一个带有单个消费者线程的分区.我想将其更改为多个分区以增加并行性,但不知何故"让它们按顺序排列".

有什么想法吗?谢谢.

apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams

6
推荐指数
2
解决办法
5092
查看次数

Kafka streams.allMetadata()方法返回空列表

因此,我正在尝试使用Kafka流进行交互式查询.我有Zookeeper和Kafka在本地运行(在Windows上).我使用C:\ temp作为存储文件夹,对于Zookeeper和Kafka.

我已经设置了这样的主题

kafka-topics.bat --zookeeper localhost:2181 --create  --replication-factor 1 --partitions 1 --topic rating-submit-topic
kafka-topics.bat --zookeeper localhost:2181 --create  --replication-factor 1 --partitions 1 --topic rating-output-topic
Run Code Online (Sandbox Code Playgroud)

阅读我已经完成了这个问题

我已阅读此文档页面:http://docs.confluent.io/current/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application

我还在这里阅读了Java示例:https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic /KafkaMusicExample.java

并且还阅读了这篇类似的帖子,它最初听起来像我一样的问题:无法从StateStore的其他应用程序访问KTable

这就是我的设置.那么问题是什么?

所以我说我正在尝试创建自己的应用程序,它允许使用自定义Akka Http REST Api(推荐的RPC调用)进行交互式查询,以允许我查询我的KTable.实际的流处理似乎正在按预期发生,我能够打印出结果,KTable并且它们与主题产生的内容相匹配.

所以存储方面的东西似乎正在发挥作用

尝试使用该Streams.allMetadata()方法时,似乎会出现问题,它返回一个空列表.

我在用

  • 项目清单
  • Scala 2.12
  • SBT
  • Akka.Http 10.9 for REST Api
  • 卡夫卡11.0

制片人代码

这是我的制作人的代码

package Processing.Ratings {

  import java.util.concurrent.TimeUnit

  import Entities.Ranking
  import Serialization.JSONSerde
  import Topics.RatingsTopics

  import scala.util.Random
  import org.apache.kafka.clients.producer.ProducerRecord
  import org.apache.kafka.clients.producer.KafkaProducer
  import org.apache.kafka.common.serialization.Serdes …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams

6
推荐指数
1
解决办法
1348
查看次数

卡夫卡生产者超时异常

我正在运行将数据写入 Kafka 主题的 Samza 流作业。Kafka 正在运行一个 3 节点集群。Samza 作业部署在纱线上。我们在容器日志中看到了很多这些异常:

 INFO [2018-10-16 11:14:19,410] [U:2,151,F:455,T:2,606,M:2,658] samza.container.ContainerHeartbeatMonitor:[ContainerHeartbeatMonitor:stop:61] - [main] - Stopping ContainerHeartbeatMonitor
ERROR [2018-10-16 11:14:19,410] [U:2,151,F:455,T:2,606,M:2,658] samza.runtime.LocalContainerRunner:[LocalContainerRunner:run:107] - [main] - Container stopped with Exception. Exiting process now.
org.apache.samza.SamzaException: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 15 to system kafka.
        at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:147)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:694)
        at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:104)
        at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:149)
Caused by: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 15 to system kafka.
        at org.apache.samza.system.kafka.KafkaSystemProducer$$anon$1.onCompletion(KafkaSystemProducer.scala:181)
        at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:109)
        at org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:160)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:245)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:212)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
        at …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-samza kafka-producer-api

5
推荐指数
1
解决办法
2万
查看次数

本地主机无法使用 Kafka 代理

我已经安装kafka_2.11-1.1.0并将广告监听器设置为advertised.listeners=PLAINTEXT://<my-ip>:9092(in $KAFKA_HOME/config/server.properties)。

我可以使用 java 代码连接并写入我的 kafka,并通过kafka-tool另一台服务器查看我的集群,但我无法从本地机器(我在其上安装了 kafka 集群的机器)向我的主题写入消息。

我也尝试将 listeners 值设置为,listeners = PLAINTEXT://:9092但没有任何变化。我应该如何处理我的 kafka 以使其从本地主机的外部和内部都可以访问和写入?

apache-kafka kafka-consumer-api kafka-producer-api

5
推荐指数
2
解决办法
6930
查看次数

Kafka Streams 在生成主题时不会将偏移量增加 1

我已经实现了一个简单的 Kafka 死信记录处理器。

当使用控制台生产者产生的记录时,它工作得很好。

但是,我发现我们的 Kafka Streams 应用程序并不能保证为接收器主题生成记录,对于生成的每个记录,偏移量将增加 1。

死信处理器背景:

我有一个场景,在发布处理记录所需的所有数据之前,可能会收到记录。当流应用程序处理的记录不匹配时,它们将移动到死信主题,而不是继续向下流。当新数据发布时,我们将来自死信主题的最新消息转储回流应用程序的源主题,以便使用新数据进行重新处理。

死信处理器:

  • 在运行应用程序开始时记录每个分区的结束偏移量
  • 如果重新处理的记录返回到死信主题,结束偏移量标记停止处理给定死信主题的记录的点,以避免无限循环。
  • 应用程序通过消费者组从上次运行产生的最后一个偏移量恢复。
  • 应用程序正在使用事务并KafkaProducer#sendOffsetsToTransaction提交最后产生的偏移量。

为了跟踪我的范围内的所有记录何时针对某个主题的分区被处理,我的服务将其从生产者的最后产生的偏移量与消费者保存的结束偏移量映射进行比较。当我们到达结束偏移量时,消费者通过以下方式暂停该分区KafkaConsumer#pause,当所有分区都暂停时(意味着它们到达保存的结束偏移量),然后调用它退出。

卡夫卡消费者API国:

偏移量和消费者位置 Kafka 为分区中的每条记录维护一个数字偏移量。该偏移量充当该分区内记录的唯一标识符,并且还表示消费者在该分区中的位置。例如,位于位置 5 的消费者已经消费了偏移量为 0 到 4 的记录,接下来将接收偏移量为 5 的记录。

卡夫卡生产者API引用下一偏移量始终是+1为好。

将指定偏移量列表发送给消费者组协调器,并将这些偏移量标记为当前事务的一部分。仅当事务成功提交时,这些偏移量才会被视为已提交。提交的偏移量应该是您的应用程序将使用的下一条消息,即 lastProcessedMessageOffset + 1。

但是您可以在我的调试器中清楚地看到,单个分区消耗的记录一次只增加 1 次... 在此处输入图片说明

我想这可能是 Kafka 配置问题,max.message.bytes但没有一个真正有意义。然后我想也许是因为加入,但没有看到任何会改变制片人运作方式的方式。

不确定它是否相关,但我们所有的 Kafka 应用程序都在使用 Avro 和 Schema Registry...

无论生产方法如何,偏移量是否应该始终增加 1,或者使用 Kafka 流 API 是否可能无法提供与普通生产者消费者客户端相同的保证?

有什么完全是我遗漏的吗?

java apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams

5
推荐指数
1
解决办法
4005
查看次数

在 JAAS 或 Kafka 配置(不是 Kerberos)中没有定义 serviceName

我正在尝试配置 kafka 客户端以针对安全的 kafka 服务器进行身份验证。我已经设置了 jaas 和 ssl 配置,但它在抱怨 serviceNames。

我没有使用 Kerberos。

命令

KAFKA_OPTS="-Djava.security.auth.login.config=./jaas.conf" \ 
kafka-console-producer --broker-list k0:9092,k1:9092,k2:9092 \
   --topic test-topic 
   --producer.config ./ssl.properties
Run Code Online (Sandbox Code Playgroud)

错误

org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>
    [ ... ] 
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
Run Code Online (Sandbox Code Playgroud)

配置文件

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    serviceName="kafka"
    password="broker-secret"
    user_broker="broker-secret"
    sasl.enabled.mechanisms=PLAIN
    sasl.mechanism.inter.broker.protocol=PLAIN
    confluent.metrics.reporter.sasl.mechanism=PLAIN
    user_username1="password1";
};
Run Code Online (Sandbox Code Playgroud)

ssl.properties

bootstrap.servers=k0:9092,k1:9092,k2:9092
security.protocol=SASL_PLAINTEXT
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=confluent
ssl.keystore.location=/var/ssl/private/client.keystore.jks
ssl.keystore.password=confluent
ssl.key.password=confluent


producer.bootstrap.servers=k0:9092,1:9092,k2:9092
producer.security.protocol=SASL_PLAINTEXT
producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
producer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
producer.ssl.truststore.password=confluent
producer.ssl.keystore.location=/var/ssl/private/client.keystore.jks
producer.ssl.keystore.password=confluent
producer.ssl.key.password=confluent

org.apache.kafka.common.security.plain.PlainLoginModule required
password="broker-secret"
user_broker="broker-secret" …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-producer-api apache-kafka-security

5
推荐指数
1
解决办法
1万
查看次数

即使生产者得到确认,Kafka 中也会发生消息丢失吗?

卡夫卡医生说:

  • Kafka 严重依赖文件系统来存储和缓存消息。
  • 现代操作系统提供预读和后写技术,这些技术以大块倍数预取数据,并将较小的逻辑写入分组为较大的物理写入。
  • 现代操作系统在使用主内存进行磁盘缓存方面变得越来越积极。当内存被回收时,现代操作系统很乐意将所有空闲内存转移到磁盘缓存中,而性能损失很小。所有磁盘读写都会经过这个统一缓存
  • ...与其在内存中尽可能多地维护并在空间不足时恐慌地将其全部刷新到文件系统中,不如将其反转。所有数据都会立即写入文件系统上的持久日志,而不必刷新到磁盘。实际上,这只是意味着它被传输到内核的页面缓存中。”

进一步这篇文章说:

(3) 当所有同步副本都将消息应用到他们的日志时,一条消息被“提交”,并且 (4) 任何提交的消息都不会丢失,只要至少一个同步副本处于活动状态。

因此,即使我将生产者配置为acks=all(这会导致生产者在所有代理提交消息后收到确认)并且生产者收到某些消息的确认,这是否意味着他们仍然有可能丢失消息,特别是如果所有代理都出现故障操作系统从不将提交的消息缓存刷新到磁盘?

apache-kafka kafka-producer-api

5
推荐指数
1
解决办法
776
查看次数

有没有办法在 kafka-console-producer.sh 中添加标题

我想使用 kafka-console-producer.sh 来触发一些带有 Kafka 标头的 JSON 消息。

这可能吗?

docker exec -it kafka_1 /opt/kafka_2.12-2.3.0/bin/kafka-console-producer.sh --broker-list localhost:9093 --topic my-topic --producer.config /opt/kafka_2.12-2.3.0/config/my-custom.properties
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-producer-api

5
推荐指数
1
解决办法
4150
查看次数