标签: apache-kafka

向 Kafka 发送消息时,Camel 中的自定义标头丢失

我有以下路线。奇怪的是,自定义标头值“myHeader”在 Kafka 的消费者端丢失了。

您能否让我知道这是什么原因以及如何解决这个问题。

from("file://inputFolder?delay=2s&noop=true")
.convertBodyTo(String.class)
.setHeader("myHeader", constant("MY_HEADER_VALUE"))
.to("kafka:test-topic?brokers=localhost:9092");

from("kafka:test-topic?brokers=localhost:9092")
.log("${body}")
.log("***** myHeader: ${header.myHeader}")
Run Code Online (Sandbox Code Playgroud)

myHeader 的值是一个空字符串,尽管我将其设置为“”MY_HEADER_VALUE”。

我们使用 Apache Camel 2.20.2 和 Spring Boot 版本 1.5.10.RELEASE。

apache-camel apache-kafka

1
推荐指数
1
解决办法
2481
查看次数

在 Kubernetes 上安装自定义连接器到 Kafka Connect

我正在运行 kafka kubenetes helm 部署,但是我不确定如何安装自定义插件。

在本地版本的 kafka 上运行自定义插件时,我将卷安装/myplugin到 Docker 映像,然后设置插件路径环境变量。

我不确定如何将此工作流程应用于 helm Charts/kubernetes 部署,主要是如何将插件安装到 Kafka Connect pod,以便可以在 default 中找到它plugin.path=/usr/share/java

apache-kafka kubernetes apache-kafka-connect

1
推荐指数
1
解决办法
2481
查看次数

Kstream 的成本与 KTable 相对于状态存储的成本

我试图更好地了解如何设置集群来运行 Kafka-Stream 应用程序。我试图更好地了解所涉及的数据量。

在这方面,虽然我可以很快看到 KTable 需要状态存储,但我想知道从主题创建 Kstream 是否立即意味着将该主题的所有日志复制到状态存储中,显然是以我认为的仅附加方式。也就是说,特别是如果我们想公开查询流?

当数据是 Kstream 时,当数据在源主题中移动时,Kafka 是否会自动复制状态存储中的数据?如上所述,由于更新,这对于 Ktable 来说听起来很明显,但对于 Kstream 我只想确认会发生什么。

apache-kafka apache-kafka-streams ksqldb

1
推荐指数
1
解决办法
943
查看次数

如何使用 C# 反序列化 Kafka 中的 Avro 消息

嗨,我正在使用 Confluence kafka。我有返回通用记录的消费者。我想反序列化它。我找不到任何办法。我可以手动完成每个字段,例如

 object options = ((GenericRecord)response.Message.Value["Product"])["Options"];
Run Code Online (Sandbox Code Playgroud)

我在这里找到了一个

使用 C# 反序列化 Avro 文件 但是如何将架构转换为流?我想知道我们是否可以使用任何解决方案反序列化到我们的 c# 模型中?任何帮助将不胜感激。谢谢。

c# avro apache-kafka confluent-schema-registry confluent-platform

1
推荐指数
1
解决办法
9914
查看次数

如何测试使用 Avro 和 Confluence Schema Registry 的 Spring Cloud Stream Kafka Streams 应用程序?

我无法弄清楚如何测试使用 Avro 作为消息格式和(融合)架构注册表的 Spring Cloud Stream Kafka Streams 应用程序。

配置可能是这样的:

spring:
  application:
    name: shipping-service
  cloud:
    stream:
      schema-registry-client:
        endpoint: http://localhost:8081
      kafka:
        streams:
          binder:
            configuration:
              application:
                id: shipping-service
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
              schema:
                registry:
                  url: ${spring.cloud.stream.schema-registry-client.endpoint}
              value:
                subject:
                  name:
                    strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
          bindings:
            input:
              consumer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            order:
              consumer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
            output:
              producer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      bindings:
        input:
          destination: customer
        order:
          destination: order
        output:
          destination: order

server:
  port: 8086

logging:
  level:
    org.springframework.kafka.config: debug
Run Code Online (Sandbox Code Playgroud)

笔记:

  • 它使用本机序列化/反序列化。
  • 测试框架:Junit 5

我想关于 Kafka Broker,我应该使用 EmbeddedKafkaBroker bean,但如您所见,它还依赖于应该以某种方式模拟的模式注册表。如何?

apache-kafka spring-cloud spring-cloud-stream apache-kafka-streams spring-kafka

1
推荐指数
1
解决办法
2098
查看次数

