我已经将 kafka 部署在 docker 容器中。
version: '3.8'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
Run Code Online (Sandbox Code Playgroud)
我在其中一个视频中看到 kafka 工具包存在于沿途的发行版中
/usr/bin
kafka 工具包不在该图像中的该位置
也许这个集合可以以某种方式安装,或者它是否存在于任何其他 kafka 映像中?
我正在尝试将 Helidon MP 应用程序中的 slf4j 日志消息发送到在端口 9092 上运行的 Kafka 服务器。我有以下类作为示例:
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class Service {
private final ConfigProvider configProvider;
@Inject
public Service(ConfigProvider configProvider) {
this.configProvider = configProvider;
}
public String getString() {
String msg = String.format("%s %s !", configProvider.getString());
log.info("Entered getString() method");
return msg;
}
}
Run Code Online (Sandbox Code Playgroud)
我还有一个logging.xml文件,它将Appender指定为KafkaAppender:
<Configuration>
<Appenders>
<Kafka name="KafkaAppender" topic="app-logs"
syncSend="false">
<Property name="bootstrap.servers"
value="localhost:9092"/>
</Kafka>
</Appenders>
<Loggers>
<Logger name="org.apache.kafka" level="WARN"/> <!-- avoid recursive logging -->
<Root level="INFO">
<AppenderRef ref="KafkaAppender"/>
</Root>
</Loggers>
</Configuration>
Run Code Online (Sandbox Code Playgroud)
但是,当我运行该应用程序时,出现以下错误:
2022-11-28 14:23:17,358 main …Run Code Online (Sandbox Code Playgroud) Apache Kafka 安装在 Mac(英特尔)上。单一本地生产者和单一本地消费者。创建了 1 个具有 3 个分区和 1 个复制因子的主题:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic animal --partitions 3 --replication-factor 1
Run Code Online (Sandbox Code Playgroud)
生产者代码:
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic animal
Run Code Online (Sandbox Code Playgroud)
制作人留言:
>alligator
>crocodile
>tiger
Run Code Online (Sandbox Code Playgroud)
生成消息时(通过生产者控制台手动),所有消息都会进入同一个分区。它们不应该跨分区分布吗?
我尝试过 3 条记录(如上所述),但它们仅发送到 1 个分区。在 tmp/kafka-logs/topic-0/00** 00.log 中检查 topic- 中的其他日志为空。
我尝试过几十条记录,但没有成功。
我什至在“config/server.properties”中增加了默认分区配置(num.partitions=3),但没有成功。
我也尝试过不同的主题,但没有运气。
关于带有 Reactor kafka 的 SpringBoot 3 应用程序的小问题。
我有一个小型反应式 kafka 消费者应用程序,它消耗来自 kafka 的消息并处理该消息。
该应用程序正在使用一个the-topic具有三个分区的主题。
该应用程序是docker化的,并且由于资源消耗限制的原因,该应用程序只能使用2个CPU(请耐心等待)。为了让事情变得更加困难,我只能运行该应用程序的一个唯一实例。
该应用程序非常简单:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
</dependencies>
Run Code Online (Sandbox Code Playgroud)
@Configuration
public class MyKafkaConfiguration {
@Bean
public KafkaReceiver<String, String> reactiveKafkaConsumerTemplate(KafkaProperties kafkaProperties) {
kafkaProperties.setBootstrapServers(List.of("my-kafka.com:9092"));
kafkaProperties.getConsumer().setGroupId("should-i-do-something-here");
final ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
basicReceiverOptions.subscription(Collections.singletonList("the-topic"));
return new DefaultKafkaReceiver<>(ConsumerFactory.INSTANCE, basicReceiverOptions);
}
}
Run Code Online (Sandbox Code Playgroud)
@Service
public class MyConsumer implements CommandLineRunner {
@Autowired
private KafkaReceiver<String, String> kafkaReceiver;
@Override
public void run(String... args) {
myConsumer().subscribe();
}
public Flux<String> …Run Code Online (Sandbox Code Playgroud) 我想运行集成测试来测试我的 kafka 监听器和 avro 序列化。这需要一个 Kafka 和一个 Schema 注册表(也可以称为 Zookeeper)。
测试时,我当前必须使用 docker-compose.yml,但我想通过 testcontainers 构建所需的容器来减少用户错误。Kafka 和 Zookeeper 实例启动得很好,看起来工作得很好——我的应用程序可以创建所需的主题,并且监听器也被订阅,我什至可以通过 kafka 控制台生产者发送消息。
不起作用的是 SchemaRegistry。容器启动,显然连接到了ZK,但无法建立到broker的连接。它会重试连接一段时间,直到超时,然后容器停止。因此,我无法在测试中注册和读取用于(反)序列化的 avro 模式,因此失败。
我找不到 SR 显然可以连接到 ZK 但找不到我的经纪人的原因。
有人也遇到过这个问题吗?你成功运行了吗?如果是这样,怎么会这样?我需要 Kafka 和架构注册表测试容器完全可用于我的测试,因此不能忽略其中任何一个。
我也可以继续使用 docker-compose.yml,但我真的很想完全以编程方式设置我的测试环境。
架构注册表容器记录以下内容:
2023-02-08 16:56:09 [2023-02-08 15:56:09,556] INFO Session establishment complete on server zookeeper/192.168.144.2:2181, session id = 0x1000085b81e0003, negotiated timeout = 40000 (org.apache.zookeeper.ClientCnxn)
2023-02-08 16:56:09 [2023-02-08 15:56:09,696] INFO Session: 0x1000085b81e0003 closed (org.apache.zookeeper.ZooKeeper)
2023-02-08 16:56:09 [2023-02-08 15:56:09,696] INFO EventThread shut down for session: 0x1000085b81e0003 (org.apache.zookeeper.ClientCnxn)
2023-02-08 16:56:09 [2023-02-08 15:56:09,787] INFO …Run Code Online (Sandbox Code Playgroud) java apache-kafka docker-compose testcontainers confluent-schema-registry
我是 springboot kafka 的新手,我在这篇文章之后创建了一个示例。
https://www.codenotfound.com/spring-kafka-boot-example.html
我目前使用的是 spring.kafka.version 1.1.6
我想在消息中添加自定义标题,以便我可以在标题中发送某些属性,例如:fileName、fileId
我发现您可以将 kafka 元数据设置为标题,但这不符合我的目的。
无论如何我可以做到这一点吗?如果可能的话,我很感激你能分享一个例子。
我正在尝试根据 KafkaStreams 的惯例和合理性做出实际的设计决策。
假设我有两个不同的事件要放入KTables 中。我有一个生产者将这些消息发送给KStream正在收听该主题的生产者。
据我所知,我不能对使用 的消息使用条件转发KafkaStreams,因此如果流订阅了许多主题(例如,上述每个消息一个),我只能调用stream.to一个接收器主题 - 否则,我会做一些事情,比如foreach在流上调用并将带有 a 的消息发送KProducer到接收器主题。
以上建议使用单个流。我以为我可以在同一个应用程序中设置多个流,每个流监听一个主题,映射并转发到一个表接收器,但是每次我尝试创建 的两个实例时KafkaStreams,只有第一个初始化订阅它的主题 - 另一个得到一个来自客户端的警告,它的主题没有订阅。
我可以在同一个应用程序中设置多个流吗?如果有,有什么特殊要求吗?
class Stream(topic: String) {
val props: Option[Map[String, String]] = Some(TopicProps.get(topic))
val streamsBuilder = new StreamsBuilder
val topics = new util.ArrayList[String]
topics.add(props.get("topic"))
val stream: KStream[String, String] = configureStream(streamsBuilder, topics, props.get("sink"))
def configureStream(builder: StreamsBuilder, topics: java.util.List[String], sink: String): KStream[String, String] = {
builder.stream[String, String](
topics,
Consumed.`with`(String(), String())
)
}
def init(): KafkaStreams …Run Code Online (Sandbox Code Playgroud) 我按照此处提到的教程重新设置了我的 kafka 制作人:https : //www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-14-04
我正在使用 cron 和以下带有 IP 的服务器上的脚本向生产者推送一些事件:1.2.3.4
#!/usr/bin/env python
import threading, logging, time
import multiprocessing
import requests
import datetime
import json
from kafka import KafkaProducer
class CheckApis():
apisList = {"a": "https://test.eng.com/"}
kafkaProducer = "1.2.3.4:9092"
kafkaTopic = "sometopic"
producer = KafkaProducer(bootstrap_servers=kafkaProducer)
for key, value in apisList.items():
headers = {};
response = requests.request("GET", value, headers=headers)
message = {"app_name": key, "status": response.status_code, "message": "none", "timestamp": str(datetime.datetime.utcnow())}
producer.send(kafkaTopic, json.dumps(message).encode('utf-8'));
print (response.text)
print (response.status_code)
producer.close()
Run Code Online (Sandbox Code Playgroud)
这很有效,我可以使用以下命令查看推送的事件:
~/kafka/bin/kafka-console-consumer.sh --zookeeper 1.2.3.4:2181 --topic sometopic …Run Code Online (Sandbox Code Playgroud) python node.js apache-kafka kafka-consumer-api apache-kafka-connect
我已经启动了一个 Nifi 进程(消费 Kafka)并将其连接到一个主题。它正在运行,但我无法(不知道)在哪里可以查看消息?
apache-kafka ×10
java ×4
spring-boot ×2
spring-kafka ×2
apache-nifi ×1
docker ×1
helidon ×1
kafka-topic ×1
log4j2 ×1
node.js ×1
python ×1
slf4j ×1