我是Kafka的新手,我们的团队正在研究服务间通信的模式。
目标
我们有两个服务,P(生产者)和C(消费者)。对于C所需的一组数据,P是真理的源头。C启动时,需要将所有当前数据从P加载到其缓存中,然后订阅更改通知。(换句话说,我们要在服务之间同步数据。)
数据总量相对较低,并且更改很少。短暂的同步延迟是可以接受的(最终一致性)。
我们希望解耦服务,以便P和C不需要彼此了解。
提案
当P启动时,它将所有数据发布到启用了日志压缩的Kafka主题。每个消息都是其ID为键的聚合。
C启动时,它将从主题的开头读取所有消息,并填充其缓存。然后,它继续从其偏移量读取数据,以通知更新。
当P更新其数据时,它将为已更改的聚合发布一条消息。(此消息与原始消息具有相同的架构。)
C收到新消息时,将更新其缓存中的相应数据。
约束条件
我们正在使用Confluent REST代理与Kafka进行通信。
问题
当C启动时,如何知道何时从该主题读取了所有消息,以便可以安全地开始处理?
如果C没有立即注意到P第二秒钟发送的消息,这是可以接受的。如果C在消费一个小时前发送的消息之前开始处理,这是不可接受的。请注意,我们不知道何时更新P的数据。
我们不希望C在消耗每条消息之后不必等待REST代理的轮询间隔。
我有一个现有的 Kafka 集群。我想安装 Kafka REST 代理:
https://github.com/confluentinc/kafka-rest
如果我安装 confluent 会随 Kafka 一起出现吗?我担心如果我仍然在我的主 Kafka 节点上使用 confluent 会覆盖我的所有设置并弄乱我的 Kafka 集群。
当您有一个现有的 Kafka 集群时,如何安装 Kafka REST?这在他们的网站上没有明确说明。我有 CentOS 并打算尝试:
sudo yum install confluent-platform-oss-2.11
Run Code Online (Sandbox Code Playgroud)
任何帮助都会很棒......
我正在尝试使用一些自定义标头向 Kafka 发送消息,但我找不到方法。根据文档,POST /topics/(string: topic_name)API 无法将自定义标头附加到消息。
有谁知道如何通过融合的 Kafka REST API 发送带有自定义标头的 Kafka 消息?
这是我发送到 REST 代理的示例请求正文
POST /topics/stream.mycustomtopic
{
"records": [{
"value": "{\"myFirstKey\":\"myFirstValue\"}"
}]
}
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 kafka-rest 接口将 Protobuf 架构发布到架构注册表:
curl -X POST -H "Content-Type: application/vnd.kafka.protobuf.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"value_schema": "syntax=\"proto3\"; message User { string name = 1; }", "records": [{"value": {"name": "testUser"}}]}' \
"http://localhost:8082/topics/protobuftest"
Run Code Online (Sandbox Code Playgroud)
我收到此错误:
{"error_code":415,"message":"HTTP 415 Unsupported Media Type"}
Run Code Online (Sandbox Code Playgroud)
问题:指示媒体类型使其发挥作用的正确方法是什么?
protocol-buffers apache-kafka confluent-schema-registry kafka-rest confluent-platform
我想在 Kafka 之上使用 HTTP 代理。我看到两个具有相同目的的项目:
我使用 strimzi 运算符在 Kubernetes 上启动 Kafka。
它们有何不同?什么时候使用哪一个?
很好奇Confluent的Kafka REST Proxy和用kafka官方客户端库实现的生产者/消费者的优缺点。我知道 Confluent 的 Kafka REST 代理用于管理任务和 kafka 客户端不支持的语言。
那么,kafka客户端有哪些优势呢?
我不知道Kafka REST Proxy API的优点。它是一个 REST API,所以我知道它对于管理来说很方便。人们为什么使用 Kafka REST 代理 API?添加对生产者或消费者的 Maven 依赖是否很麻烦?
另外,我知道kafka客户端有更好的性能。
我正在使用这里的汇合 cp-all-in-one 项目配置:https://github.com/confluenceinc/cp-docker-images/blob/5.2.2-post/examples/cp-all-in-one /docker-compose.yml
http://localhost:8082/topics/zuum-positions
我正在使用以下 AVRO 正文发布一条消息:
{
"key_schema": "{\"type\":\"string\"}",
"value_schema":"{ \"type\":\"record\",\"name\":\"Position\",\"fields\":[ { \"name\":\"loadId\",\"type\":\"double\"},{\"name\":\"lat\",\"type\":\"double\"},{ \"name\":\"lon\",\"type\":\"double\"}]}",
"records":[
{
"key":"22",
"value":{
"lat":43.33,
"lon":43.33,
"loadId":22
}
}
]
}
Run Code Online (Sandbox Code Playgroud)
我已将以下标头正确添加到上述 POST 请求中:
Content-Type: application/vnd.kafka.avro.v2+json
Accept: application/vnd.kafka.v2+json
执行此请求时,我在 docker 日志中看到以下异常:
Error encountered in task zuum-sink-positions-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where consumed record is {topic='zuum-positions', partition=0, offset=25, timestamp=1563480487456, timestampType=CreateTime}. org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic zuum-positions to Avro:
connect | at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128) …Run Code Online (Sandbox Code Playgroud) apache-kafka docker confluent-schema-registry kafka-rest confluent-platform
Zookeeper 和 Kafka 沟通良好并且运行良好。
不知道为什么模式注册表和 Kafka 休息无法启动。
下面是 docker-compose 文件。
这是架构和其余 docker-compose 文件。这个泊坞窗上的错误
[main] ERROR io.confluent.admin.utils.cli.KafkaReadyCommand - Error while running kafka-ready.
java.lang.RuntimeException: No endpoints found for security protocol [PLAINTEXT]. Endpoints found in ZK [{EXTERNAL=localhost:9092, INTERNAL=kafka:29092}]
at io.confluent.admin.utils.cli.KafkaReadyCommand.main(KafkaReadyCommand.java:143)
[main] INFO org.apache.zookeeper.ZooKeeper - Session: 0x1003f4a8fa10006 closed
[main] ERROR io.confluent.admin.utils.cli.KafkaReadyCommand - Error while running kafka-ready.
java.lang.RuntimeException: No endpoints found for security protocol [PLAINTEXT]. Endpoints found in ZK [{EXTERNAL=localhost:9092, INTERNAL=kafka:29092}]
at io.confluent.admin.utils.cli.KafkaReadyCommand.main(KafkaReadyCommand.java:143)
Run Code Online (Sandbox Code Playgroud)
码头工人组成:
schema-registry:
network_mode: pm
image: confluentinc/cp-schema-registry:5.2.1
hostname: schema-registry
container_name: schema-registry
depends_on:
- …Run Code Online (Sandbox Code Playgroud) apache-kafka docker apache-zookeeper confluent-schema-registry kafka-rest