标签: confluent-schema-registry

Kafka Stream with Avro in JAVA , schema.registry.url" 没有默认值

我的 Kafka Stream 应用程序具有以下配置

    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG,this.applicaionId);
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,svrConfig.getBootstrapServers());
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
    config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, svrConfig.getCacheMaxBytesBufferingConfig());

    // Exactly once processing!!
    config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,SpecificAvroSerde.class);
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,SpecificAvroSerde.class);
    config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
Run Code Online (Sandbox Code Playgroud)

我收到以下错误:

Exception in thread "main" io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:100)
at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.configure(SpecificAvroSerializer.java:58)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.configure(SpecificAvroSerde.java:107)
Run Code Online (Sandbox Code Playgroud)

我试图更换线路

config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
Run Code Online (Sandbox Code Playgroud)

config.put("schema.registry.url","http://localhost:8081");
Run Code Online (Sandbox Code Playgroud)

但有同样的错误

在准备我的 Stream 应用程序时,我已按照 …

java avro apache-kafka apache-kafka-streams confluent-schema-registry

5
推荐指数
1
解决办法
1万
查看次数

Spring Embedded Kafka + Mock Schema Registry:State Store ChangeLog Schema 未注册

我正在使用Spring Embedded Kafka Broker和 MockSchemaRegistryClient为我们的 kafka 系统构建集成测试。我正在为我们的 Stream 拓扑之一构建测试,使用 Streams API (KStreamBuilder) 构建。这个特殊的拓扑有一个 KStream (stream1) 馈入 KTable (table1)。

当我将输入输入到 stream1 时遇到错误,该错误源自 table1 的 KTableProcessor:

Exception in thread "mortgage-kafka-consumers-it-c1dd9185-ce16-415c-ad82-293c1281c897-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=streaming.mortgage.application_party, partition=0, offset=0
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:202)
    at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:342)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:334)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:624)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 6
**Caused by: java.io.IOException: Cannot get schema from schema registry!**
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:106)
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getBySubjectAndID(MockSchemaRegistryClient.java:149)
    at …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams confluent-schema-registry embedded-kafka

5
推荐指数
1
解决办法
3009
查看次数

无法通过 Docker 连接到单节点 Kafka 服务器

我正在尝试通过 Docker 连接到单节点 Kafka 服务器,但出现以下错误:

%3|1529395526.480|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused
%3|1529395526.480|ERROR|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused
%3|1529395526.480|ERROR|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: 1/1 brokers are down
Run Code Online (Sandbox Code Playgroud)

docker-compose.yml 文件内容如下:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    network_mode: host
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    extra_hosts:
      - "moby:127.0.0.1"

  kafka:
    image: confluentinc/cp-kafka:latest
    network_mode: host
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: localhost:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_ADVERTISED_HOSTNAME: kafka
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    extra_hosts:
      - "moby:127.0.0.1"

  schema_registry:
    image: confluentinc/cp-schema-registry
    hostname: schema_registry
    depends_on:
      - zookeeper
      - kafka
    ports: …
Run Code Online (Sandbox Code Playgroud)

python apache-kafka docker apache-zookeeper confluent-schema-registry

5
推荐指数
1
解决办法
5019
查看次数

确保与 Kafka Schema 和 OpenAPI 规范的一致性

许多 API/微服务提供对关键资源(包括 Kafka 主题)的访问。API/微服务消息使用定义 API/微服务合约的 OpenAPI 规范进行验证。一旦微服务验证了消息,它就会发布到 Kafka 主题,此时该消息将根据 Kafka 的架构注册表(再次)进行验证。

问题在于,有两个消息定义可供验证消息(OpenAPI 规范和 Kafka 的架构注册表),并且确保两个消息定义同步是一个挑战。

考虑到这一点,我有几个问题:

  • 有没有办法将 OpenAPI 规范转换为 Kafka 架构注册表格式(反之亦然)?
  • 有没有办法让 Kafka 根据 OpenAPI 规范而不是注册表进行验证(可能不是一个很好的解决方案,因为应该使用本机 Kafka 功能)?
  • 有没有办法允许 API/微服务根据 Kafka 模式而不是 OpenAPI 规范验证其消息(同样,这可能不是一个好方法,因为 OpenAPI 规范是定义 API 消息的标准方法)?

最后,上面哪一个最有道理。还有其他更好的选择吗?

apache-kafka microservices openapi confluent-schema-registry

5
推荐指数
1
解决办法
1417
查看次数

Confluent Schema Registry 持久化

即使服务器重新启动,我也希望能够保留具有固定 ID 的架构。

是否可以将架构保留在架构注册表中,以便在服务器崩溃后使它们具有相同的 ID?

否则,是否可以在模式注册服务器启动时使用固定 ID 对模式进行硬编码?

schema apache-kafka confluent-schema-registry

4
推荐指数
1
解决办法
796
查看次数

模式注册表中的关键模式是什么?

我对键模式没有确切的概念,它是什么,以及为什么它必须用作键是自动生成的,我们只是传递一个值(消息)。

对于值,我们将架构传递给 AVRO 序列化器,序列化器从架构注册表中获取它的架构 ID,并将架构 ID 与我们传递的值(消息)一起嵌入(如果我错了,请纠正我)。钥匙怎么办?

