我试图在循环中加载数据文件(以检查统计数据)而不是 Kafka 中的标准输入。下载Kafka后,我执行了以下步骤:
启动动物园管理员:
bin/zookeeper-server-start.sh config/zookeeper.properties
Run Code Online (Sandbox Code Playgroud)
启动服务器:
bin/kafka-server-start.sh config/server.properties
Run Code Online (Sandbox Code Playgroud)
创建了一个名为“test”的主题:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Run Code Online (Sandbox Code Playgroud)
跑生产者:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Test1
Test2
Run Code Online (Sandbox Code Playgroud)
消费者聆听:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Test1
Test2
Run Code Online (Sandbox Code Playgroud)
我想将数据文件传递给生产者,而不是标准输入,消费者可以直接看到它。或者是否有任何 kafka 生产者而不是控制台消费者使用它我可以读取数据文件。任何帮助将不胜感激。谢谢!
使用批处理异步生产者时,kafka如何发送确认?ack是每个消息/每批次/每个子批次(即每个分区的批次)?是否建议在异步批处理prdocuer中使用ack?或者更好的只是使用回调机制?
我正在使用 java api 实现 apache kafka 生产者。Apache Kafka 安装在本地主机上。Zookeeper 也正在运行,但仍然生产者.send() 函数卡在发送消息中并且消息未发布。
我已经创建了“快速消息”主题。
下面是代码。
package com.hsahu.kafka.producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 0);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
producer.send(new ProducerRecord<String, String>("fast-messages", "This is a dummy message"));
} catch(Exception ex) {
System.out.println(ex);
}
System.out.println("message publisher");
producer.close();
}
Run Code Online (Sandbox Code Playgroud)
}
我该怎么办 …
有可能使用apache kafka开发双向消息传递系统吗?我需要从消费者那里订阅主题,也需要从消费者那里发送消息。
我正在使用命令行生产者和消费者对 Kafka 进行测试运行。
我在一个终端窗口中运行它
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic tag7
Run Code Online (Sandbox Code Playgroud)
这在另一个
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tag7 --zookeeper localhost:2181
Run Code Online (Sandbox Code Playgroud)
但是我发送给消费者的数据需要 1 秒多的时间。我发送的数据是我输入到生产者的数据,所以基本上每隔几秒就会有 1 条消息。是否有任何我可以更改的配置选项,以便 Kafka 代理每秒期望很少的消息,从而使消息移动得更快?
我使用的是Zookeeper和Kafka的默认配置,所以配置不多。
先感谢您!
Spring Boot 线程中的 KafkaTemplate 是否安全。我可以创建一个 KafkaTemplate 并使用它来将信息发送到同一 kafka 主题,以便在我的 Web 服务中进行多个请求。
我正在尝试使用以下链接设置 kafka 集群。
https://github.com/wurstmeister/kafka-docker
当我尝试生成一些 msg 时,我收到以下错误消息。
[2017-09-20 17:00:53,160] WARN Error while fetching metadata with correlation id 3 : {topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
Run Code Online (Sandbox Code Playgroud)
现在,我尝试编辑 server.properties 并设置值
Adverted.listeners=PLAINTEXT://192.168.1.162:9092
设置我的主机名和端口号。
当我尝试停止并启动 kafka 服务器时,出现以下错误 -
bash-4.3# ./kafka-server-stop.sh
No kafka server to stop
Run Code Online (Sandbox Code Playgroud)
我成功地能够创建一个主题
bash-4.3# $KAFKA_HOME/bin/kafka-topics.sh --create --topic topic1 \
> --partitions 4 --zookeeper $ZK --replication-factor 2
Created topic "topic1".
Run Code Online (Sandbox Code Playgroud)
如何重新启动 kafka 以实现 server.properties 更改。
azureservicebus apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams
我为本地运行设置了 Kafka。我已经用 Java 编写了示例生产者和消费者,并通过启动服务器和动物园管理员从本地运行。
我想使用oracle作为生产者,这需要编写配置文件(已经编写),confluent shell script才能在Unix上运行它。
有什么办法可以confluent在 Windows上运行,我confluent在安装程序中找不到批处理文件?
另外,有没有办法在不使用confluent脚本的情况下以生产者身份运行 Oracle ?
java apache-kafka kafka-producer-api apache-kafka-connect confluent-platform
我正在使用 Apache Kafka 版本 (2.1.0) 开发流 API。我有一个 Kafka 集群和一个外部服务器。外部服务器将生成供 Kafka 集群使用的数据。
我们将外部服务器表示为E,将集群表示为C。E没有安装Kafka。我在它上面运行一个 JAR 文件来生成消息。以下是 Producer 属性的片段 :
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bootstrapIp:9092");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer");
Run Code Online (Sandbox Code Playgroud)
我设置bootstrapIp为 Kafka 代理 IP。
在集群端,我使用以下命令启动消费者控制台:
kafka-console-consumer --bootstrap-server bootstrapIp:9092 --topic T1 --from-beginning
Run Code Online (Sandbox Code Playgroud)
我设置bootstrapIp为集群引导服务器IP。
当在集群上运行生产者和消费者时,它工作得很好,但是当我在外部服务器(E )中运行生产者和在集群( C )中运行消费者时,数据没有被消耗。
在本地主机中,当我在集群( C )中运行生产者和消费者时,一切都工作正常,一切都工作正常,当在外部运行生产者时,我无法使用集群中的数据。
从集群 ( C ) 到外部服务器 ( E ) 的ping工作正常,但我看不出问题到底出在哪里。
我无法弄清楚如何使用来自外部服务器的消息。
编辑
从外部服务器(E)我telnet(bootstrapIp):telnet bootstrapIp 9092并且它有效,我不明白这个问题