我正在使用来自Confluent的Kafka Connect来使用Kafka流并以镶木地板格式写入HDFS.我在1节点中使用Schema Registry服务,它运行正常.现在我想将Schema Registry分发到集群模式以处理故障转移.任何关于如何实现这一点的链接或片段都非常有用.
我目前正在使用汇合3.0.1平台.我试图在两个不同的工作者上创建2个连接器,但尝试创建一个新连接器正在为它创建一个新组.
Two connectors were created using below details:
1) POST http://devmetric.com:8083/connectors
{
"name": "connector1",
"config": {
"connector.class": "com.xxx.kafka.connect.sink.DeliverySinkConnector",
"tasks.max": "1",
"topics": "dev.ps_primary_delivery",
"elasticsearch.cluster.name": "ad_metrics_store",
"elasticsearch.hosts": "devkafka1.com:9300",
"elasticsearch.bulk.size": "100",
"tenants": "tenant1"
}
}
2) POST http://devkafka01.com:8083/connectors
{
"name": "connector2",
"config": {
"connector.class": "com.xxx.kafka.connect.sink.DeliverySinkConnector",
"tasks.max": "1",
"topics": "dev.ps_primary_delivery",
"elasticsearch.cluster.name": "ad_metrics_store",
"elasticsearch.hosts": "devkafka.com:9300",
"elasticsearch.bulk.size": "100",
"tenants": "tenant1"
}
}
Run Code Online (Sandbox Code Playgroud)
但是他们都是在不同的群组ID下创建的.在此之后,我询问现有的团体.
$ sh ./bin/kafka-consumer-groups --bootstrap-server devmetric.com:9091 --new-consumer --list
Result was:
connect-connector2
connect-connector1
Run Code Online (Sandbox Code Playgroud)
这些组是由Kafka自动创建的,而不是由我提供的.我在worker.properties中给了不同的group.id. 但是我希望两个连接器都在同一个组下,以便它们并行工作以共享消息.截至目前,我有一个关于主题"dev.ps_primary_delivery"的100万个数据,我希望两个连接器各自获得50万个.
请让我知道如何做到这一点.
我正在使用Kafka模式注册表来生成/使用Kafka消息,例如,我有两个字段均为字符串类型,伪模式如下所示?
{"name": "test1", "type": "string"}
{"name": "test2", "type": "string"}
Run Code Online (Sandbox Code Playgroud)
但是在发送和使用了一段时间之后,我需要修改架构以将第二个字段更改为long类型,然后引发以下异常:
Schema being registered is incompatible with an earlier schema; error code: 409
Run Code Online (Sandbox Code Playgroud)
我很困惑,如果架构注册表无法发展架构升级/更改,那为什么我应该使用架构注册表,或者为什么要使用Avro?
我在本地计算机上使用docker设置单节点基本Kafka部署,如Confluent Kafka文档中所述(步骤2-3).
另外,我还暴露了zookeeper的端口2181和kafka的端口9092,这样我就能从本地机器上运行的java客户端连接到它们:
$ docker run -d \
-p 2181:2181 \
--net=confluent \
--name=zookeeper \
-e ZOOKEEPER_CLIENT_PORT=2181 \
confluentinc/cp-zookeeper:4.1.0
$ docker run -d \
--net=confluent \
--name=kafka \
-p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka:4.1.0
Run Code Online (Sandbox Code Playgroud)
问题:当我尝试从主机连接到kafka时,连接失败,因为它无法解析地址:kafka:9092.
这是我的Java代码:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "KafkaExampleProducer");
props.put("key.serializer", LongSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
KafkaProducer<Long, String> producer = new KafkaProducer<>(props);
ProducerRecord<Long, String> record = new ProducerRecord<>("foo", 1L, "Test 1");
producer.send(record).get();
producer.flush();
Run Code Online (Sandbox Code Playgroud)
例外:
java.io.IOException: Can't resolve …Run Code Online (Sandbox Code Playgroud) 我之前使用过融合的Kafka,通常我会更改log.dirs=my-NEW-Location位于的server.properties文件/etc/kafka/.
我刚刚在我的Ubuntu 16.04机器上安装了Confluent 3.3.0 ..使用命令启动没问题confluent start kafka.我试图将log.dirsin 更改server.properties为我的新位置但是汇合并没有因为某些原因而改变它.检查server.log文件后,Confluent会创建日志/tmp/confluent.SOME_RAMDOM_STRING/,有没有办法更改?
G
我一直在尝试使用kafka-connect api将kafka连接到elasticsearch.Kafka版本是0.11.0.0.这是我遵循的步骤:
1.Buiding Elasticsearch Connector:
https://github.com/confluentinc/kafka-connect-elasticsearch.git
2.建立连接器
$ cd kafka-connect-elasticsearch
$ mvn clean package
Run Code Online (Sandbox Code Playgroud)
3.最后运行脚本:
$ bin/connect-standalone.sh config/connect-standalone.properties config/elasticsearch-connect.properties
Run Code Online (Sandbox Code Playgroud)
它抛出以下异常:
Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.collect.Sets$SetView.iterator()Lcom/google/common/collect/UnmodifiableIterator;
at org.reflections.Reflections.expandSuperTypes(Reflections.java:380)
at org.reflections.Reflections.<init>(Reflections.java:126)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:221)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:198)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:159)
at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:47)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:68)
Run Code Online (Sandbox Code Playgroud)
无法理解出了什么问题.
启动融合的dotnet使用者时,在调用订阅和随后的轮询之后,似乎需要很长时间才能从服务器接收到“分配的分区”事件,并因此收到消息(大约10-15秒)。
起初我以为会有自动创建主题的开销,但是无论消费者的主题/消费者组是否已经存在,时间都是相同的。
我从此配置开始使用我的使用者,其余代码与合并的高级使用者示例相同:
var kafkaConfig = new Dictionary<string, object>
{
{"group.id", config.ConsumerGroup},
{"statistics.interval.ms", 60000},
{"fetch.wait.max.ms", 10},
{"bootstrap.servers", config.BrokerList},
{"enable.auto.commit", config.AutoCommit},
{"socket.blocking.max.ms",1},
{"fetch.error.backoff.ms",1 },
{"socket.nagle.disable",true },
{"auto.commit.interval.ms", 5000},
{
"default.topic.config", new Dictionary<string, object>()
{
{"auto.offset.reset", "smallest"}
}
}
};
Run Code Online (Sandbox Code Playgroud)
kafka集群由具有默认设置的远程数据中心中的3台中低端规格机器组成。是否可以调整代理或客户端设置以减少启动时间?
编辑:使用“分配”而不是“订阅”自己分配分区,导致启动时间约为2秒
我正在尝试使用Kafka Connect Elasticsearch连接器,但未成功。它崩溃并显示以下错误:
[2018-11-21 14:48:29,096] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:108)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches io.confluent.connect.elasticsearch.ElasticsearchSinkConnector , available connectors are: PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='1.0.1', encodedVersion=1.0.1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='1.0.1', encodedVersion=1.0.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='1.0.1', encodedVersion=1.0.1, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='1.0.1', encodedVersion=1.0.1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='1.0.1', encodedVersion=1.0.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='1.0.1', encodedVersion=1.0.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, …Run Code Online (Sandbox Code Playgroud) 我已按照以下说明https://docs.confluent.io/current/installation/installing_cp/cp-helm使用Helm图表https://github.com/confluentinc/cp-helm-charts在本地Minikube上安装了Kafka。-charts / docs / index.html像这样:
helm install -f kafka_config.yaml confluentinc/cp-helm-charts --name kafka-home-delivery --namespace cust360
Run Code Online (Sandbox Code Playgroud)
kafka_config.yaml与默认yaml几乎相同,唯一的例外是我将其缩减为1个服务器/代理,而不是3个(只是因为我试图节省本地minikube上的资源;希望这与我的问题)。
MySQL实例也在Minikube上运行。这是的输出kubectl get pods --namespace myNamespace:
我想使用其中一种连接器(例如Debezium MySQL CDC)连接MySQL和Kafka 。在说明中说:
安装连接器
使用Confluent Hub客户端通过以下方式安装此连接器:
confluent-hub install debezium/debezium-connector-mysql:0.9.2
听起来不错,除了1)我不知道要在哪个Pod上运行此命令,2)所有Pod似乎都没有可用的confluent-hub命令。
问题:
mysql apache-kafka kubernetes confluent apache-kafka-connect
Kafka本身是完全免费和开源的.
Confluent是Kafka创作者的盈利公司.Confluent Platform是Kafka以及各种附加功能,例如架构注册表和数据库连接器.我认为Confluent通过出售支持合同和服务来赚钱.
Confluent Platform是免费和/或开源的吗?我有义务购买许可证或付费支持吗?