我是kafka的新手,我对Confluent平台感到好奇.
看来Confluent平台上的用户故事并不多.
汇合平台是否只为卡夫卡增加了更多的价值?
或者任何人都可以告诉我你最喜欢哪一个?
我必须选择其中一个.
我需要使用Confluent kafka-avro-serializerMaven工件.从官方指南,我应该将此存储库添加到我的Maven pom
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
Run Code Online (Sandbox Code Playgroud)
问题是,当我得到以下响应时,URL http://packages.confluent.io/maven/似乎不起作用
<Error>
<Code>NoSuchKey</Code>
<Message>The specified key does not exist.</Message>
<Key>maven/</Key>
<RequestId>15E287D11E5D4DFA</RequestId>
<HostId>
QVr9lCF0y3SrQoa1Z0jDWtmxD3eJz1gAEdivauojVJ+Bexb2gB6JsMpnXc+JjF95i082hgSLJSM=
</HostId>
</Error>
Run Code Online (Sandbox Code Playgroud)
实际上Maven没有找到神器
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>3.1.1</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
你知道问题是什么吗?谢谢
我期待产品电离并部署我的Kafka Connect应用程序.但是,我有两个关于tasks.max设置的问题,这是必需的并且具有很高的重要性,但细节对于实际设置此值的内容是模糊的.
我最简单的问题如下:如果我有一个带有n个分区的主题,我希望从中获取数据并写入某个接收器(在我的情况下,我写入S3),我应该将tasks.max设置为什么?我应该把它设置为n吗?我应该把它设置为2n吗?直观地说,似乎我想将值设置为n,这就是我一直在做的事情.
如果我更改我的Kafka主题并增加主题分区怎么办?如果我把它设置为n,我将不得不暂停我的Kafka连接器并增加tasks.max?如果我设置了2n的值,那么我的连接器应该自动增加它运行的并行度?
谢谢你的帮助!
嗨,我目前正在使用Docker设置Kafka.我已设法使用已发布的汇合图像设置Zookeeper和Kafka,请参阅以下docker-compose文件:
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:3.2.0
container_name: zookeeper
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
restart: always
kafka:
image: confluentinc/cp-kafka:3.2.0
hostname: kafka
container_name: kafka
depends_on:
- zookeeper
ports:
- '9092:9092'
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.99.100:9092
LISTENERS: PLAINTEXT://0.0.0.0:9092
restart: always
kafka-rest:
image: confluentinc/cp-kafka-rest:3.2.0
container_name: kafka-rest
depends_on:
- kafka
ports:
- '8082:8082'
environment:
KAFKA_REST_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_REST_LISTENERS: http://kafka-rest:8082
KAFKA_REST_SCHEMA_REGISTRY_URL: http://schema-registry:8081
KAFKA_REST_HOST_NAME: kafka-rest
restart: always
schema-registry:
image: confluentinc/cp-schema-registry:3.2.0
container_name: schema-registry
depends_on:
- kafka
ports:
- '8081'
environment:
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: …Run Code Online (Sandbox Code Playgroud) 我KafkaProducer在我的测试用例中使用,我的生产者使用schemaRegistryUrl指向我本地实例的Schema Registry.有没有办法模拟KafkaProducer与Schema Registry的连接方式?也就是说,让KafkaProducer/Consumer我的测试在没有运行Schema Registry的实例的情况下工作.
我正在尝试使用Confluent kafka-avro-console-consumer,但是如何将Schema Registry的参数传递给它?
我有windows环境和我自己的一套kafka和zookeeper运行.为了使用自定义对象,我开始使用Avro.但我需要启动注册表.下载了Confluent平台并运行了这个:
$ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
/c/Confluent/confluent-3.0.0-2.11/confluent-3.0.0/bin/schema-registry-run-class: line 103: C:\Program: No such file or directory
Run Code Online (Sandbox Code Playgroud)
然后我在安装页面上看到了这个:
"Confluent目前不支持Windows.Windows用户可以下载并使用zip和tar档案,但必须直接运行jar文件,而不是使用bin /目录中的包装脚本."
我想知道如何在Windows环境中启动融合模式注册表?
查看脚本的内容,很难解读.
谢谢
avro apache-kafka confluence-rest-api kafka-producer-api confluent
我KafkaProducer可以使用KafkaAvroSerializer序列化对象到我的主题.但是,KafkaConsumer.poll()返回反GenericRecord序列化而不是我的序列化类.
MyKafkaProducer
KafkaProducer<CharSequence, MyBean> producer;
try (InputStream props = Resources.getResource("producer.props").openStream()) {
Properties properties = new Properties();
properties.load(props);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
properties.put("schema.registry.url", "http://localhost:8081");
MyBean bean = new MyBean();
producer = new KafkaProducer<>(properties);
producer.send(new ProducerRecord<>(topic, bean.getId(), bean));
Run Code Online (Sandbox Code Playgroud)
我的KafkaConsumer
try (InputStream props = Resources.getResource("consumer.props").openStream()) {
properties.load(props);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
properties.put("schema.registry.url", "http://localhost:8081");
consumer = new KafkaConsumer<>(properties);
}
consumer.subscribe(Arrays.asList(topic));
try {
while (true) {
ConsumerRecords<CharSequence, MyBean> records = consumer.poll(100);
if (records.isEmpty()) {
continue;
}
for …Run Code Online (Sandbox Code Playgroud) 我已经在node.js中实现了Avro模式,该模式随消息有效负载一起发送。而且工作正常。我在寻找是否可以通过Kafka-node模块使用架构注册表。我进行了探索,但未成功找到任何东西。
在每个消息中发送模式会增加消息大小吗?与使用架构注册表相比,它会影响性能吗?
任何帮助,将不胜感激。
node.js avro apache-kafka confluent confluent-schema-registry
我正在用Kafka Streams(v0.10.0.1)编写一个应用程序,并希望用查找数据丰富我正在处理的记录.此数据(带时间戳的文件)每天(或每天2-3次)写入HDFS目录.
如何在Kafka Streams应用程序中加载它并加入实际KStream?
当新文件到达时,从HDFS重新读取数据的最佳做法是什么?
或者更好地切换到Kafka Connect将KDBka表内容写入Kafka主题并将其写入所有Kafka Streams应用程序实例可以使用的主题?
更新:
正如Kafka Connect所建议的那样.因为查询数据每天在RDBMS中更新,所以我考虑将Kafka Connect作为预定的一次性工作运行,而不是始终保持连接打开.是的,因为语义和保持连接始终打开的开销,并确保它不会被中断..等等.对于我来说,在这种情况下进行预定的提取看起来更安全.
查找数据不大,可以删除/添加/修改记录.我不知道如何总是完全转储到Kafka主题并截断以前的记录.启用日志压缩并为已删除的密钥发送空值可能不起作用,因为我不知道源系统中已删除的内容.另外AFAIK我在压实发生时没有控制权.
hadoop apache-kafka apache-kafka-streams confluent apache-kafka-connect