我们还需要传递一个键模式吗?传递密钥模式的重要性是什么?而且,如何传递密钥模式?

java avro apache-kafka confluent-schema-registry

4
推荐指数
1
解决办法
5194
查看次数

Kafka 与 Confluent Kubernetes Helm Charts = Schema Registry WakeupException

我的主要问题:为什么模式注册表会崩溃?

外围问题:如果我为每个 zookeeper/kafka/schema-registry 配置了一个服务器,为什么每个 pod 都启动?其他一切看起来基本正确吗?

?  helm repo update
<snip>

?  helm install --values values.yaml --name my-confluent-oss confluentinc/cp-helm-charts
<snip>

?  helm list
NAME                REVISION    UPDATED                     STATUS      CHART                   APP VERSION NAMESPACE
my-confluent-oss    1           Sat Oct 20 19:09:08 2018    DEPLOYED    cp-helm-charts-0.1.0    1.0         default  

?  kubectl get pods
NAME                                                   READY     STATUS             RESTARTS   AGE
my-confluent-oss-cp-kafka-0                            2/2       Running            0          20m
my-confluent-oss-cp-schema-registry-59d8877584-c2jc7   1/2       CrashLoopBackOff   7          20m
my-confluent-oss-cp-zookeeper-0                        2/2       Running            0          20m
Run Code Online (Sandbox Code Playgroud)

values.yaml的如下。我已经用helm install --debug --dry-run. 我只是禁用持久性,设置单个服务器(这是在 VM 中运行的开发设置),并暂时禁用额外服务,直到我获得基础工作:

cp-kafka:
  brokers: 1 …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kubernetes kubernetes-helm confluent-schema-registry confluent-platform

4
推荐指数
1
解决办法
2421
查看次数

如何将基本身份验证传递给 Confluent Schema Registry?

我想从融合的云主题读取数据,然后写入另一个主题。

在本地主机上,我没有遇到任何重大问题。但是confluent cloud的schema registry需要传递一些我不知道如何输入的身份验证数据:

basic.auth.credentials.source=USER_INFO

schema.registry.basic.auth.user.info=:

schema.registry.url= https://xxxxxxxxxx.confluent.cloudBlockquote

以下是当前代码:

import com.databricks.spark.avro.SchemaConverters
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.SparkSession

object AvroConsumer {
  private val topic = "transactions"
  private val kafkaUrl = "http://localhost:9092"
  private val schemaRegistryUrl = "http://localhost:8081"

  private val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
  private val kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)

  private val avroSchema = schemaRegistryClient.getLatestSchemaMetadata(topic + "-value").getSchema
  private var sparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("ConfluentConsumer")
      .master("local[*]")
      .getOrCreate() …
Run Code Online (Sandbox Code Playgroud)

apache-spark databricks confluent-schema-registry spark-structured-streaming confluent-platform

4
推荐指数
1
解决办法
2827
查看次数

注册 Avro 架构时出错:“string” RestClientException:正在注册的架构与较早的架构不兼容;

我正在尝试使用 Avro 架构向我的经纪人发送消息,但“我总是收到错误消息:

2020-02-01 11:24:37.189 [nioEventLoopGroup-4-1] 错误应用程序 - 未处理:POST - /api/orchestration/org.apache.kafka.common.errors.SerializationException:注册 Avro 架构时出错:“字符串”导致作者:io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:正在注册的架构与较早的架构不兼容;错误代码:409

这是我的码头集装箱:

 connect:
    image: confluentinc/cp-kafka-connect:5.4.0
    hostname: confluentinc-connect
    container_name: confluentinc-connect
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: confluentinc-connect
      CONNECT_CONFIG_STORAGE_TOPIC: confluentinc-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: confluentinc-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: confluentinc-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "true"
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor" …
Run Code Online (Sandbox Code Playgroud)

avro kotlin apache-kafka confluent-schema-registry

4
推荐指数
1
解决办法
3728
查看次数

如何以完全兼容的方式将枚举值添加到 AVRO 模式?

我在 AVRO 模式中有一个枚举,如下所示:

{
    "type": "record",
    "name": "MySchema",
    "namespace": "com.company",
    "fields": [
        {
            "name": "color",
            "type": {
                "type": "enum",
                "name": "Color",
                "symbols": [
                    "UNKNOWN",
                    "GREEN",
                    "RED"
                ]
            },
            "default": "UNKNOWN"
        }
    ]
}
Run Code Online (Sandbox Code Playgroud)

当使用 FULL (这意味着 BACKWARD 和 FORWARD)兼容模式时,我应该如何向枚举添加新符号?这不可能吗?

我读到Avro schema :向现有架构添加枚举值是否向后兼容?但这没有帮助。

每当我尝试向符号添加新值时,即使我在枚举上有默认值,它也无法在架构注册表中进行兼容性检查。经过一番测试后,似乎添加新值是向后兼容的,但不是向前兼容的。但是,由于我设置的默认值,我希望它也兼容 FORWARD。事实上,旧的读取器模式应该能够读取新模式写入的值,并且当它不知道新符号时默认为“UNKNOWN”枚举值。

java avro confluent-schema-registry

4
推荐指数
1
解决办法
1万
查看次数