标签: kafka-producer-api

Kafka Serializer JSON

我是Kafka,Serialization和JSON的新手

我想要的是生产者通过kafka和消费者发送JSON文件以使用原始文件形式的JSON文件.

我能够得到它所以JSON转换为字符串并通过String Serializer发送然后消费者将解析String并重新创建一个JSON对象但我担心这不是有效的或正确的方法(可能会失去字段类型对于JSON)

所以我研究了制作JSON序列化程序并在我的制作人的配置中设置它.

我在这里使用了JsonEncoder:Kafka:编写自定义序列化器

但是当我现在尝试运行我的生产者时,似乎在编码器的toBytes函数中,try块永远不会返回任何像我想要的那样

try {
            bytes = objectMapper.writeValueAsString(object).getBytes();

        } catch (JsonProcessingException e) {
            logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        }
Run Code Online (Sandbox Code Playgroud)

似乎objectMapper.writeValueAsString(object).getBytes(); 接受我的JSON obj({"name":"Kate","age":25})并将其转换为空,

这是我的制作人的跑步功能

List<KeyedMessage<String,JSONObject>> msgList=new ArrayList<KeyedMessage<String,JSONObject>>();   

    JSONObject record = new JSONObject();

    record.put("name", "Kate");
    record.put("age", 25);

    msgList.add(new KeyedMessage<String, JSONObject>(topic, record));

    producer.send(msgList);
Run Code Online (Sandbox Code Playgroud)

我错过了什么?我的原始方法(转换为字符串并发送然后重建JSON obj)是否可以?或者只是没有正确的方法去?

谢谢!

serialization json apache-kafka kafka-producer-api

7
推荐指数
2
解决办法
2万
查看次数

如果消息是由制作人制作的,如何从Kafka经纪人那里得到确认?

当我发出消息时,我想从经纪人那里得到一些回应.我已经尝试过使用的CallBack机制(通过实现CallBack),KafkaProducer.send但它没有工作,也没有调用onCompletion方法.

当我关闭Kafka服务器并尝试生成消息时,它会调用回调方法.

有没有其他方式得到确认?

@Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        System.out.println("Called Callback method");
        if (metadata != null) {
            System.out.println("message(" + key + ", " + message
                    + ") sent to partition(" + metadata.partition() + "), "
                    + "offset(" + metadata.offset() + ") in " + elapsedTime
                    + " ms");
        } else {
            exception.printStackTrace();
        }

    }

props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "mytopic");
props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
props.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);

KafkaProducer<String, byte[]> producer = new KafkaProducer<String,byte[]>(props); …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-consumer-api kafka-producer-api

7
推荐指数
2
解决办法
2万
查看次数

保证向Kafka集群传递多条消息

如果我连续几次向Kafka集群发布消息(使用新的Producer API),我会Future从生产者那里获得每条消息的消息.

现在,假设我已经配置我的制片人有max.in.flight.requests.per.connection = 1retries > 0我只能等待最后的未来,可以肯定,所有先前也已交付(和顺序)?或者我需要等待所有期货?在代码中,我可以这样做:

Producer<String, String> producer = new KafkaProducer<>(myConfig);
Future<?> f = null;
for(MessageType message : messages){
  f = producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue());
}
try {
  f.get();
} catch(ExecutionException e) {
  //handle exception
}
Run Code Online (Sandbox Code Playgroud)

而不是这个:

