标签: kafka-rest

确保已使用REST代理从Kafka主题读取了所有消息

我是Kafka的新手,我们的团队正在研究服务间通信的模式。

目标

我们有两个服务,P(生产者)和C(消费者)。对于C所需的一组数据,P是真理的源头。C启动时,需要将所有当前数据从P加载到其缓存中,然后订阅更改通知。(换句话说,我们要在服务之间同步数据。)

数据总量相对较低,并且更改很少。短暂的同步延迟是可以接受的(最终一致性)。

我们希望解耦服务,以便P和C不需要彼此了解。

提案

当P启动时,它将所有数据发布到启用了日志压缩的Kafka主题。每个消息都是其ID为键的聚合

C启动时,它将从主题的开头读取所有消息,并填充其缓存。然后,它继续从其偏移量读取数据,以通知更新。

当P更新其数据时,它将为已更改的聚合发布一条消息。(此消息与原始消息具有相同的架构。)

C收到新消息时,将更新其缓存中的相应数据。

在此处输入图片说明

约束条件

我们正在使用Confluent REST代理与Kafka进行通信。

问题

当C启动时,如何知道何时从该主题读取了所有消息,以便可以安全地开始处理?

如果C没有立即注意到P第二秒钟发送的消息,这是可以接受的。如果C在消费一个小时前发送的消息之前开始处理,这是不可接受的。请注意,我们不知道何时更新P的数据。

我们不希望C在消耗每条消息之后不必等待REST代理的轮询间隔。

apache-kafka microservices kafka-rest

7
推荐指数
1
解决办法
190
查看次数

Kafka Confluent REST API:包括 Kafka?

我有一个现有的 Kafka 集群。我想安装 Kafka REST 代理:

https://github.com/confluentinc/kafka-rest

如果我安装 confluent 会随 Kafka 一起出现吗?我担心如果我仍然在我的主 Kafka 节点上使用 confluent 会覆盖我的所有设置并弄乱我的 Kafka 集群。

当您有一个现有的 Kafka 集群时,如何安装 Kafka REST?这在他们的网站上没有明确说明。我有 CentOS 并打算尝试:

sudo yum install confluent-platform-oss-2.11
Run Code Online (Sandbox Code Playgroud)

任何帮助都会很棒......

apache-kafka kafka-rest confluent-platform

5
推荐指数
1
解决办法
353
查看次数

有没有办法使用 Kafka Confluence REST API 生成带有标头的 Kafka 消息?

我正在尝试使用一些自定义标头向 Kafka 发送消息,但我找不到方法。根据文档POST /topics/(string: topic_name)API 无法将自定义标头附加到消息。

有谁知道如何通过融合的 Kafka REST API 发送带有自定义标头的 Kafka 消息?

这是我发送到 REST 代理的示例请求正文

POST /topics/stream.mycustomtopic

{
    "records": [{
        "value": "{\"myFirstKey\":\"myFirstValue\"}"
    }]
}
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-rest confluent-platform

5
推荐指数
1
解决办法
4723
查看次数

如何使用架构注册表/Kafka-Rest 正确注册 Protobuf 架构

我正在尝试使用 kafka-rest 接口将 Protobuf 架构发布到架构注册表:

curl -X POST -H "Content-Type: application/vnd.kafka.protobuf.v2+json" \
   -H "Accept: application/vnd.kafka.v2+json" \
   --data '{"value_schema": "syntax=\"proto3\"; message User { string name = 1; }", "records": [{"value": {"name": "testUser"}}]}' \
   "http://localhost:8082/topics/protobuftest"
Run Code Online (Sandbox Code Playgroud)

我收到此错误:

{"error_code":415,"message":"HTTP 415 Unsupported Media Type"}
Run Code Online (Sandbox Code Playgroud)

问题:指示媒体类型使其发挥作用的正确方法是什么?

protocol-buffers apache-kafka confluent-schema-registry kafka-rest confluent-platform

5
推荐指数
1
解决办法
998
查看次数

Confluence kafka-rest 和 Strimzi Kafka Bridge 有什么区别

我想在 Kafka 之上使用 HTTP 代理。我看到两个具有相同目的的项目:

我使用 strimzi 运算符在 Kubernetes 上启动 Kafka。

  • 两者都是开源的
  • 两者都可以用于商业自托管云应用程序。
  • 两者都在 Kafka 之上提供 REST 代理

