标签: kafka-producer-api

Kafka生产者读取数据文件

我试图在循环中加载数据文件(以检查统计数据)而不是 Kafka 中的标准输入。下载Kafka后,我执行了以下步骤:

启动动物园管理员:

bin/zookeeper-server-start.sh config/zookeeper.properties
Run Code Online (Sandbox Code Playgroud)

启动服务器:

bin/kafka-server-start.sh config/server.properties
Run Code Online (Sandbox Code Playgroud)

创建了一个名为“test”的主题:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Run Code Online (Sandbox Code Playgroud)

跑生产者:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
Test1
Test2
Run Code Online (Sandbox Code Playgroud)

消费者聆听:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Test1
Test2
Run Code Online (Sandbox Code Playgroud)

我想将数据文件传递给生产者,而不是标准输入,消费者可以直接看到它。或者是否有任何 kafka 生产者而不是控制台消费者使用它我可以读取数据文件。任何帮助将不胜感激。谢谢!

apache-kafka kafka-producer-api

5
推荐指数
1
解决办法
2万
查看次数

kafka ack如何批量生成AsyncProducer

使用批处理异步生产者时,kafka如何发送确认?ack是每个消息/每批次/每个子批次(即每个分区的批次)?是否建议在异步批处理prdocuer中使用ack?或者更好的只是使用回调机制?

apache-kafka kafka-producer-api

5
推荐指数
1
解决办法
351
查看次数

kafka java生产者卡在生产消息中

我正在使用 java api 实现 apache kafka 生产者。Apache Kafka 安装在本地主机上。Zookeeper 也正在运行,但仍然生产者.send() 函数卡在发送消息中并且消息未发布。

我已经创建了“快速消息”主题。

下面是代码。

package com.hsahu.kafka.producer;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerExample {
public static void main(String[] args) {

    Properties props = new Properties();

    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 0);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);

    try {
        producer.send(new ProducerRecord<String, String>("fast-messages", "This is a dummy message"));
    } catch(Exception ex) {
        System.out.println(ex);
    }

    System.out.println("message publisher");

    producer.close();
}
Run Code Online (Sandbox Code Playgroud)

}

我该怎么办 …

java apache-kafka kafka-producer-api apache-zookeeper

5
推荐指数
1
解决办法
2687
查看次数

使用kafka的双向消息传递系统

有可能使用apache kafka开发双向消息传递系统吗?我需要从消费者那里订阅主题,也需要从消费者那里发送消息。

java apache-kafka kafka-consumer-api kafka-producer-api

5
推荐指数
1
解决办法
1536
查看次数

Kafka 命令行生产者/消费者有 1 秒的延迟

我正在使用命令行生产者和消费者对 Kafka 进行测试运行。

我在一个终端窗口中运行它

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic tag7
Run Code Online (Sandbox Code Playgroud)

这在另一个

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tag7 --zookeeper localhost:2181
Run Code Online (Sandbox Code Playgroud)

但是我发送给消费者的数据需要 1 秒多的时间。我发送的数据是我输入到生产者的数据,所以基本上每隔几秒就会有 1 条消息。是否有任何我可以更改的配置选项,以便 Kafka 代理每秒期望很少的消息,从而使消息移动得更快?

我使用的是Zookeeper和Kafka的默认配置,所以配置不多。

先感谢您!

apache-kafka kafka-consumer-api kafka-producer-api

5
推荐指数
2
解决办法
2927
查看次数

KafkaTemplate 线程安全吗

Spring Boot 线程中的 KafkaTemplate 是否安全。我可以创建一个 KafkaTemplate 并使用它来将信息发送到同一 kafka 主题,以便在我的 Web 服务中进行多个请求。

apache-kafka spring-boot kafka-producer-api

5
推荐指数
1
解决办法
1791
查看次数

Wurstmeister kafka 中的 LEADER_NOT_AVAILABLE 错误

我正在尝试使用以下链接设置 kafka 集群。

https://github.com/wurstmeister/kafka-docker

当我尝试生成一些 msg 时,我收到以下错误消息。

[2017-09-20 17:00:53,160] WARN Error while fetching metadata with correlation id 3 : {topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
Run Code Online (Sandbox Code Playgroud)

现在,我尝试编辑 server.properties 并设置值

Adverted.listeners=PLAINTEXT://192.168.1.162:9092

设置我的主机名和端口号。

当我尝试停止并启动 kafka 服务器时,出现以下错误 -

bash-4.3# ./kafka-server-stop.sh 
No kafka server to stop
Run Code Online (Sandbox Code Playgroud)

我成功地能够创建一个主题

bash-4.3# $KAFKA_HOME/bin/kafka-topics.sh --create --topic topic1 \
> --partitions 4 --zookeeper $ZK --replication-factor 2
Created topic "topic1".
Run Code Online (Sandbox Code Playgroud)

如何重新启动 kafka 以实现 server.properties 更改。

apache-kafka kafka-consumer-api kafka-producer-api

5
推荐指数
1
解决办法
5403
查看次数

如何检测kafka主题中的重复消息?

嗨,我有一个类似于下图所示的架构。

我有两个 kafka 生产者,它们将向 kafka 主题发送频繁重复消息的消息。

有没有一种方法可以以简单的方式处理这种情况,例如服务总线主题。

感谢您的帮助。

在此处输入图片说明

azureservicebus apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams

5
推荐指数
1
解决办法
9612
查看次数

Kafka:在 Windows 环境中运行 Confluent

我为本地运行设置了 Kafka。我已经用 Java 编写了示例生产者和消费者,并通过启动服务器和动物园管理员从本地运行。
我想使用oracle作为生产者,这需要编写配置文件(已经编写),confluent shell script才能在Unix上运行它。

有什么办法可以confluent在 Windows上运行,我confluent在安装程序中找不到批处理文件?

另外,有没有办法在不使用confluent脚本的情况下以生产者身份运行 Oracle ?

java apache-kafka kafka-producer-api apache-kafka-connect confluent-platform

5
推荐指数
1
解决办法
1万
查看次数

来自远程服务器的 Kafka 生产者

我正在使用 Apache Kafka 版本 (2.1.0) 开发流 API。我有一个 Kafka 集群和一个外部服务器。外部服务器将生成供 Kafka 集群使用的数据。

我们将外部服务器表示为E,将集群表示为CE没有安装Kafka。我在它上面运行一个 JAR 文件来生成消息。以下是 Producer 属性的片段

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bootstrapIp:9092");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer");
Run Code Online (Sandbox Code Playgroud)

我设置bootstrapIp为 Kafka 代理 IP。

在集群端,我使用以下命令启动消费者控制台:

kafka-console-consumer --bootstrap-server bootstrapIp:9092 --topic T1 --from-beginning
Run Code Online (Sandbox Code Playgroud)

我设置bootstrapIp为集群引导服务器IP。

当在集群上运行生产者和消费者时,它工作得很好,但是当我在外部服务器(E )中运行生产者和在集群( C )中运行消费者时,数据没有被消耗。

在本地主机中,当我在集群( C )中运行生产者和消费者时,一切都工作正常,一切都工作正常,当在外部运行生产者时,我无法使用集群中的数据。

从集群 ( C ) 到外部服务器 ( E ) 的ping工作正常,但我看不出问题到底出在哪里。

我无法弄清楚如何使用来自外部服务器的消息。

编辑

从外部服务器(E)我telnet(bootstrapIp):telnet bootstrapIp 9092并且它有效,我不明白这个问题

apache-kafka kafka-consumer-api kafka-producer-api

5
推荐指数
1
解决办法
2690
查看次数