标签: kafka-producer-api

在kafka消费者中重试逻辑

我有一个用例,我从队列中消耗某些日志并使用该日志中的一些信息命中某些第三方API,以防第三方系统没有正确响应我希望为该特定日志实现重试逻辑.

我可以添加一个时间字段并将消息重新发送到同一队列,如果其时间字段有效(即小于当前时间),则此消息将再次消耗,如果不是,则再次将其推送到队列中.

但是这个逻辑会一次又一次地添加相同的日志,直到重试时间正确并且队列将不必要地增长.

是否有更好的方法在Kafka中实现重试逻辑?

apache-kafka kafka-consumer-api kafka-producer-api

6
推荐指数
2
解决办法
7270
查看次数

KafkaProducer未成功将消息发送到队列中

我在我的Windows PC上构建了一个小测试环境,并记下以下代码来测试kafka(使用org.apache.kafka中的kafka_2.10:0.9.0.1).

package iii.functiontesting;

import java.text.ParseException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;


/**
 * Hello world!
 *
 */
public class test4
{
    public static void main( String[] args ) throws ParseException
    {
        Properties producerProps=new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("serializer.class",org.apache.kafka.common.serialization.StringSerializer.class.getName());
        producerProps.put("key.serializer",org.apache.kafka.common.serialization.StringSerializer.class.getName());
        producerProps.put("value.serializer",org.apache.kafka.common.serialization.StringSerializer.class.getName());
        producerProps.put("request.required.acks","1");
        KafkaProducer<String,String> kafkawriter= new KafkaProducer<String,String>(producerProps);
        ProducerRecord<String,String> msg=new ProducerRecord<>("TEST3","ImKey","teststring1");
        kafkawriter.send(msg);
    }
}
Run Code Online (Sandbox Code Playgroud)

我使用以下命令检查消息是否正确写入队列

D:\ Work\kafkaenv\kafka_2.10-0.9.0.1\bin\windows>.\ kafka-console-consumer.bat --zookeeper localhost:2181 --topic TEST3 - from-beginning

但是,我发现kafka-console-consumer没有显示任何内容.

我怀疑我的kafka服务器运行不正常,所以我使用console-producer进行测试.

D:\ Work\kafkaenv\kafka_2.10-0.9.0.1\bin\windows>.\ kafka-console-producer.bat --broker-list localhost:9092 --topic TEST3

AAAAA

这次我可以看到aaaaa清楚地显示在控制台消费者之下.我无法弄清楚会发生什么.谁能帮我?

java apache-kafka kafka-producer-api

6
推荐指数
1
解决办法
8076
查看次数

INFO客户端/127.0.0.1:48452的已关闭套接字连接,其中包含sessionid 0x15698f5ac360001(org.apache.zookeeper.server.NIOServerCnxn)

我安装了新鲜的zookeeper和kafka.我开始他们两个.然后,当我想使用此命令查看主题列表时:

bin/kafka-topics.sh --list --zookeeper localhost 2181
Run Code Online (Sandbox Code Playgroud)

它让我的套接字连接关闭.这是屏幕截图:

    darpanshah@darpan-ubuntu:/opt/Kafka$ bin/kafka-topics.sh --list --zookeeper localhost 2181 
2016-08-17 10:44:44,053] INFO Accepted socket connection from /127.0.0.1:48452 (org.apache.zookeeper.server.NIOServerCnxnFactory)
    [2016-08-17 10:44:44,059] INFO Client attempting to establish new session at /127.0.0.1:48452 (org.apache.zookeeper.server.ZooKeeperServer)
    [2016-08-17 10:44:44,069] INFO Established session 0x15698f5ac360001 with negotiated timeout 30000 for client /127.0.0.1:48452 (org.apache.zookeeper.server.ZooKeeperServer)
    [2016-08-17 10:44:44,095] INFO Processed session termination for sessionid: 0x15698f5ac360001 (org.apache.zookeeper.server.PrepRequestProcessor)
    [2016-08-17 10:44:44,105] INFO Closed socket connection for client /127.0.0.1:48452 which had sessionid 0x15698f5ac360001 (org.apache.zookeeper.server.NIOServerCnxn)
    darpanshah@darpan-ubuntu:/opt/Kafka$
Run Code Online (Sandbox Code Playgroud)

提前致谢.被困.

apache-kafka kafka-consumer-api kafka-producer-api apache-zookeeper

6
推荐指数
1
解决办法
1万
查看次数

Kafka多个分区排序

我知道无法在Kafka中订购多个分区,并且只能为组内的单个使用者(对于单个分区)保证分区排序.然而,使用Kafka Streams 0.10现在可以实现这一目标吗?如果我们使用时间戳功能,以便每个分区中的每条消息都维护订单,那么在消费者方面,让我们说Kafka Streams 0.10现在可以吗?假设我们收到所有消息,我们可能不会根据消耗的时间戳对所有分区进行排序,并可能将它们转发到单独的主题以供消费?

目前我需要维护订购,但这意味着拥有一个带有单个消费者线程的分区.我想将其更改为多个分区以增加并行性,但不知何故"让它们按顺序排列".

有什么想法吗?谢谢.

apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams

6
推荐指数
2
解决办法
5092
查看次数

对于 AvroProducer 到 Kafka,“键”和“值”的 avro 模式在哪里?

confluent-kafka-python repo 中AvroProducer示例来看,键/值模式似乎是从文件中加载的。也就是说,从这段代码:

