标签: confluent

汇聚架构注册表群集模式

我正在使用来自Confluent的Kafka Connect来使用Kafka流并以镶木地板格式写入HDFS.我在1节点中使用Schema Registry服务,它运行正常.现在我想将Schema Registry分发到集群模式以处理故障转移.任何关于如何实现这一点的链接或片段都非常有用.

apache-kafka confluent

5
推荐指数
1
解决办法
3333
查看次数

Kafka-Connect:在分布式模式下创建新连接器正在创建新组

我目前正在使用汇合3.0.1平台.我试图在两个不同的工作者上创建2个连接器,但尝试创建一个新连接器正在为它创建一个新组.

Two connectors were created using below details:

1) POST http://devmetric.com:8083/connectors

{
    "name": "connector1",
    "config": {
        "connector.class": "com.xxx.kafka.connect.sink.DeliverySinkConnector",
        "tasks.max": "1",
        "topics": "dev.ps_primary_delivery",
        "elasticsearch.cluster.name": "ad_metrics_store",
        "elasticsearch.hosts": "devkafka1.com:9300",
        "elasticsearch.bulk.size": "100",
        "tenants": "tenant1"
    }
}

2) POST http://devkafka01.com:8083/connectors

{
    "name": "connector2",
    "config": {
        "connector.class": "com.xxx.kafka.connect.sink.DeliverySinkConnector",
        "tasks.max": "1",
        "topics": "dev.ps_primary_delivery",
        "elasticsearch.cluster.name": "ad_metrics_store",
        "elasticsearch.hosts": "devkafka.com:9300",
        "elasticsearch.bulk.size": "100",
        "tenants": "tenant1"
    }
}
Run Code Online (Sandbox Code Playgroud)

但是他们都是在不同的群组ID下创建的.在此之后,我询问现有的团体.

$ sh ./bin/kafka-consumer-groups --bootstrap-server devmetric.com:9091  --new-consumer  --list

Result was:
connect-connector2
connect-connector1
Run Code Online (Sandbox Code Playgroud)

这些组是由Kafka自动创建的,而不是由我提供的.我在worker.properties中给了不同的group.id. 但是我希望两个连接器都在同一个组下,以便它们并行工作以共享消息.截至目前,我有一个关于主题"dev.ps_primary_delivery"的100万个数据,我希望两个连接器各自获得50万个.

请让我知道如何做到这一点.

elasticsearch apache-kafka confluent apache-kafka-connect

5
推荐指数
1
解决办法
3830
查看次数

Kafka模式注册表在同一主题中不兼容

我正在使用Kafka模式注册表来生成/使用Kafka消息,例如,我有两个字段均为字符串类型,伪模式如下所示?

{"name": "test1", "type": "string"}
{"name": "test2", "type": "string"}
Run Code Online (Sandbox Code Playgroud)

但是在发送和使用了一段时间之后,我需要修改架构以将第二个字段更改为long类型,然后引发以下异常:

Schema being registered is incompatible with an earlier schema; error code: 409
Run Code Online (Sandbox Code Playgroud)

我很困惑,如果架构注册表无法发展架构升级/更改,那为什么我应该使用架构注册表,或者为什么要使用Avro?

avro apache-kafka confluent confluent-schema-registry

5
推荐指数
4
解决办法
7434
查看次数

从本地计算机连接到Docker中运行的Kafka

我在本地计算机上使用docker设置单节点基本Kafka部署,如Confluent Kafka文档中所述(步骤2-3).

另外,我还暴露了zookeeper的端口2181和kafka的端口9092,这样我就能从本地机器上运行的java客户端连接到它们:

$ docker run -d \
    -p 2181:2181 \
    --net=confluent \
    --name=zookeeper \
    -e ZOOKEEPER_CLIENT_PORT=2181 \
    confluentinc/cp-zookeeper:4.1.0

$ docker run -d \
    --net=confluent \
    --name=kafka \
    -p 9092:9092 \
    -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \
    -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
    confluentinc/cp-kafka:4.1.0
Run Code Online (Sandbox Code Playgroud)

问题:当我尝试从主机连接到kafka时,连接失败,因为它无法解析地址:kafka:9092.

这是我的Java代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "KafkaExampleProducer");
props.put("key.serializer", LongSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
KafkaProducer<Long, String> producer = new KafkaProducer<>(props);
ProducerRecord<Long, String> record = new ProducerRecord<>("foo", 1L, "Test 1");
producer.send(record).get();
producer.flush();
Run Code Online (Sandbox Code Playgroud)

例外:

java.io.IOException: Can't resolve …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka docker confluent

5
推荐指数
2
解决办法
7698
查看次数

Confluent 3.3.0无法更改默认日志目录位置

我之前使用过融合的Kafka,通常我会更改log.dirs=my-NEW-Location位于的server.properties文件/etc/kafka/.

我刚刚在我的Ubuntu 16.04机器上安装了Confluent 3.3.0 ..使用命令启动没问题confluent start kafka.我试图将log.dirsin 更改server.properties为我的新位置但是汇合并没有因为某些原因而改变它.检查server.log文件后,Confluent会创建日志/tmp/confluent.SOME_RAMDOM_STRING/,有没有办法更改?

G

apache-kafka confluent

4
推荐指数
1
解决办法
2204
查看次数

java.lang.NoSuchMethodError:com.google.common.collect.Sets $ SetView.iterator()Lcom/google/common/collect/UnmodifiableIterator;

我一直在尝试使用kafka-connect api将kafka连接到elasticsearch.Kafka版本是0.11.0.0.这是我遵循的步骤:

1.Buiding Elasticsearch Connector:

https://github.com/confluentinc/kafka-connect-elasticsearch.git

2.建立连接器

$ cd kafka-connect-elasticsearch
$ mvn clean package
Run Code Online (Sandbox Code Playgroud)

3.最后运行脚本:

$ bin/connect-standalone.sh config/connect-standalone.properties config/elasticsearch-connect.properties
Run Code Online (Sandbox Code Playgroud)

它抛出以下异常:

Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.collect.Sets$SetView.iterator()Lcom/google/common/collect/UnmodifiableIterator;
    at org.reflections.Reflections.expandSuperTypes(Reflections.java:380)
    at org.reflections.Reflections.<init>(Reflections.java:126)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:221)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:198)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:159)
    at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:47)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:68)