Producer<String, String> producer = new KafkaProducer<>(myConfig);
List<Future<?>> futureList = new ArrayList<>();
for(MessageType message : messages){
  futureList.add(producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue()));
}
try {
  for(Future<?> f : futureList) {
    f.get();
  }
} catch(ExecutionException …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-producer-api

7
推荐指数
1
解决办法
789
查看次数

是否可以使用Kafka传输文件?

我每天都会生成数千个文件,我希望使用Kafka进行流式传输.当我尝试读取文件时,每一行都被视为单独的消息.

我想知道如何在Kafka主题中将每个文件的内容作为单个消息进行处理,并且消费者如何在单独的文件中将Kafka主题中的每个消息写入.

apache-kafka kafka-consumer-api kafka-producer-api

7
推荐指数
1
解决办法
6866
查看次数

Kafka最佳保留和删除政策

我对卡夫卡很新,所以请原谅我这个问题是微不足道的.我有一个非常简单的设置,用于时序测试,如下所示:

机器A - >写入主题1(代理) - >机器B从主题1读取机器B - >将消息写入主题2(代理) - >机器A从主题2读取

现在我在无限循环中发送大约1400字节的消息,很快就填满了我的小经纪人的空间.我正在尝试为log.retention.ms,log.retention.bytes,log.segment.bytes和log.segment.delete.delay.ms设置不同的值.首先,我将所有值设置为允许的最小值,但似乎这降低了性能,然后我将它们设置为我的代理在完全填满之前可以采取的最大值,但是当删除发生时性能再次下降.是否有最佳实践来设置这些值以获得绝对最小延迟?

谢谢您的帮助!

apache-kafka kafka-consumer-api kafka-python kafka-producer-api

7
推荐指数
1
解决办法
1万
查看次数

重现UnknownTopicOrPartitionException:此服务器不承载此主题分区

我们在生产环境中遇到了一些例外:

UnknownTopicOrPartitionException: This server does not host this topic-partition
Run Code Online (Sandbox Code Playgroud)

根据我的分析,这个问题的一个可能的解决方法是增加no,retries因为这是一个可重复的例外.

我正面临一些在本地复制这个问题的困难.我试图在生产过程中打倒经纪人但却失败了TimeoutException.

我正在寻找重现此问题的建议.

apache-kafka kafka-producer-api

7
推荐指数
1
解决办法
6709
查看次数

Web前端直接向Kafka经纪人生产是可行的想法吗?

我刚刚开始学习卡夫卡。因此,尝试构建一个社交媒体Web应用程序。我很清楚如何在后端使用Kafka(从后端到数据库和其他服务的通信)。

但是,我不确定前端应如何与后端通信。我当时考虑的架构为:Frontend-> Kafka-> Backend


前端充当生产者,后端充当消费者。在这种情况下,前端应该具有将所有必需的资源发布到Kafka经纪人(即使我在Kafka上实现了安全性)。现在,这种情况是否可行:

可以说我冒充了前端,并向我的Kafka经纪人发送了荒唐/无效的消息。现在,当它们到达我的后端时,我可以处理和过滤这些消息。但我知道Kafka会暂时存储这些消息。如果将这种“假”消息大量发布到我的Kafka服务器上,我的Kafka服务器不会面对DDOS问题,因为无论如何它们都会被存储,因为它们不会被过滤掉,直到它们被后端消耗掉为止?

如果是这样,我该如何预防呢?

还是这不是一个好选择?我还可以尝试使用REST进行前端/后端通信,然后从后端使用Kafka与数据库和其他内容进行通信。

或者,我可以有一个中间件(还是REST)来检测并过滤掉此类消息。

apache-kafka kafka-consumer-api kafka-producer-api

7
推荐指数
1
解决办法
2504
查看次数

Kafka Producer将消息发布到单个分区

我是 Kafka 的新手,正在阅读可用的官方文档。

在我的本地系统上,我已经启动了一个卡夫卡实例和动物园管理员。Zookeper 和 kafka 服务器都在默认端口上运行。

我创建了一个主题“test”,复制因子为 1,因为我只有一个 kafka 实例启动并运行。

除此之外,我还创建了两个分区。

我有两个消费者在同一消费者组中订阅了该队列。

现在我已经在 Windows 机器上使用命令提示符启动消费者。

当我从命令提示符启动生产者并将消息发布到主题时,一切正常。Kafka 使用循环法将消息推送到两个分区,并且每个消费者交替接收消息,因为每个消费者都在监听不同的分区。

但是,当我使用 java kafka-client jar 创建生产者时,即使我对消息使用不同的密钥,生产者也会将所有消息推送到同一个分区,因为所有消息都在同一个消费者上接收。

分区不是静态的,而且每次我运行生产者时它都会不断变化。

我尝试了与从命令提示符启动的生产者相同的场景,其配置与我使用 java 代码向 kafka-client 生产者提供的配置完全相同。命令提示符生成器似乎工作正常,但代码生成器将所有消息推送到同一分区。

我尝试更改某些消息的密钥,希望代理将其发送到不同的分区,因为文档中提到代理使用消息的密钥路由消息。

public class KafkaProducerParallel {


public static void main(String[] args) throws InterruptedException, 
ExecutionException {

    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9092");
    properties.put(ProducerConfig.CLIENT_ID_CONFIG, "parallelism- 
 producer");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
 StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
LongSerializer.class);


    Producer<String, Long> parallelProducer = new KafkaProducer<> 
(properties);

    for(long i=0;i<100;i++) {

        ProducerRecord<String, Long> producerRecord;

        if(i<50) {
            producerRecord = new ProducerRecord<String, 
 Long>("second-topic", "Amoeba", i);
        }else …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-producer-api

7
推荐指数
2
解决办法
1万
查看次数

如何设置kafka事务生产者

我正在尝试在 Windows 10 上设置 kafka 事务生产者。我从 CLI 下载并作为单节点运行 kafka。运行如下:

  • .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
  • .\bin\windows\kafka-server-start.bat .\config\server.properties

(仅更改了 server.properties -> log.dirs )

一切都很好,卡夫卡已经启动并运行。默认测试:kafka-console- Producer 、 kafka-console-consumer 效果良好。

public class SampleTest {

    private final static Logger logger = LoggerFactory.getLogger(SampleTest.class);

    public static void main(String[] args) {

        Properties producerConfig = new Properties();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer");
        producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // enable idempotence
        producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-trx-id123"); // set transaction id
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        Producer<String, String> producer = new KafkaProducer<>(producerConfig);

        producer.initTransactions();
        try {
            producer.beginTransaction();
            String firstMsg = "Hello 1";
            producer.send(new …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-producer-api spring-kafka

6
推荐指数
0
解决办法
6512
查看次数

Kafka commit事务确认失败

根据Kafka的commitTransaction文档,如果在一定时间内没有收到响应,commitTransaction将会失败TimeoutException

请注意,如果在 max.block.ms 到期之前无法提交事务,则此方法将引发 TimeoutException。此外,如果中断,它将引发 InterruptException。在任何一种情况下重试都是安全的,但不可能尝试不同的操作(例如 abortTransaction),因为提交可能已经在完成过程中。如果不重试,唯一的选择是关闭生产者。

考虑一个应用程序,其中 Kafka 生产者发送一组记录作为事务 A。

记录成功发送到主题后,Kafka Producer 将执行commitTransaction. Kafka 集群接收提交事务请求并成功提交作为事务 A 一部分的记录。Kafka 集群发送有关成功提交的确认。

然而,由于某些问题,此确认丢失,导致TimeoutKafka 生产者commitTransaction调用出现异常。因此,即使记录已经提交到 Kafka 集群上,从生产者的角度来看,提交还是失败了。

通常在这种情况下,应用程序会在新事务 B 中重试发送事务 A 记录,但这会导致记录重复,因为它们已经作为事务 A 的一部分提交。

上述场景可能吗?您如何处理commitTransaction确认丢失以及由此导致的最终记录重复?

distributed-system duplicates packet-loss apache-kafka kafka-producer-api

6
推荐指数
0
解决办法
513
查看次数