My KafkaProducer can serialize objects to my topic using KafkaAvroSerializer. However, KafkaConsumer.poll() returns deserialized GenericRecords instead of my serialized class.
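From what I've read, the type returned by the Confluent deserializer is supposed to be controlled by its specific.avro.reader setting. Below is a minimal, unverified sketch of the consumer properties I believe are needed (assuming MyBean is an Avro-generated SpecificRecord); my actual producer and consumer code follows.

    Properties properties = new Properties();
    // standard settings (bootstrap.servers, group.id, ...) omitted
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
    properties.put("schema.registry.url", "http://localhost:8081");
    // supposedly tells the deserializer to return the generated class instead of GenericRecord
    properties.put("specific.avro.reader", "true");
    KafkaConsumer<CharSequence, MyBean> consumer = new KafkaConsumer<>(properties);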
My KafkaProducer:
    KafkaProducer<CharSequence, MyBean> producer;
    try (InputStream props = Resources.getResource("producer.props").openStream()) {
      Properties properties = new Properties();
      properties.load(props);
      properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
      properties.put("schema.registry.url", "http://localhost:8081");
      MyBean bean = new MyBean();
      producer = new KafkaProducer<>(properties);
      producer.send(new ProducerRecord<>(topic, bean.getId(), bean));
    }
My KafkaConsumer:
    KafkaConsumer<CharSequence, MyBean> consumer;
    try (InputStream props = Resources.getResource("consumer.props").openStream()) {
      Properties properties = new Properties();
      properties.load(props);
      properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
      properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
      properties.put("schema.registry.url", "http://localhost:8081");
      consumer = new KafkaConsumer<>(properties);
    }
    consumer.subscribe(Arrays.asList(topic));
    try {
      while (true) {
        ConsumerRecords<CharSequence, MyBean> records = consumer.poll(100);
        if (records.isEmpty()) {
          continue;
        }
        for …

I have a sink connector with the following configuration:
{
    "name": "sink-test-mariadb-MY_TOPIC",
    "config": { 
                "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
                "tasks.max":"10",
                "topics":"MY_TOPIC",
                "connection.url":"jdbc:mariadb://localhost:3306/myschema?user=myuser&password=mypass",
                "auto.create":"false",
                "auto.evolve":"true",
                "table.name.format":"MY_TABLE",
                "pk.mode":"record_value",
                "pk.fields":"ID",
                "insert.mode":"upsert",
                "transforms":"ExtractField",
                "transforms.ExtractField.type":"org.apache.kafka.connect.transforms.ExtractField$Value",
                "transforms.ExtractField.field":"data"
        }
}
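For completeness, the topic is written with Avro and the worker uses Confluent's AvroConverter; the converter-related settings (shown here as connector-level overrides; the registry URL is an assumption about my environment) look roughly like this:

{
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081"
}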
After a while, all of the connector's tasks fail with the following error:
{
    "state": "FAILED",
    "trace": "org.apache.kafka.connect.errors.DataException: MY_TOPIC
                at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:95)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
                at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
                at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                at java.lang.Thread.run(Thread.java:748)
            Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 802
            Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
                at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:202)
                at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
                at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:409)
                at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:402)
                at …

I want to use kafka-avro-console-producer with the Schema Registry. I have large schemas (over 10k characters), so I can't really pass them as command-line arguments. Besides that, I want to work with the Schema Registry directly so that I can use a specific schema ID.
This is what I had in mind, but it doesn't work:
kafka-avro-console-producer \
 --broker-list <broker-list> \
 --topic <topic>  \
 --property schema.registry.url=http://localhost:8081 \
 --property value.schema=`curl http://localhost:8081/schemas/ids/419`
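One workaround I'm considering for the size limit is pulling the schema from the registry and passing only the extracted schema string; a sketch, assuming jq is available (note that /schemas/ids/419 returns a JSON wrapper with a "schema" field rather than the bare schema):

kafka-avro-console-producer \
 --broker-list <broker-list> \
 --topic <topic>  \
 --property schema.registry.url=http://localhost:8081 \
 --property value.schema="$(curl -s http://localhost:8081/schemas/ids/419 | jq -r .schema)"

This still doesn't let me pin an exact schema ID, though.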
I'm trying to publish JSON messages to a topic using the Schema Registry, but I get the error below. The Spring Boot method in question:
..
The configuration 'schema.registry.url' was supplied but isn't a known config
Application yml file:
server:
  port: 9080
spring:
  kafka:
    properties:
      bootstrap.servers: server1:8080 
      schema.registry.url: https://bctdsdg:8081/
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer 
      value-serializer: com.schemaregjson.serdes.JsonSerializer
    ssl:
      keystore-location: classpath:cert.jks
      keystore-password: pwd
      key-password: pwd
      truststore-location: classpath:dev_cacerts.jks
      truststore-password: pwd
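For reference, I also tried swapping in Confluent's registry-aware JSON Schema serializer in place of the plain JSON serializer, since that serializer actually reads schema.registry.url. This is an unverified sketch and assumes the io.confluent:kafka-json-schema-serializer artifact is on the classpath; the Kafka configuration class I'm actually using follows below.

    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server1:8080");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        org.apache.kafka.common.serialization.StringSerializer.class);
    // registry-aware JSON serializer (reads schema.registry.url, unlike plain serializers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer.class);
    props.put("schema.registry.url", "https://bctdsdg:8081/");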
Kafka configuration:
@Configuration
@EnableKafka
public class KafkaConfig {
    
    @Autowired
    private KafkaProperties kafkaProperties;
    
    @Bean
      public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, com.schemaregjson.serdes.JsonSerializer.class);
        props.put("schema.registry.url", "https://bctdsdg:8081/");
        props.putAll(kafkaProperties.getSsl().buildProperties());
        props.putAll(kafkaProperties.getProperties());
        return props;
      }
    
     @Bean
        public ProducerFactory<String, User> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs()); …

I'm a noob with Kafka and Avro, so I've been trying to get a Producer/Consumer running. So far, I've been able to produce and consume simple bytes and strings with the following. The producer's configuration:
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(USER_SCHEMA);
    Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 1000; i++) {
        GenericData.Record avroRecord = new GenericData.Record(schema);
        avroRecord.put("str1", "Str 1-" + i);
        avroRecord.put("str2", "Str 2-" + i);
        avroRecord.put("int1", i);
        byte[] bytes = recordInjection.apply(avroRecord);
        ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
        producer.send(record);
        Thread.sleep(250);
    }
    producer.close();
}
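For context on where I'm heading next, here is a rough, unverified sketch of turning a POJO into Avro bytes with Avro's reflection utilities (classes from org.apache.avro.reflect and org.apache.avro.io; MyPojo is just a placeholder name):

    // derive a schema from the POJO class via reflection
    Schema pojoSchema = ReflectData.get().getSchema(MyPojo.class);
    // serialize the POJO to Avro binary using the reflected schema
    ReflectDatumWriter<MyPojo> writer = new ReflectDatumWriter<>(pojoSchema);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(new MyPojo(), encoder);
    encoder.flush();
    byte[] pojoBytes = out.toByteArray();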
Now this all works fine, and the problem comes when I try to serialize a POJO. I was able to get the Avro schema from the POJO using the utility provided with Avro, hard-coded the schema, and then tried to create a GenericRecord to send through the KafkaProducer. The producer is now set up as:
    Properties props = new …

I have implemented an Avro schema in node.js that is sent along with the message payload, and it works fine. I'm looking into whether the Schema Registry can be used through the kafka-node module. I've explored this but haven't managed to find anything.
Does sending the schema in every message increase the message size? Will it affect performance compared to using the Schema Registry?
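One thing I've been considering is keeping kafka-node for the transport and talking to the Schema Registry's REST API directly, so the schema is registered once up front; below is a rough, untested sketch (the subject name my-topic-value and the avroSchema variable are placeholders, and the global fetch assumes Node 18+):

// register the schema once and reuse the returned id (untested sketch)
const res = await fetch('http://localhost:8081/subjects/my-topic-value/versions', {
  method: 'POST',
  headers: { 'Content-Type': 'application/vnd.schemaregistry.v1+json' },
  body: JSON.stringify({ schema: JSON.stringify(avroSchema) }),
});
const { id } = await res.json();
// with the registry, each message then carries only the small Confluent framing
// (one magic byte plus a 4-byte schema id) instead of the full schema text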
Any help would be appreciated.
node.js avro apache-kafka confluent confluent-schema-registry
How can I read AVRO messages through the Confluent Schema Registry using Spring-Kafka? Is there a sample anywhere? I couldn't find one in the official reference documentation.
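For reference, the direction I've been experimenting with so far is a plain consumer factory that uses the Confluent Avro deserializer; an unverified sketch (User stands in for a generated Avro class, and the broker/registry addresses are assumptions):

    @Bean
    public ConsumerFactory<String, User> avroConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            org.apache.kafka.common.serialization.StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
        props.put("schema.registry.url", "http://localhost:8081");
        // return the generated SpecificRecord class instead of GenericRecord
        props.put("specific.avro.reader", "true");
        return new DefaultKafkaConsumerFactory<>(props);
    }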
avro spring-kafka confluent-schema-registry confluent-platform
I want to customize the subject naming strategy for Avro schema topics in Spring Cloud Stream Producers, Consumers, and KStreams.
In plain Kafka this is done with the properties key.subject.name.strategy and value.subject.name.strategy -> https://docs.confluent.io/current/schema-registry/serializer-formatter.html#subject-name-strategy
In a native Kafka Producer, this works:
private val producer: KafkaProducer<Int, Customer>
    init {
        val props = Properties()
        ...
        props[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
        props[AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY] = TopicRecordNameStrategy::class.java.name
        producer = KafkaProducer(props)
    }
    fun sendCustomerEvent(customer: Customer) {
        val record: ProducerRecord<Int, Customer> = ProducerRecord("customer", customer.id, customer)
        producer.send(record)
    }
But I can't find how to do this in Spring Cloud Stream. So far I have tried this in the producer:
spring:
  application:
    name: spring-boot-customer-service
  cloud:
    stream:
      kafka:
        bindings:
          output:
            producer:
              configuration:
                key:
                  serializer: org.apache.kafka.common.serialization.IntegerSerializer
                value:
                  subject:
                    name:
                      strategy: …

avro apache-kafka spring-cloud-stream apache-kafka-streams confluent-schema-registry
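The other thing I plan to try is setting the strategy at the binder level, since the binder can pass arbitrary Kafka client properties through; a sketch, with the exact property path not yet verified:

spring:
  cloud:
    stream:
      kafka:
        binder:
          configuration:
            schema.registry.url: http://localhost:8081
            value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy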
We use the Schema Registry to store schemas, and messages are serialized to Avro and pushed to Kafka topics.
I'd like to know how, when reading data in the consumer, to find the schema id that an Avro record was serialized with. We need this schema id to track changes such as new columns being added to a table. If a column is added or removed, a new schema id is generated in the Schema Registry, and we need a way to get that id in the consumer.
consumer = KafkaConsumer(bootstrap_servers = conf['BOOTSTRAP_SERVERS'],
                        auto_offset_reset = conf['AUTO_OFFSET'],
                        enable_auto_commit = conf['AUTO_COMMIT'],
                        auto_commit_interval_ms = conf['AUTO_COMMIT_INTERVAL']
                        )
consumer.subscribe(conf['KAFKA_TOPICS'])
for message in consumer:
    print(message.key)
In the code above, message.key prints the key of that particular record; how can we find the corresponding schema id that the consumer used to deserialize the record?
curl -X GET http://localhost:8081/subjects/helpkit_internal.helpkit_support.agents-value/versions/2
{"subject":"helpkit_internal.helpkit_support.agents-value","version":2,"id":33,"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"helpkit_internal.helpkit_support.agents\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"user_id\"
We want to get the id value ("id": 33) from within the consumer.
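For what it's worth, this is the direction I'm considering, based on Confluent's wire format (each serialized value starts with a magic byte followed by a 4-byte big-endian schema id); a rough sketch, not yet tested:

import struct

for message in consumer:
    raw = message.value
    # Confluent wire format: byte 0 is the magic byte, bytes 1-4 are the schema id
    if raw is not None and len(raw) > 5 and raw[0] == 0:
        schema_id = struct.unpack('>I', raw[1:5])[0]
        print(message.key, schema_id)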
Please advise.
python apache-kafka kafka-consumer-api kafka-python confluent-schema-registry
First of all, I have to say that I'm not familiar with Confluent.
I'm following this tutorial: https://www.confluent.io/blog/schema-registry-avro-in-spring-boot-application-tutorial/, but I've gotten stuck.
I can't create a consumer for Kafka because I get the error: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
I can't find this schema property anywhere in the yml configuration.
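For reference, the property I expect to end up needing is something like the following in application.yml (a sketch; the host and port are whatever the local registry uses):

spring:
  kafka:
    properties:
      schema.registry.url: http://localhost:8081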
Confluent is running locally:
$: confluent local start
zookeeper is already running. Try restarting if needed
kafka is already running. Try restarting if needed
schema-registry is already running. Try restarting if needed
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]
After setting up the users topic in Spring, I see a different schema from Control Center:
{
  "connect.name": "ksql.users",
  "fields": [
    {
      "name": "registertime",
      "type": "long"
    },
    {
      "name": "userid",
      "type": …apache-kafka ×9
avro ×7
java ×4
confluent ×2
spring-kafka ×2
kafka-python ×1
node.js ×1
python ×1