标签: apache-kafka

Kafka CLI 工具在 wurstmeister/kafka 中的位置

我已经将 kafka 部署在 docker 容器中。


version: '3.8'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
Run Code Online (Sandbox Code Playgroud)

我在其中一个视频中看到 kafka 工具包存在于沿途的发行版中

/usr/bin

kafka 工具包不在该图像中的该位置

也许这个集合可以以某种方式安装,或者它是否存在于任何其他 kafka 映像中?

apache-kafka docker

1
推荐指数
1
解决办法
1415
查看次数

Helidon MP:如何将 slf4j 日志消息发送到 Kafka 代理?

我正在尝试将 Helidon MP 应用程序中的 slf4j 日志消息发送到在端口 9092 上运行的 Kafka 服务器。我有以下类作为示例:

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Service {

  private final ConfigProvider configProvider;

  @Inject
  public Service(ConfigProvider configProvider) {
    this.configProvider = configProvider;
  }

  public String getString() {
    String msg = String.format("%s %s !", configProvider.getString());
    log.info("Entered getString() method");
    return msg;
  }
}
Run Code Online (Sandbox Code Playgroud)

我还有一个logging.xml文件,它将Appender指定为KafkaAppender:

<Configuration>
    <Appenders>
        <Kafka name="KafkaAppender" topic="app-logs"
               syncSend="false">
            <Property name="bootstrap.servers"
                      value="localhost:9092"/>
        </Kafka>
    </Appenders>
    <Loggers>
        <Logger name="org.apache.kafka" level="WARN"/> <!-- avoid recursive logging -->
        <Root level="INFO">
            <AppenderRef ref="KafkaAppender"/>
        </Root>
    </Loggers>
</Configuration>
Run Code Online (Sandbox Code Playgroud)

但是,当我运行该应用程序时,出现以下错误:

2022-11-28 14:23:17,358 main …
Run Code Online (Sandbox Code Playgroud)

java slf4j apache-kafka log4j2 helidon

1
推荐指数
1
解决办法
477
查看次数

Kafka 不向其他分区发送消息

Apache Kafka 安装在 Mac(英特尔)上。单一本地生产者和单一本地消费者。创建了 1 个具有 3 个分区和 1 个复制因子的主题:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic animal --partitions 3 --replication-factor 1
Run Code Online (Sandbox Code Playgroud)

生产者代码:

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic animal
Run Code Online (Sandbox Code Playgroud)

制作人留言:

>alligator
>crocodile
>tiger
Run Code Online (Sandbox Code Playgroud)

生成消息时(通过生产者控制台手动),所有消息都会进入同一个分区。它们不应该跨分区分布吗?

我尝试过 3 条记录(如上所述),但它们仅发送到 1 个分区。在 tmp/kafka-logs/topic-0/00** 00.log 中检查 topic- 中的其他日志为空。

我尝试过几十条记录,但没有成功。

我什至在“config/server.properties”中增加了默认分区配置(num.partitions=3),但没有成功。

我也尝试过不同的主题,但没有运气。

apache-kafka kafka-producer-api kafka-topic kafka-partition

1
推荐指数
1
解决办法
1182
查看次数

SpringBoot 与 REACTOR kafka :增加 2CPUs pod 上的消息消耗吞吐量

关于带有 Reactor kafka 的 SpringBoot 3 应用程序的小问题。

我有一个小型反应式 kafka 消费者应用程序,它消耗来自 kafka 的消息并处理该消息。

该应用程序正在使用一个the-topic具有三个分区的主题。

该应用程序是docker化的,并且由于资源消耗限制的原因,该应用程序只能使用2个CPU(请耐心等待)。为了让事情变得更加困难,我只能运行该应用程序的一个唯一实例

该应用程序非常简单:

     <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor.kafka</groupId>
            <artifactId>reactor-kafka</artifactId>
        </dependency>
    </dependencies>
Run Code Online (Sandbox Code Playgroud)
@Configuration
public class MyKafkaConfiguration {

    @Bean
    public KafkaReceiver<String, String> reactiveKafkaConsumerTemplate(KafkaProperties kafkaProperties) {
        kafkaProperties.setBootstrapServers(List.of("my-kafka.com:9092"));
        kafkaProperties.getConsumer().setGroupId("should-i-do-something-here");
        final ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
        basicReceiverOptions.subscription(Collections.singletonList("the-topic"));
        return new DefaultKafkaReceiver<>(ConsumerFactory.INSTANCE, basicReceiverOptions);
    }

}
Run Code Online (Sandbox Code Playgroud)
@Service
public class MyConsumer implements CommandLineRunner {

    @Autowired
    private KafkaReceiver<String, String> kafkaReceiver;


    @Override
    public void run(String... args) {
        myConsumer().subscribe();
    }