Kafka Broker 不仅可以存储二进制格式的数据,还可以存储 Avro、JSON 和字符串数据吗?

我对卡夫卡经纪人很困惑。我认为它们将消息(数据,记录)存储为二进制格式,例如 0100110111...(有些人可能称它们为字节数组,字节流,字节数组等)

当我阅读有关 Kafka Connect 的内容时,它说消息存储在 Kafka 中的 Avro 对象、JSON 对象或字符串中。

卡夫卡权威指南:

...然后,工作人员使用配置的转换器将记录转换为 Avro 对象、JSON 对象或字符串,然后将结果存储到 Kafka 中。...当 Connect Worker 从 Kafka 读取记录时,它使用配置的转换器将记录从 Kafka 中的格式(即 Avro、JSON 或字符串)转换为 Connect Data API 记录,然后将其传递到接收器连接器,将其插入到目标系统中。

Kafka 代理既可以存储二进制数据,也可以存储 Avro、JSON 和字符串?

或者JSON、Avro、String都是二进制数据?

(我了解生产者/消费者的序列化/反序列化。我的问题仅涉及 Broker 的角度。)

apache-kafka

1
推荐指数
1
解决办法
4021
查看次数

kafka connect 和 kafka 主机要求

我正在使用 Couchbase 水槽连接器。CB 和 kafka 位于不同 AWS 区域的 2 个不同 EC2 实例中。我正在关注这些文档:

  1. https://docs.couchbase.com/kafka-connector/current/quickstart.html
  2. https://kafka.apache.org/documentation/#connect_configuring

基于这些,我认为 connect 必须在也安装了 kafka 的主机上运行。我的连接是否可以在远程主机上运行,​​以便我从远程 kafka 读取并将消息接收到远程 CB 存储桶中?有专门针对此的文档吗?

另外,我收到以下错误:

        at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
    at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:118)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches com.couchbase.connect.kafka.CouchbaseSinkConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.3.0', encodedVersion=2.3.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='2.3.0', encodedVersion=2.3.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='2.3.0', encodedVersion=2.3.0, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='2.3.0', encodedVersion=2.3.0, type=sink, typeName='sink', …
Run Code Online (Sandbox Code Playgroud)

couchbase apache-kafka apache-kafka-connect

1
推荐指数
1
解决办法
479
查看次数

Micrometer KafkaConsumerMetrics 在本地运行时存在,但在部署时不存在

当我在本地运行时,我可以看到kafka.consumer.正在收集这些信息。当我部署服务时,我发现这些指标不存在。

我使用kafka版本1.11.0,java 11和Spring Boot 2.2。

我如何确定缺少什么?

apache-kafka spring-boot spring-kafka micrometer spring-micrometer

1
推荐指数
1
解决办法
526
查看次数

Google Cloud (GCP) Pub/Sub 是否支持与 Kafka 中的 ConsumerGroups 类似的功能

尝试在 Google Cloud (GCP) Pub/Sub 与 Manager Kafka Service 之间做出选择。

在最新的更新中,Pub/Sub 添加了对重放之前处理过的消息的支持,这是一个值得欢迎的变化。

我在他们的文档中找不到的一个功能是,我们是否可以拥有类似于 Kafka 消费者组的功能,即拥有一组订阅者,每个订阅者处理来自同一主题的数据,并且能够从头开始重新处理数据订阅者(消费群体)而其他人则不受其影响。例如:

假设您有一个主题 StockTicks

你有两个消费者群体

CG1:有两个消费者
CG2:有另外两个消费者

在 Kafka 中,我可以独立读取这些组之间的消息,但我可以使用 Pub/Sub 做同样的事情吗?

而且Kafka允许你从头开始重放消息,我可以对Pub/Sub做同样的事情吗?如果我不能重放CG创建之前发布的消息,我可以,但是我可以重放CG创建之后提交的消息吗? CG/订阅者已创建?

apache-kafka google-cloud-platform google-cloud-pubsub kafka-consumer-api

1
推荐指数
1
解决办法
3893
查看次数

Sarama 使用 ConsumerGroup 分区消费者

我尝试过shopify/sarama库来使用kafka消息。Consumer接口和接口我都用过ConsumerGroup。我可以使用ConsumePartition()中的方法从特定分区消费Consumer。但是当我使用ConsumerGroup接口时,我似乎没有能力从特定分区消费。

有没有办法让我将某些分区分配给消费者组内的特定消费者?或者这是我无法干涉的事情?

go apache-kafka sarama

1
推荐指数
1
解决办法
3743
查看次数