它们有何不同?什么时候使用哪一个?

apache-kafka kafka-rest strimzi

4
推荐指数
1
解决办法
905
查看次数

Confluent 的 Kafka REST 代理与 Kafka 客户端

很好奇Confluent的Kafka REST Proxy和用kafka官方客户端库实现的生产者/消费者的优缺点。我知道 Confluent 的 Kafka REST 代理用于管理任务和 kafka 客户端不支持的语言。

那么,kafka客户端有哪些优势呢?

apache-kafka kafka-rest confluent-platform

3
推荐指数
1
解决办法
2327
查看次数

Kafka REST 代理 API 有哪些好处?

我不知道Kafka REST Proxy API的优点。它是一个 REST API,所以我知道它对于管理来说很方便。人们为什么使用 Kafka REST 代理 API?添加对生产者或消费者的 Maven 依赖是否很麻烦?

另外,我知道kafka客户端有更好的性能。

rest apache-kafka kafka-rest confluent-platform

3
推荐指数
1
解决办法
2296
查看次数

无法反序列化主题数据

我正在使用这里的汇合 cp-all-in-one 项目配置:https://github.com/confluenceinc/cp-docker-images/blob/5.2.2-post/examples/cp-all-in-one /docker-compose.yml

http://localhost:8082/topics/zuum-positions 我正在使用以下 AVRO 正文发布一条消息:

{  
   "key_schema": "{\"type\":\"string\"}",
   "value_schema":"{  \"type\":\"record\",\"name\":\"Position\",\"fields\":[  {  \"name\":\"loadId\",\"type\":\"double\"},{\"name\":\"lat\",\"type\":\"double\"},{  \"name\":\"lon\",\"type\":\"double\"}]}",
   "records":[  
      {  
         "key":"22",
         "value":{  
            "lat":43.33,
            "lon":43.33,
            "loadId":22
         }
      }
   ]
}
Run Code Online (Sandbox Code Playgroud)

我已将以下标头正确添加到上述 POST 请求中: Content-Type: application/vnd.kafka.avro.v2+json Accept: application/vnd.kafka.v2+json

执行此请求时,我在 docker 日志中看到以下异常:

Error encountered in task zuum-sink-positions-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where consumed record is {topic='zuum-positions', partition=0, offset=25, timestamp=1563480487456, timestampType=CreateTime}. org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic zuum-positions to Avro: 
connect            |    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128) …
Run Code Online (Sandbox Code Playgroud)

apache-kafka docker confluent-schema-registry kafka-rest confluent-platform

2
推荐指数
1
解决办法
1万
查看次数

docker schemaregistry和kafkarest无法启动

Zookeeper 和 Kafka 沟通良好并且运行良好。

不知道为什么模式注册表和 Kafka 休息无法启动。

下面是 docker-compose 文件。

这是架构和其余 docker-compose 文件。这个泊坞窗上的错误

[main] ERROR io.confluent.admin.utils.cli.KafkaReadyCommand - Error while running kafka-ready.
java.lang.RuntimeException: No endpoints found for security protocol [PLAINTEXT]. Endpoints found in ZK [{EXTERNAL=localhost:9092, INTERNAL=kafka:29092}]
        at io.confluent.admin.utils.cli.KafkaReadyCommand.main(KafkaReadyCommand.java:143)



[main] INFO org.apache.zookeeper.ZooKeeper - Session: 0x1003f4a8fa10006 closed
[main] ERROR io.confluent.admin.utils.cli.KafkaReadyCommand - Error while running kafka-ready.
java.lang.RuntimeException: No endpoints found for security protocol [PLAINTEXT]. Endpoints found in ZK [{EXTERNAL=localhost:9092, INTERNAL=kafka:29092}]
        at io.confluent.admin.utils.cli.KafkaReadyCommand.main(KafkaReadyCommand.java:143)
Run Code Online (Sandbox Code Playgroud)

码头工人组成:

schema-registry:
    network_mode: pm
    image: confluentinc/cp-schema-registry:5.2.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - …
Run Code Online (Sandbox Code Playgroud)

apache-kafka docker apache-zookeeper confluent-schema-registry kafka-rest

2
推荐指数
1
解决办法
7128
查看次数