    public Flux<String> …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-boot spring-kafka reactor-kafka

1
推荐指数
1
解决办法
2159
查看次数

Testcontainers SchemaRegistry 无法连接到 Kafka 容器

我想运行集成测试来测试我的 kafka 监听器和 avro 序列化。这需要一个 Kafka 和一个 Schema 注册表(也可以称为 Zookeeper)。

测试时,我当前必须使用 docker-compose.yml,但我想通过 testcontainers 构建所需的容器来减少用户错误。Kafka 和 Zookeeper 实例启动得很好,看起来工作得很好——我的应用程序可以创建所需的主题,并且监听器也被订阅,我什至可以通过 kafka 控制台生产者发送消息。

不起作用的是 SchemaRegistry。容器启动,显然连接到了ZK,但无法建立到broker的连接。它会重试连接一段时间,直到超时,然后容器停止。因此,我无法在测试中注册和读取用于(反)序列化的 avro 模式,因此失败。

我找不到 SR 显然可以连接到 ZK 但找不到我的经纪人的原因。

有人也遇到过这个问题吗?你成功运行了吗?如果是这样,怎么会这样?我需要 Kafka 和架构注册表测试容器完全可用于我的测试,因此不能忽略其中任何一个。

我也可以继续使用 docker-compose.yml,但我真的很想完全以编程方式设置我的测试环境。

架构注册表容器记录以下内容:

2023-02-08 16:56:09 [2023-02-08 15:56:09,556] INFO Session establishment complete on server zookeeper/192.168.144.2:2181, session id = 0x1000085b81e0003, negotiated timeout = 40000 (org.apache.zookeeper.ClientCnxn)
2023-02-08 16:56:09 [2023-02-08 15:56:09,696] INFO Session: 0x1000085b81e0003 closed (org.apache.zookeeper.ZooKeeper)
2023-02-08 16:56:09 [2023-02-08 15:56:09,696] INFO EventThread shut down for session: 0x1000085b81e0003 (org.apache.zookeeper.ClientCnxn)
2023-02-08 16:56:09 [2023-02-08 15:56:09,787] INFO …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka docker-compose testcontainers confluent-schema-registry

1
推荐指数
1
解决办法
3894
查看次数

什么是kafka中的消费者组?

什么是kafka中的消费者组?请用一个用例来解释。

apache-kafka

0
推荐指数
1
解决办法
492
查看次数

如何将自定义标头值添加到 spring kafka 消息?

我是 springboot kafka 的新手,我在这篇文章之后创建了一个示例。

https://www.codenotfound.com/spring-kafka-boot-example.html

我目前使用的是 spring.kafka.version 1.1.6

我想在消息中添加自定义标题,以便我可以在标题中发送某些属性,例如:fileName、fileId

我发现您可以将 kafka 元数据设置为标题,但这不符合我的目的。

无论如何我可以做到这一点吗?如果可能的话,我很感激你能分享一个例子。

java apache-kafka spring-boot spring-kafka

0
推荐指数
1
解决办法
2997
查看次数

KafkaStreams 同一应用程序中的多个流

我正在尝试根据 KafkaStreams 的惯例和合理性做出实际的设计决策。

假设我有两个不同的事件要放入KTables 中。我有一个生产者将这些消息发送给KStream正在收听该主题的生产者。

据我所知,我不能对使用 的消息使用条件转发KafkaStreams,因此如果流订阅了许多主题(例如,上述每个消息一个),我只能调用stream.to一个接收器主题 - 否则,我会做一些事情,比如foreach在流上调用并将带有 a 的消息发送KProducer到接收器主题。

以上建议使用单个流。我以为我可以在同一个应用程序中设置多个流,每个流监听一个主题,映射并转发到一个表接收器,但是每次我尝试创建 的两个实例时KafkaStreams,只有第一个初始化订阅它的主题 - 另一个得到一个来自客户端的警告,它的主题没有订阅。

我可以在同一个应用程序中设置多个流吗?如果有,有什么特殊要求吗?

    class Stream(topic: String) {
      val props: Option[Map[String, String]] = Some(TopicProps.get(topic))
      val streamsBuilder = new StreamsBuilder
      val topics = new util.ArrayList[String]
      topics.add(props.get("topic"))

      val stream: KStream[String, String] = configureStream(streamsBuilder, topics, props.get("sink"))

      def configureStream(builder: StreamsBuilder, topics: java.util.List[String], sink: String): KStream[String, String] = {
        builder.stream[String, String](
          topics,
          Consumed.`with`(String(), String())
        )
      }

      def init(): KafkaStreams …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

0
推荐指数
1
解决办法
5601
查看次数

无法远程使用 kafka 事件错误:connect ECONNREFUSED 5.6.7.8:9092

我按照此处提到的教程重新设置了我的 kafka 制作人:https : //www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-14-04

我正在使用 cron 和以下带有 IP 的服务器上的脚本向生产者推送一些事件:1.2.3.4

#!/usr/bin/env python
import threading, logging, time
import multiprocessing
import requests
import datetime
import json
from kafka import KafkaProducer

class CheckApis():
    apisList = {"a": "https://test.eng.com/"}
    kafkaProducer = "1.2.3.4:9092"
    kafkaTopic = "sometopic"
    producer = KafkaProducer(bootstrap_servers=kafkaProducer)
    for key, value in apisList.items():
        headers = {};
        response = requests.request("GET", value, headers=headers)
        message = {"app_name": key, "status": response.status_code, "message": "none", "timestamp": str(datetime.datetime.utcnow())}
        producer.send(kafkaTopic, json.dumps(message).encode('utf-8'));
        print (response.text)
        print (response.status_code)
    producer.close()
Run Code Online (Sandbox Code Playgroud)

这很有效,我可以使用以下命令查看推送的事件:

~/kafka/bin/kafka-console-consumer.sh --zookeeper 1.2.3.4:2181 --topic sometopic …
Run Code Online (Sandbox Code Playgroud)

python node.js apache-kafka kafka-consumer-api apache-kafka-connect

0
推荐指数
1
解决办法
1815
查看次数

如何在Nifi中查看Kafka的消费消息?

我已经启动了一个 Nifi 进程(消费 Kafka)并将其连接到一个主题。它正在运行,但我无法(不知道)在哪里可以查看消息?

apache-kafka kafka-consumer-api apache-nifi

0
推荐指数
1
解决办法
4462
查看次数