My Kafka Streams application has the following configuration:
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, this.applicaionId);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, svrConfig.getBootstrapServers());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, svrConfig.getCacheMaxBytesBufferingConfig());
// Exactly once processing!!
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
I get the following error:
Exception in thread "main" io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:100)
at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.configure(SpecificAvroSerializer.java:58)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.configure(SpecificAvroSerde.java:107)
I tried replacing the line
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
with
config.put("schema.registry.url","http://localhost:8081");
but I get the same error.
While preparing my Streams application, I followed …
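One possible cause (an assumption here, since the rest of the topology code isn't shown): properties set on StreamsConfig only reach the default serdes, so any SpecificAvroSerde instantiated by hand (e.g. one passed to Consumed.with) never sees schema.registry.url and must be configured explicitly. A minimal sketch, with MyRecord as a hypothetical generated Avro class:

import java.util.Collections;
import java.util.Map;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

Map<String, String> serdeConfig = Collections.singletonMap(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
        "http://localhost:8081");

SpecificAvroSerde<MyRecord> valueSerde = new SpecificAvroSerde<>();
// The boolean flag tells the serde whether it serializes keys (true) or values (false).
valueSerde.configure(serdeConfig, false);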
java avro apache-kafka apache-kafka-streams confluent-schema-registry
I am using the Spring Embedded Kafka Broker and MockSchemaRegistryClient to build integration tests for our Kafka system. I am writing a test for one of our Streams topologies, built with the Streams API (KStreamBuilder). This particular topology has a KStream (stream1) feeding into a KTable (table1).
When I feed input into stream1, I hit an error that originates from table1's KTableProcessor:
Exception in thread "mortgage-kafka-consumers-it-c1dd9185-ce16-415c-ad82-293c1281c897-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=streaming.mortgage.application_party, partition=0, offset=0
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:202)
at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:342)
at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:334)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:624)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 6
Caused by: java.io.IOException: Cannot get schema from schema registry!
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:106)
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getBySubjectAndID(MockSchemaRegistryClient.java:149)
at …

apache-kafka apache-kafka-streams confluent-schema-registry embedded-kafka
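For the MockSchemaRegistryClient failure above, one common culprit (an assumption, since the test setup isn't shown) is that the serdes inside the topology create their own registry client instead of sharing the mock instance used to produce the test input; each MockSchemaRegistryClient keeps its registered schemas purely in memory. A hedged sketch of sharing a single instance, with MyAvroRecord as a hypothetical generated class:

import java.util.Collections;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

MockSchemaRegistryClient registryClient = new MockSchemaRegistryClient();

// Pass the SAME client to every serde (and to the test's serializers), so
// schemas registered while producing test input are visible when the
// topology deserializes them.
SpecificAvroSerde<MyAvroRecord> valueSerde = new SpecificAvroSerde<>(registryClient);
valueSerde.configure(
        Collections.singletonMap(
                AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                "http://unused:8081"), // configure() requires a URL even when a client is supplied
        false);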
I am trying to connect to a single-node Kafka server through Docker, but I get the following error:
%3|1529395526.480|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused
%3|1529395526.480|ERROR|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused
%3|1529395526.480|ERROR|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: 1/1 brokers are down
The docker-compose.yml file is as follows:
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    network_mode: host
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    extra_hosts:
      - "moby:127.0.0.1"
  kafka:
    image: confluentinc/cp-kafka:latest
    network_mode: host
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: localhost:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_ADVERTISED_HOSTNAME: kafka
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    extra_hosts:
      - "moby:127.0.0.1"
  schema_registry:
    image: confluentinc/cp-schema-registry
    hostname: schema_registry
    depends_on:
      - zookeeper
      - kafka
    ports: …

python apache-kafka docker apache-zookeeper confluent-schema-registry
Many APIs/microservices provide access to key resources, including Kafka topics. API/microservice messages are validated against an OpenAPI specification that defines the API/microservice contract. Once a microservice has validated a message, it publishes it to a Kafka topic, at which point the message is validated (again) against Kafka's schema registry.
The problem is that there are two message definitions a message can be validated against (the OpenAPI specification and Kafka's schema registry), and keeping the two definitions in sync is a challenge.
With that in mind, I have a few questions:
Finally, which of the above makes the most sense? Are there other, better options?
apache-kafka microservices openapi confluent-schema-registry
I want schemas to keep fixed IDs even across server restarts.
Is it possible to persist schemas in the schema registry so that they keep the same IDs after a server crash?
Failing that, is it possible to hard-code schemas with fixed IDs when the schema registry server starts?
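Worth noting for the question above (a hedged sketch, not a confirmed answer): the Confluent Schema Registry stores its state in the _schemas Kafka topic rather than on the registry server itself, so IDs normally survive a registry restart as long as that backing topic is intact. Assuming the registry runs at localhost:8081 and a subject named my-topic-value, registering and fetching by ID looks roughly like this with the older client API:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;

SchemaRegistryClient client =
        new CachedSchemaRegistryClient("http://localhost:8081", 100);

Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Example\",\"fields\":[]}");

// The registry assigns the id and persists it in the _schemas topic.
int id = client.register("my-topic-value", schema);

// After a registry restart (same backing topic), the same id resolves again.
Schema fetched = client.getById(id);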
I don't have a clear picture of the key schema: what it is, and why one is needed at all when the key is auto-generated and we only pass a value (the message).
For the value, we pass the schema to the Avro serializer; the serializer obtains a schema ID for it from the schema registry and embeds that schema ID alongside the value (message) we pass (correct me if I'm wrong). What about the key?
Do we also need to pass a key schema? What is the significance of passing a key schema? And how is a key schema passed?
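As a hedged illustration of the distinction (a plain Java producer; broker, topic, and registry addresses are placeholders): a key schema only comes into play if the key itself is serialized with KafkaAvroSerializer, in which case the serializer registers a separate <topic>-key subject and embeds that schema ID in the key bytes, exactly as it does for the value under <topic>-value. A plain string key needs no registry entry at all:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Plain string key: no key schema, nothing registered for the key.
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Avro value: the serializer registers "<topic>-value" and prefixes the value
// bytes with the schema id it got back from the registry.
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
// Swapping the key serializer to KafkaAvroSerializer would likewise register
// a "<topic>-key" subject and prefix the key bytes with that schema id.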
My main question: why is the schema registry crashing?
Peripheral questions: if I configured a single server each for zookeeper/kafka/schema-registry, why did every pod start? Does everything else look basically correct?
$ helm repo update
<snip>
$ helm install --values values.yaml --name my-confluent-oss confluentinc/cp-helm-charts
<snip>
$ helm list
NAME              REVISION  UPDATED                   STATUS    CHART                 APP VERSION  NAMESPACE
my-confluent-oss  1         Sat Oct 20 19:09:08 2018  DEPLOYED  cp-helm-charts-0.1.0  1.0          default
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
my-confluent-oss-cp-kafka-0 2/2 Running 0 20m
my-confluent-oss-cp-schema-registry-59d8877584-c2jc7 1/2 CrashLoopBackOff 7 20m
my-confluent-oss-cp-zookeeper-0 2/2 Running 0 20m
My values.yaml is below. I have already verified it with helm install --debug --dry-run. I simply disable persistence, set single servers (this is a development setup running in a VM), and temporarily disable the extra services until I get the basics working:
cp-kafka:
  brokers: 1 …

apache-kafka kubernetes kubernetes-helm confluent-schema-registry confluent-platform
I want to read data from a Confluent Cloud topic and then write to another topic.
On localhost I didn't run into any major problems, but Confluent Cloud's schema registry requires passing some authentication data that I don't know how to supply:
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=:
schema.registry.url=https://xxxxxxxxxx.confluent.cloud
Here is the current code:
import com.databricks.spark.avro.SchemaConverters
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.SparkSession
object AvroConsumer {
  private val topic = "transactions"
  private val kafkaUrl = "http://localhost:9092"
  private val schemaRegistryUrl = "http://localhost:8081"
  private val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
  private val kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
  private val avroSchema = schemaRegistryClient.getLatestSchemaMetadata(topic + "-value").getSchema
  private var sparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("ConfluentConsumer")
      .master("local[*]")
      .getOrCreate() …

apache-spark databricks confluent-schema-registry spark-structured-streaming confluent-platform
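One hedged way to supply the basic-auth settings above (assuming a reasonably recent Confluent client; the URL and key/secret are placeholders): CachedSchemaRegistryClient has a constructor overload that accepts a config map, through which the credentials can be passed:

import java.util.HashMap;
import java.util.Map;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

Map<String, String> registryConfig = new HashMap<>();
registryConfig.put("basic.auth.credentials.source", "USER_INFO");
registryConfig.put("schema.registry.basic.auth.user.info",
        "<SR_API_KEY>:<SR_API_SECRET>"); // placeholder credentials

// The third constructor argument forwards these settings to the underlying REST client.
SchemaRegistryClient client = new CachedSchemaRegistryClient(
        "https://xxxxxxxxxx.confluent.cloud", 128, registryConfig);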
I am trying to send messages to my broker with an Avro schema, but I always get the following error:
2020-02-01 11:24:37.189 [nioEventLoopGroup-4-1] ERROR Application - Unhandled: POST - /api/orchestration/
org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "string"
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
Here is my Docker container:
connect:
  image: confluentinc/cp-kafka-connect:5.4.0
  hostname: confluentinc-connect
  container_name: confluentinc-connect
  depends_on:
    - zookeeper
    - broker
    - schema-registry
  ports:
    - "8083:8083"
  environment:
    CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
    CONNECT_REST_ADVERTISED_HOST_NAME: connect
    CONNECT_REST_PORT: 8083
    CONNECT_GROUP_ID: confluentinc-connect
    CONNECT_CONFIG_STORAGE_TOPIC: confluentinc-connect-configs
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
    CONNECT_OFFSET_STORAGE_TOPIC: confluentinc-connect-offsets
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_STATUS_STORAGE_TOPIC: confluentinc-connect-status
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
    CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "true"
    CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
    CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
    CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
    CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor" …
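For the 409 above, a hedged diagnostic sketch (the subject name and registry URL are assumptions): the registry is rejecting the new schema under the subject's current compatibility rule, which can be inspected and, if the break is intentional, relaxed per subject:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

SchemaRegistryClient client =
        new CachedSchemaRegistryClient("http://schema-registry:8081", 100);

// What the registry currently enforces for this subject (BACKWARD by default).
String level = client.getCompatibility("my-topic-value");

// Relax it only if the new schema is intentionally breaking.
client.updateCompatibility("my-topic-value", "NONE");

Relaxing compatibility is a last resort; the cleaner fix is usually to make the new schema genuinely compatible with what is already registered under that subject.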
I have an enum in my Avro schema, as follows:
{
  "type": "record",
  "name": "MySchema",
  "namespace": "com.company",
  "fields": [
    {
      "name": "color",
      "type": {
        "type": "enum",
        "name": "Color",
        "symbols": [
          "UNKNOWN",
          "GREEN",
          "RED"
        ]
      },
      "default": "UNKNOWN"
    }
  ]
}
When using FULL compatibility mode (meaning both BACKWARD and FORWARD), how am I supposed to add a new symbol to the enum? Is it simply impossible?
I read "Avro schema: is adding an enum value to an existing schema backward compatible?" but it didn't help.
Whenever I try to add a new value to the symbols, the compatibility check in the schema registry fails, even though I have a default on the enum. After some testing, it seems that adding a new value is backward compatible but not forward compatible. However, because of the default I set, I would expect it to be FORWARD compatible as well: an old reader schema should be able to read values written with the new schema, falling back to the "UNKNOWN" enum value when it doesn't know the new symbol.
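One hedged explanation worth checking: in the schema above, "default": "UNKNOWN" sits on the record field, while the fallback that FORWARD compatibility needs is the enum-level default inside the enum type definition itself (supported from Avro 1.9 on). A sketch of building the enum with its own default, where BLUE stands in for the hypothetical new symbol:

import java.util.Arrays;
import org.apache.avro.Schema;

// The fifth argument is the enum-level default an old reader falls back to
// when it encounters a symbol it does not know.
Schema colorEnum = Schema.createEnum(
        "Color",
        null,              // doc
        "com.company",
        Arrays.asList("UNKNOWN", "GREEN", "RED", "BLUE"), // BLUE: new symbol
        "UNKNOWN");

In the JSON form this corresponds to a "default": "UNKNOWN" entry placed as a sibling of "symbols" inside the enum type, not (only) on the record field.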
apache-kafka ×8
avro ×4
java ×3
apache-spark ×1
databricks ×1
docker ×1
kotlin ×1
kubernetes ×1
openapi ×1
python ×1
schema ×1