我有以下路线。奇怪的是,自定义标头值“myHeader”在 Kafka 的消费者端丢失了。
您能否让我知道这是什么原因以及如何解决这个问题。
from("file://inputFolder?delay=2s&noop=true")
.convertBodyTo(String.class)
.setHeader("myHeader", constant("MY_HEADER_VALUE"))
.to("kafka:test-topic?brokers=localhost:9092");
from("kafka:test-topic?brokers=localhost:9092")
.log("${body}")
.log("***** myHeader: ${header.myHeader}")
Run Code Online (Sandbox Code Playgroud)
myHeader 的值是一个空字符串,尽管我将其设置为“”MY_HEADER_VALUE”。
我们使用 Apache Camel 2.20.2 和 Spring Boot 版本 1.5.10.RELEASE。
我正在运行 kafka kubenetes helm 部署,但是我不确定如何安装自定义插件。
在本地版本的 kafka 上运行自定义插件时,我将卷安装/myplugin
到 Docker 映像,然后设置插件路径环境变量。
我不确定如何将此工作流程应用于 helm Charts/kubernetes 部署,主要是如何将插件安装到 Kafka Connect pod,以便可以在 default 中找到它plugin.path=/usr/share/java
。
我试图更好地了解如何设置集群来运行 Kafka-Stream 应用程序。我试图更好地了解所涉及的数据量。
在这方面,虽然我可以很快看到 KTable 需要状态存储,但我想知道从主题创建 Kstream 是否立即意味着将该主题的所有日志复制到状态存储中,显然是以我认为的仅附加方式。也就是说,特别是如果我们想公开查询流?
当数据是 Kstream 时,当数据在源主题中移动时,Kafka 是否会自动复制状态存储中的数据?如上所述,由于更新,这对于 Ktable 来说听起来很明显,但对于 Kstream 我只想确认会发生什么。
嗨,我正在使用 Confluence kafka。我有返回通用记录的消费者。我想反序列化它。我找不到任何办法。我可以手动完成每个字段,例如
object options = ((GenericRecord)response.Message.Value["Product"])["Options"];
Run Code Online (Sandbox Code Playgroud)
我在这里找到了一个
使用 C# 反序列化 Avro 文件 但是如何将架构转换为流?我想知道我们是否可以使用任何解决方案反序列化到我们的 c# 模型中?任何帮助将不胜感激。谢谢。
c# avro apache-kafka confluent-schema-registry confluent-platform
我无法弄清楚如何测试使用 Avro 作为消息格式和(融合)架构注册表的 Spring Cloud Stream Kafka Streams 应用程序。
配置可能是这样的:
spring:
application:
name: shipping-service
cloud:
stream:
schema-registry-client:
endpoint: http://localhost:8081
kafka:
streams:
binder:
configuration:
application:
id: shipping-service
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
schema:
registry:
url: ${spring.cloud.stream.schema-registry-client.endpoint}
value:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
bindings:
input:
consumer:
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
order:
consumer:
valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
output:
producer:
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
bindings:
input:
destination: customer
order:
destination: order
output:
destination: order
server:
port: 8086
logging:
level:
org.springframework.kafka.config: debug
Run Code Online (Sandbox Code Playgroud)
笔记:
我想关于 Kafka Broker,我应该使用 EmbeddedKafkaBroker bean,但如您所见,它还依赖于应该以某种方式模拟的模式注册表。如何?
apache-kafka spring-cloud spring-cloud-stream apache-kafka-streams spring-kafka
我对卡夫卡经纪人很困惑。我认为它们将消息(数据,记录)存储为二进制格式,例如 0100110111...(有些人可能称它们为字节数组,字节流,字节数组等)
当我阅读有关 Kafka Connect 的内容时,它说消息存储在 Kafka 中的 Avro 对象、JSON 对象或字符串中。
卡夫卡权威指南:
...然后,工作人员使用配置的转换器将记录转换为 Avro 对象、JSON 对象或字符串,然后将结果存储到 Kafka 中。...当 Connect Worker 从 Kafka 读取记录时,它使用配置的转换器将记录从 Kafka 中的格式(即 Avro、JSON 或字符串)转换为 Connect Data API 记录,然后将其传递到接收器连接器,将其插入到目标系统中。
Kafka 代理既可以存储二进制数据,也可以存储 Avro、JSON 和字符串?
或者JSON、Avro、String都是二进制数据?
(我了解生产者/消费者的序列化/反序列化。我的问题仅涉及 Broker 的角度。)
我正在使用 Couchbase 水槽连接器。CB 和 kafka 位于不同 AWS 区域的 2 个不同 EC2 实例中。我正在关注这些文档:
基于这些,我认为 connect 必须在也安装了 kafka 的主机上运行。我的连接是否可以在远程主机上运行,以便我从远程 kafka 读取并将消息接收到远程 CB 存储桶中?有专门针对此的文档吗?
另外,我收到以下错误:
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:118)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches com.couchbase.connect.kafka.CouchbaseSinkConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.3.0', encodedVersion=2.3.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='2.3.0', encodedVersion=2.3.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='2.3.0', encodedVersion=2.3.0, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='2.3.0', encodedVersion=2.3.0, type=sink, typeName='sink', …
Run Code Online (Sandbox Code Playgroud) 当我在本地运行时,我可以看到kafka.consumer.
正在收集这些信息。当我部署服务时,我发现这些指标不存在。
我使用kafka版本1.11.0
,java 11和Spring Boot 2.2。
我如何确定缺少什么?
apache-kafka spring-boot spring-kafka micrometer spring-micrometer
尝试在 Google Cloud (GCP) Pub/Sub 与 Manager Kafka Service 之间做出选择。
在最新的更新中,Pub/Sub 添加了对重放之前处理过的消息的支持,这是一个值得欢迎的变化。
我在他们的文档中找不到的一个功能是,我们是否可以拥有类似于 Kafka 消费者组的功能,即拥有一组订阅者,每个订阅者处理来自同一主题的数据,并且能够从头开始重新处理数据订阅者(消费群体)而其他人则不受其影响。例如:
假设您有一个主题 StockTicks
你有两个消费者群体
CG1:有两个消费者
CG2:有另外两个消费者
在 Kafka 中,我可以独立读取这些组之间的消息,但我可以使用 Pub/Sub 做同样的事情吗?
而且Kafka允许你从头开始重放消息,我可以对Pub/Sub做同样的事情吗?如果我不能重放CG创建之前发布的消息,我可以,但是我可以重放CG创建之后提交的消息吗? CG/订阅者已创建?
apache-kafka google-cloud-platform google-cloud-pubsub kafka-consumer-api
我尝试过shopify/sarama库来使用kafka消息。Consumer
接口和接口我都用过ConsumerGroup
。我可以使用ConsumePartition()
中的方法从特定分区消费Consumer
。但是当我使用ConsumerGroup
接口时,我似乎没有能力从特定分区消费。
有没有办法让我将某些分区分配给消费者组内的特定消费者?或者这是我无法干涉的事情?
apache-kafka ×10
spring-kafka ×2
apache-camel ×1
avro ×1
c# ×1
couchbase ×1
go ×1
ksqldb ×1
kubernetes ×1
micrometer ×1
sarama ×1
spring-boot ×1
spring-cloud ×1