from confluent_kafka import avro 
from confluent_kafka.avro import AvroProducer

value_schema = avro.load('ValueSchema.avsc')
key_schema = avro.load('KeySchema.avsc')
value = {"name": "Value"}
key = {"name": "Key"}

avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema.registry.url': 'http://schem_registry_host:port'}, default_key_schema=key_schema, default_value_schema=value_schema)
avroProducer.produce(topic='my_topic', value=value, key=key)
Run Code Online (Sandbox Code Playgroud)

看来这些文件ValueSchema.avscKeySchema.avsc是独立于 Avro Schema Registry 加载的。

这是正确的吗?引用 Avro 架构注册表的 URL,然后从磁盘加载键/值的架构有什么意义?

请说清楚。

apache-kafka kafka-python kafka-producer-api

6
推荐指数
1
解决办法
4070
查看次数

在 kafka 中发送同步消息?

如何在kafka中发送同步消息?
实现它的一种方法是设置属性参数
max.in.flight.requests.per.connection = 1

但我想知道在 kafka 中是否有一种甚至直接或替代的方式发送同步消息。(类似于 producer.syncSend(...) 等)。

apache-kafka kafka-producer-api

6
推荐指数
2
解决办法
2万
查看次数

Kafka Producer:与相关 NETWORK_EXCEPTION 相关的错误生产响应

我们在 2 个服务器上以分布式模式运行 kafka。我正在通过 Java sdk 向 Kafka 发送消息到具有复制因子 2 和 1 分区的队列。

我们以异步模式运行。我在 Kafka 日志中没有发现任何异常。任何人都可以帮助找出可能的原因吗?

    Properties props = new Properties();
            props.put("bootstrap.servers", serverAdress);
            props.put("acks", "all");
            props.put("retries", "1");
            props.put("linger.ms",0);
            props.put("buffer.memory",10240000);
            props.put("max.request.size", 1024000);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

   Producer<String, Object> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
Run Code Online (Sandbox Code Playgroud)

异常跟踪:

-2017-08-15T02:36:29,148 [kafka-producer-network-thread | producer-1] WARN producer.internals.Sender - 在主题分区 BPA_BinLogQ-0 上得到相关 ID 为 353736 的错误生成响应,正在重试(剩余 0 次尝试)。错误:NETWORK_EXCEPTION

apache-kafka kafka-producer-api

6
推荐指数
1
解决办法
1万
查看次数

Kafka streams.allMetadata()方法返回空列表

因此,我正在尝试使用Kafka流进行交互式查询.我有Zookeeper和Kafka在本地运行(在Windows上).我使用C:\ temp作为存储文件夹,对于Zookeeper和Kafka.

我已经设置了这样的主题

kafka-topics.bat --zookeeper localhost:2181 --create  --replication-factor 1 --partitions 1 --topic rating-submit-topic
kafka-topics.bat --zookeeper localhost:2181 --create  --replication-factor 1 --partitions 1 --topic rating-output-topic
Run Code Online (Sandbox Code Playgroud)

阅读我已经完成了这个问题

我已阅读此文档页面:http://docs.confluent.io/current/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application

我还在这里阅读了Java示例:https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic /KafkaMusicExample.java

并且还阅读了这篇类似的帖子,它最初听起来像我一样的问题:无法从StateStore的其他应用程序访问KTable

这就是我的设置.那么问题是什么?

所以我说我正在尝试创建自己的应用程序,它允许使用自定义Akka Http REST Api(推荐的RPC调用)进行交互式查询,以允许我查询我的KTable.实际的流处理似乎正在按预期发生,我能够打印出结果,KTable并且它们与主题产生的内容相匹配.

所以存储方面的东西似乎正在发挥作用

尝试使用该Streams.allMetadata()方法时,似乎会出现问题,它返回一个空列表.

我在用

  • 项目清单
  • Scala 2.12
  • SBT
  • Akka.Http 10.9 for REST Api
  • 卡夫卡11.0

制片人代码

这是我的制作人的代码

package Processing.Ratings {

  import java.util.concurrent.TimeUnit

  import Entities.Ranking
  import Serialization.JSONSerde
  import Topics.RatingsTopics

  import scala.util.Random
  import org.apache.kafka.clients.producer.ProducerRecord
  import org.apache.kafka.clients.producer.KafkaProducer
  import org.apache.kafka.common.serialization.Serdes …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams

6
推荐指数
1
解决办法
1348
查看次数

Kafka 分区中的消息分布不均

我有一个有 10 个分区的主题,1 个消费者组有 4 个消费者,工作人员大小为 3。

我可以看到分区中的消息分布不均匀,一个分区有很多数据,另一个是免费的。

如何让我的生产者将负载平均分配到所有分区,以便所有分区都得到正确利用?

apache-kafka kafka-producer-api

6
推荐指数
1
解决办法
5320
查看次数

如果我不提任何,kafka如何决定分区

这就是我产生消息的方式:

String json = gson.toJson(msg);

ProducerRecord<String, String> record = new ProducerRecord<>(kafkaProducerConfig.getTopic(), json);
long startTime = System.currentTimeMillis();

try {
    RecordMetadata meta = producer.send(record).get(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
} catch (TimeoutException e) {
    e.printStackTrace();
}
Run Code Online (Sandbox Code Playgroud)

我有15这个主题的分区,我在制作时没有提到分区键,默认分配的分区是什么?

java apache-kafka kafka-producer-api

6
推荐指数
1
解决办法
2081
查看次数