我正在玩Kafka-Connect.我已经HDFS connector在独立模式和分布式模式下工作了.
他们宣传工作人员(负责运行连接器)可以通过管理YARN 但是,我还没有看到任何描述如何实现这一目标的文档.
我如何YARN开始执行工作人员?如果没有具体的方法,是否有关于如何让应用程序在其中运行的通用方法YARN?
我已经使用YARN过SPARK,spark-submit但我无法弄清楚如何让连接器运行YARN.
JDBC 连接器和Debezium SQL Server CDC 连接器有什么区别(或任何其他关系数据库连接器)我应该什么时候选择一个而不是另一个,寻找在两个关系数据库之间同步的解决方案?
不确定这个讨论是否应该是关于 CDC 与 JDBC 连接器,而不是 Debezium SQL Server CDC 连接器,甚至只是 Debezium,期待以后的编辑,取决于给定的答案(虽然我的情况是关于 SQL Server 接收器)。
与您分享我对这个主题的研究,这让我想到了这个问题(作为答案)
我们的集群运行 Kafka 0.11,并且对使用消费者组有严格的限制。我们不能使用任意的消费者组,因此管理员必须创建所需的消费者组。
我们运行 Kafka Connect HDFS Sinks 从主题读取数据并写入 HDFS。所有主题只有一个分区。
在 Kafka HDFS Sink 中使用消费者组时,我可以考虑以下两种模式。
如图所示:
案例一:每个topic都有自己的Consumer Group

我知道当一个主题有多个分区时,如果一个消费者失败,同一消费者组中的另一个消费者将接管该分区。
我的问题 :
当多个主题共享同一个消费群体时,是否会发生同样的事情?即:如果消费者失败(HDFS 接收器),另一个消费者(HDFS 接收器连接器)是否会接管工作并读取该主题?
更新:每个 Kafka HDFS Sink Connector 只订阅一个主题。
在启动 Kafka-Connect 时,我看到了很多警告
10:33:56.706 [DistributedHerder] WARN org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'config.storage.topic' was supplied but isn't a known config.
10:33:56.707 [DistributedHerder] WARN org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'group.id' was supplied but isn't a known config.
10:33:56.708 [DistributedHerder] WARN org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'status.storage.topic' was supplied but isn't a known config.
10:33:56.709 [DistributedHerder] WARN org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'internal.key.converter.schemas.enable' was supplied but isn't a known config.
10:33:56.710 [DistributedHerder] WARN org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'config.storage.replication.factor' was supplied but isn't a known config.
10:33:56.710 [DistributedHerder] …Run Code Online (Sandbox Code Playgroud) 我期待产品电离并部署我的Kafka Connect应用程序.但是,我有两个关于tasks.max设置的问题,这是必需的并且具有很高的重要性,但细节对于实际设置此值的内容是模糊的.
我最简单的问题如下:如果我有一个带有n个分区的主题,我希望从中获取数据并写入某个接收器(在我的情况下,我写入S3),我应该将tasks.max设置为什么?我应该把它设置为n吗?我应该把它设置为2n吗?直观地说,似乎我想将值设置为n,这就是我一直在做的事情.
如果我更改我的Kafka主题并增加主题分区怎么办?如果我把它设置为n,我将不得不暂停我的Kafka连接器并增加tasks.max?如果我设置了2n的值,那么我的连接器应该自动增加它运行的并行度?
谢谢你的帮助!
我目前正在SQL Server中运行文本搜索,这正成为瓶颈,出于显而易见的原因,我想将其移至Elasticsearch,但是我知道我必须对数据进行非规范化才能获得最佳性能和可伸缩性。
目前,我的文本搜索包括一些聚合和联接多个表以获得最终输出。联接的表不是很大(每个表最多20GB),但是会不定期地更改(插入,更新,删除)(其中两个每周一次,另一个x每天一次)。
我的计划是将Apache Kafka与Kafka Connect一起使用,以便从我的SQL Server中读取CDC,在Kafka中加入此数据并将其保留在Elasticsearch中,但是我找不到任何资料可以告诉我在处理数据时如何处理删除操作坚持使用Elasticsearch。
默认驱动程序甚至支持吗?如果没有,有什么可能?Apache Spark,Logstash?
我正在尝试在 BULK 模式下使用具有以下属性的 Kafka Connect JDBC Source Connector。
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
timestamp.column.name=timestamp
connection.password=XXXXX
validate.non.null=false
tasks.max=1
producer.buffer.memory=2097152
batch.size=1000
producer.enable.idempotence=true
offset.flush.timeout.ms=300000
table.types=TABLE,VIEW
table.whitelist=materials
offset.flush.interval.ms=5000
mode=bulk
topic.prefix=mysql-
connection.user=kafka_connect_user
poll.interval.ms=200000
connection.url=jdbc:mysql://<DBNAME>
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
Run Code Online (Sandbox Code Playgroud)
我收到有关提交偏移量的以下错误,更改各种参数似乎影响不大。
[2019-04-04 12:42:14,886] INFO WorkerSourceTask{id=SapMaterialsConnector-0} flushing 4064 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-04-04 12:42:19,886] ERROR WorkerSourceTask{id=SapMaterialsConnector-0} Failed to flush, timed out while waiting for producer to flush outstanding 712 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
Run Code Online (Sandbox Code Playgroud) 我正在将日志消息写入Kafka主题,我希望保留此主题是永久性的.我在Kafka和Kafka Connect(_schemas,connect-configs,connect-status,connect-offsets等)中看到,有些特殊主题未被日志保留时间删除.如何强制主题与其他特殊主题一样?它是命名约定还是其他一些属性?
谢谢
使用Confluent JDBC连接器时,我遇到了很奇怪的行为.我很确定它与Confluent堆栈无关,而是与Kafka-connect框架本身无关.
因此,我将offset.storage.file.filename属性定义为默认值/tmp/connect.offsets并运行我的接收器连接器.显然,我希望连接器在给定文件中保持偏移量(它在文件系统中不存在,但它应该自动创建,对吧?).文件说:
offset.storage.file.filename要存储连接器偏移量的文件.通过在磁盘上存储偏移量,可以在单个节点上停止并启动独立进程,并从之前停止的位置继续.
但卡夫卡表现得完全不同.
这是一个错误,或者更可能的是,我不明白如何使用这种配置?我理解两种保持偏移和文件存储的方法之间的区别对我的需求更方便.
我们构建了一个自定义Kafka Connect接收器,后者又调用远程REST API.如何将背压传播到Kafka Connect基础架构,因此在远程系统比内部消费者向put()传递消息的速度慢的情况下,调用put()的频率较低?Kafka连接文档说我们不应该在put()中阻塞,而是在flush()中阻塞.但是put()中没有阻塞意味着我们必须缓冲数据,如果put()被调用的频率高于flush(),那么在某些时候肯定会导致OOM异常.我已经看到允许kafka使用者在循环()中调用pause()或阻塞.是否有可能在卡夫卡连接接收器中利用它?
apache-kafka ×9
amazon-s3 ×1
cdc ×1
confluent ×1
debezium ×1
hadoop-yarn ×1
java ×1
jdbc ×1
sql ×1
sql-server ×1