My KafkaProducer can serialize objects to my topic using KafkaAvroSerializer. However, KafkaConsumer.poll() returns a deserialized GenericRecord instead of my serialized class.
My KafkaProducer
KafkaProducer<CharSequence, MyBean> producer;
try (InputStream props = Resources.getResource("producer.props").openStream()) {
    Properties properties = new Properties();
    properties.load(props);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    properties.put("schema.registry.url", "http://localhost:8081");

    MyBean bean = new MyBean();
    producer = new KafkaProducer<>(properties);
    producer.send(new ProducerRecord<>(topic, bean.getId(), bean));
My KafkaConsumer
try (InputStream props = Resources.getResource("consumer.props").openStream()) {
    properties.load(props);
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
    properties.put("schema.registry.url", "http://localhost:8081");
    consumer = new KafkaConsumer<>(properties);
}
consumer.subscribe(Arrays.asList(topic));
try {
    while (true) {
        ConsumerRecords<CharSequence, MyBean> records = consumer.poll(100);
        if (records.isEmpty()) {
            continue;
        }
        for …
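One possible direction, sketched below on the assumption that MyBean is a class generated from the Avro schema (i.e. it implements SpecificRecord), is to enable the deserializer's specific-reader flag; without it, KafkaAvroDeserializer always hands back GenericRecord. Bootstrap servers and group id are placeholders here:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

// Sketch only: connection details are placeholders.
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "mybean-consumer");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
properties.put("schema.registry.url", "http://localhost:8081");
// Without this flag the Confluent deserializer returns GenericRecord.
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

KafkaConsumer<CharSequence, MyBean> consumer = new KafkaConsumer<>(properties);

If MyBean is a plain POJO rather than an Avro-generated class, the deserializer has nothing to map the record back onto, and a conversion from GenericRecord would still be needed.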
I have a sink connector with the following configuration:

{
"name": "sink-test-mariadb-MY_TOPIC",
"config": {
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max":"10",
"topics":"MY_TOPIC",
"connection.url":"jdbc:mariadb://localhost:3306/myschema?user=myuser&password=mypass",
"auto.create":"false",
"auto.evolve":"true",
"table.name.format":"MY_TABLE",
"pk.mode":"record_value",
"pk.fields":"ID",
"insert.mode":"upsert",
"transforms":"ExtractField",
"transforms.ExtractField.type":"org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ExtractField.field":"data"
}
}
After a while, all of the connector's tasks fail with the following error:
{
"state": "FAILED",
"trace": "org.apache.kafka.connect.errors.DataException: MY_TOPIC
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:95)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 802
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:202)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:409)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:402)
at …
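The 40403 means the schema id embedded in the records (802 here) does not exist in the registry that the worker's AvroConverter is configured against, which is typically a sign that the producer and the connector point at different registries, or that the schema was deleted. A quick check, sketched with plain JDK HTTP (the URL is a placeholder for whatever value.converter.schema.registry.url the Connect worker uses):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Ask the registry the Connect worker uses whether id 802 exists.
URL url = new URL("http://localhost:8081/schemas/ids/802");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");

int status = conn.getResponseCode();          // a 404 here matches the 40403 error above
System.out.println("HTTP " + status);
try (BufferedReader in = new BufferedReader(new InputStreamReader(
        status < 400 ? conn.getInputStream() : conn.getErrorStream()))) {
    in.lines().forEach(System.out::println);  // schema JSON, or the registry's error body
}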
I want to use kafka-avro-console-producer together with the Schema Registry. I have large schemas (more than 10k characters), so I can't really pass them as command-line arguments. Beyond that, I'd like to talk to the Schema Registry directly so that I can use a specific schema ID.

I was thinking of something like this, but it doesn't work:
kafka-avro-console-producer \
--broker-list <broker-list> \
--topic <topic> \
--property schema.registry.url=http://localhost:8081 \
--property value.schema=`curl http://localhost:8081/schemas/ids/419`
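One way around the command-line length limit is to pull the schema out of the registry first and keep it as a file. A sketch using the Confluent schema-registry client (this assumes io.confluent:kafka-schema-registry-client is on the classpath; older client versions expose getById, newer ones getSchemaById):

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.avro.Schema;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

public class DumpSchema {
    public static void main(String[] args) throws Exception {
        // Fetch schema id 419 from the registry and write it to a local .avsc file.
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);
        Schema schema = client.getById(419);   // getSchemaById(419) in newer client versions
        Files.write(Paths.get("schema-419.avsc"),
                schema.toString(true).getBytes(StandardCharsets.UTF_8));
    }
}

The resulting file can then be fed to the console producer through shell substitution, e.g. value.schema="$(cat schema-419.avsc)", instead of an inline literal; whether your console producer release also accepts a schema id or schema file property directly depends on the Confluent Platform version.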
I'm trying to publish a JSON message to a topic using the Schema Registry, but I get the error below. The Spring Boot setup follows.
The configuration 'schema.registry.url' was supplied but isn't a known config
Application yml file
server:
  port: 9080
spring:
  kafka:
    properties:
      bootstrap.servers: server1:8080
      schema.registry.url: https://bctdsdg:8081/
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: JsonSerializer.class
    ssl:
      keystore-location: classpath:cert.jks
      keystore-password: pwd
      key-password: pwd
      truststore-location: classpath:dev_cacerts.jks
      truststore-password: pwd
Kafka configuration
@Configuration
@EnableKafka
public class KafkaConfig {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, com.schemaregjson.serdes.JsonSerializer.class);
        props.put("schema.registry.url", "https://bctdsdg:8081/");
        props.putAll(kafkaProperties.getSsl().buildProperties());
        props.putAll(kafkaProperties.getProperties());
        return props;
    }

    @Bean
    public ProducerFactory<String, User> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs()); …
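For what it's worth, the "supplied but isn't a known config" message quoted above is normally only a warning: the Kafka producer logs it for any property it does not recognise itself, while schema-registry-aware serializers still receive the value. If the goal is simply to have everything under spring.kafka.* (including schema.registry.url and the ssl block) flow into the producer, a sketch using Spring Boot's own property merging could look like this (assuming Spring Boot 2.x, where buildProducerProperties() takes no arguments):

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

@Bean
public ProducerFactory<String, User> producerFactory(KafkaProperties kafkaProperties) {
    // Merges spring.kafka.properties (bootstrap.servers, schema.registry.url),
    // the producer block and the ssl block from the yml into one config map.
    Map<String, Object> props = kafkaProperties.buildProducerProperties();
    // Keep the custom value serializer from the configuration above.
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, com.schemaregjson.serdes.JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(props);
}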
I'm a noob with Kafka and Avro, and I've been trying to get a producer/consumer running. So far I've been able to produce and consume simple bytes and strings using the following. The producer configuration:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++) {
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("str1", "Str 1-" + i);
avroRecord.put("str2", "Str 2-" + i);
avroRecord.put("int1", i);
byte[] bytes = recordInjection.apply(avroRecord);
ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
producer.send(record);
Thread.sleep(250);
}
producer.close();
}
Now this is all fine and well; the problem arises when I try to serialize a POJO. I was able to get the Avro schema from the POJO using the utilities supplied with Avro. I hard-coded the schema and then tried to create a GenericRecord to send through the KafkaProducer; the producer is now set up as:
Properties props = new …
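If the goal is to avoid hand-building a GenericData.Record for every POJO, one alternative is to let Avro's reflection API derive the schema from the class and serialize the object directly to the byte[] that the existing producer already sends. A minimal sketch (MyPojo and pojo are hypothetical names, plain Avro only, no Schema Registry involved):

import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;

// Derive the schema from the POJO class instead of hard-coding it.
Schema schema = ReflectData.get().getSchema(MyPojo.class);

// Serialize the POJO straight to Avro binary; no GenericData.Record in between.
ReflectDatumWriter<MyPojo> writer = new ReflectDatumWriter<>(schema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(pojo, encoder);
encoder.flush();

byte[] bytes = out.toByteArray();   // hand this to the existing ProducerRecord<String, byte[]>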
I have implemented an Avro schema in node.js, and the schema is sent along with the message payload. That works fine. I'm looking into whether a schema registry can be used through the kafka-node module; I've explored but haven't found anything.

Does sending the schema with every message increase the message size, and does it affect performance compared to using a schema registry?
Any help would be appreciated.
node.js avro apache-kafka confluent confluent-schema-registry
How do I read Avro messages through the Confluent Schema Registry using Spring-Kafka? Is there a sample? I couldn't find one in the official reference documentation.
avro spring-kafka confluent-schema-registry confluent-platform
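A minimal sketch of the consumer side, assuming the Confluent Avro deserializer and a registry at http://localhost:8081 (bootstrap servers, group id and the record type are placeholders):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

@EnableKafka
@Configuration
public class AvroConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put("schema.registry.url", "http://localhost:8081");
        // With the flag below, records come back as generated SpecificRecord classes.
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

A listener method would then be annotated with @KafkaListener(topics = "...") and take the generated class (or GenericRecord, if the specific-reader flag is left off) as its parameter.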
I want to customize the subject naming strategy for Avro schema subjects in Spring Cloud Stream producers, consumers and KStreams.
In plain Kafka this is done with the key.subject.name.strategy and value.subject.name.strategy properties -> https://docs.confluence.io/current/schema-registry/serializer-formatter.html#subject-name-strategy
In a native Kafka producer this works:
private val producer: KafkaProducer<Int, Customer>

init {
    val props = Properties()
    ...
    props[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
    props[AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY] = TopicRecordNameStrategy::class.java.name
    producer = KafkaProducer(props)
}

fun sendCustomerEvent(customer: Customer) {
    val record: ProducerRecord<Int, Customer> = ProducerRecord("customer", customer.id, customer)
    producer.send(record)
}
But I can't find out how to do this in Spring Cloud Stream. So far I have tried this in the producer:
spring:
  application:
    name: spring-boot-customer-service
  cloud:
    stream:
      kafka:
        bindings:
          output:
            producer:
              configuration:
                key:
                  serializer: org.apache.kafka.common.serialization.IntegerSerializer
                value:
                  subject:
                    name:
                      strategy: …

avro apache-kafka spring-cloud-stream apache-kafka-streams confluent-schema-registry
We use the Schema Registry to store schemas; messages are serialized to Avro and pushed to Kafka topics.
I'd like to know how, when reading data in the consumer, to find the schema ID with which an Avro record was serialized. We need this schema ID to track changes such as new columns being added to a table. When a column is added or removed, a new schema ID is generated in the Schema Registry, and we want to obtain that ID in the consumer.
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers = conf['BOOTSTRAP_SERVERS'],
                         auto_offset_reset = conf['AUTO_OFFSET'],
                         enable_auto_commit = conf['AUTO_COMMIT'],
                         auto_commit_interval_ms = conf['AUTO_COMMIT_INTERVAL']
                         )
consumer.subscribe(conf['KAFKA_TOPICS'])

for message in consumer:
    print(message.key)
In the code above, message.key prints the key of a given record; how can we find the corresponding schema ID that the consumer should use to deserialize that record?
curl -X GET http://localhost:8081/subjects/helpkit_internal.helpkit_support.agents-value/versions/2
{"subject":"helpkit_internal.helpkit_support.agents-value","version":2,"id":33,"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"helpkit_internal.helpkit_support.agents\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"user_id\"
From the consumer we want to obtain the id value, "id":33.

Any advice on this would be appreciated.
python apache-kafka kafka-consumer-api kafka-python confluent-schema-registry
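When records are produced with the Confluent Avro serializer, the schema id is embedded in the record value itself: byte 0 is a magic byte (0), bytes 1-4 are the schema id as a big-endian integer, and the Avro payload follows. So the id (the 33 above) can be read straight off the raw bytes before any deserialization; in the Python consumer above, something like struct.unpack('>bI', message.value[:5]) should yield the magic byte and the id. The same idea, sketched in Java:

import java.nio.ByteBuffer;

// Confluent wire format: [magic byte 0x0][4-byte schema id, big-endian][Avro payload].
static int schemaIdOf(byte[] rawValue) {
    ByteBuffer buffer = ByteBuffer.wrap(rawValue);
    if (buffer.get() != 0) {
        throw new IllegalArgumentException("Not a Confluent-framed Avro value");
    }
    return buffer.getInt();   // e.g. 33 for version 2 of the subject shown above
}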
First of all, I have to say that I'm not familiar with Confluent.

I'm following this tutorial: https://www.confluence.io/blog/schema-registry-avro-in-spring-boot-application-tutorial/, but I'm stuck.

I can't create a consumer for Kafka because I get the error: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.

I can't find this schema property anywhere in the yml configuration.
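For reference, the missing key is simply an entry in the consumer's configuration map rather than a first-class Spring property. A sketch of the relevant piece (http://localhost:8081 is where confluent local start typically exposes the registry):

import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;

// The ConfigException above is thrown by the Avro (de)serializer when this
// key is missing from the client configuration it receives.
Map<String, Object> props = new HashMap<>();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
// ...plus the usual bootstrap servers, group id and key/value deserializers.

With Spring Boot's auto-configured factories, the same value can be supplied from the yml as spring.kafka.properties.schema.registry.url (or under spring.kafka.consumer.properties), which ends up in the same map.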
Confluent is running locally:
$: confluent local start
zookeeper is already running. Try restarting if needed
kafka is already running. Try restarting if needed
schema-registry is already running. Try restarting if needed
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]
After setting up the users topic in Spring, I see a different schema in Control Center:
{
  "connect.name": "ksql.users",
  "fields": [
    {
      "name": "registertime",
      "type": "long"
    },
    {
      "name": "userid",
"type": …Run Code Online (Sandbox Code Playgroud) apache-kafka ×9