Run Code Online (Sandbox Code Playgroud)

无法理解出了什么问题.

elasticsearch apache-kafka confluent apache-kafka-connect

4
推荐指数
1
解决办法
6092
查看次数

卡夫卡消费者启动延迟汇合点网

启动融合的dotnet使用者时,在调用订阅和随后的轮询之后,似乎需要很长时间才能从服务器接收到“分配的分区”事件,并因此收到消息(大约10-15秒)。

起初我以为会有自动创建主题的开销,但是无论消费者的主题/消费者组是否已经存在,时间都是相同的。

我从此配置开始使用我的使用者,其余代码与合并的高级使用者示例相同:

            var kafkaConfig = new Dictionary<string, object>
        {
            {"group.id", config.ConsumerGroup},
            {"statistics.interval.ms", 60000},
            {"fetch.wait.max.ms", 10},
            {"bootstrap.servers", config.BrokerList},
            {"enable.auto.commit", config.AutoCommit},
            {"socket.blocking.max.ms",1},
            {"fetch.error.backoff.ms",1 },
            {"socket.nagle.disable",true },
            {"auto.commit.interval.ms", 5000},

            {
                "default.topic.config", new Dictionary<string, object>()
                {
                    {"auto.offset.reset", "smallest"}
                }
            }
        };
Run Code Online (Sandbox Code Playgroud)

kafka集群由具有默认设置的远程数据中心中的3台中低端规格机器组成。是否可以调整代理或客户端设置以减少启动时间?

编辑:使用“分配”而不是“订阅”自己分配分区,导致启动时间约为2秒

c# apache-kafka kafka-consumer-api confluent

4
推荐指数
1
解决办法
1020
查看次数

Kafka Connect找不到连接器

我正在尝试使用Kafka Connect Elasticsearch连接器,但未成功。它崩溃并显示以下错误:

[2018-11-21 14:48:29,096] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:108)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches io.confluent.connect.elasticsearch.ElasticsearchSinkConnector , available connectors are: PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='1.0.1', encodedVersion=1.0.1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='1.0.1', encodedVersion=1.0.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='1.0.1', encodedVersion=1.0.1, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='1.0.1', encodedVersion=1.0.1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='1.0.1', encodedVersion=1.0.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='1.0.1', encodedVersion=1.0.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, …
Run Code Online (Sandbox Code Playgroud)

apache-kafka confluent apache-kafka-connect

4
推荐指数
2
解决办法
2922
查看次数

在Helm安装的Kafka / Confluent上使用连接器

我已按照以下说明https://docs.confluent.io/current/installation/installing_cp/cp-helm使用Helm图表https://github.com/confluentinc/cp-helm-charts在本地Minikube上安装了Kafka。-charts / docs / index.html像这样:

helm install -f kafka_config.yaml confluentinc/cp-helm-charts --name kafka-home-delivery --namespace cust360
Run Code Online (Sandbox Code Playgroud)

kafka_config.yaml与默认yaml几乎相同,唯一的例外是我将其缩减为1个服务器/代理,而不是3个(只是因为我试图节省本地minikube上的资源;希望这与我的问题)。

MySQL实例也在Minikube上运行。这是的输出kubectl get pods --namespace myNamespace

在此处输入图片说明

我想使用其中一种连接器(例如Debezium MySQL CDC)连接MySQL和Kafka 。在说明中说:

安装连接器

使用Confluent Hub客户端通过以下方式安装此连接器:

confluent-hub install debezium/debezium-connector-mysql:0.9.2

听起来不错,除了1)我不知道要在哪个Pod上运行此命令,2)所有Pod似乎都没有可用的confluent-hub命令。

问题:

  1. 是否通过这些Helm图表未安装融合枢纽?
  2. 我必须自己安装融合式集线器吗?
  3. 如果是这样,我必须在哪个吊舱上安装它?

mysql apache-kafka kubernetes confluent apache-kafka-connect

4
推荐指数
1
解决办法
1049
查看次数

基于Kafka的Confluent平台是免费的吗?开源?

Kafka本身是完全免费和开源的.

Confluent是Kafka创作者的盈利公司.Confluent Platform是Kafka以及各种附加功能,例如架构注册表和数据库连接器.我认为Confluent通过出售支持合同和服务来赚钱.

Confluent Platform是免费和/或开源的吗?我有义务购买许可证或付费支持吗?

apache-kafka confluent

3
推荐指数
1
解决办法
1万
查看次数