标签: apache-kafka-connect

Kafka Connect 5.5.0 - 无法重置 max.request.size

在 confluence-5.5.0 中 - 我无法更改 max.request.size ,它始终默认为 ProducerConfig 中的 max.request.size = 1048576 。

以下是我已经用 noluck 尝试过的参数:

confluence-5.5.0/etc/kafka/ Producer.properties

  max.request.size=15728640
  producer.max.request.size=15728640
Run Code Online (Sandbox Code Playgroud)

confluence-5.5.0/etc/kafka/server.properties

  message.max.bytes=15728640
  replica.fetch.max.bytes=15728640
  max.request.size=15728640
  fetch.message.max.bytes=15728640
Run Code Online (Sandbox Code Playgroud)

/data/confluence-5.5.0/etc/kafka/consumer.properties

  max.partition.fetch.bytes=15728640
Run Code Online (Sandbox Code Playgroud)

confluence-5.5.0/etc/kafka-rest/kafka-rest.properties

  max.request.size=15728640
Run Code Online (Sandbox Code Playgroud)

注意:这些值都没有在 connect.log 中更新

  1. 我已经停止/启动 confluence-5.5.0 ,甚至销毁了以前的图像并重新启动。

我错过了什么吗?

在评论信息后我也尝试了以下内容:

/data/confluence-5.5.0/etc/kafka/connect-standalone.properties

生产者.override.max.request.size=15728640 消费者.override.max.partition.fetch.bytes=15728640

/data/confluence-5.5.0/etc/kafka/connect-distributed.properties

生产者.override.max.request.size=15728640 消费者.override.max.partition.fetch.bytes=15728640

max.request.size 仍然没有改变。

(已解决)基于输入:

我在connect或configuration中添加了上述配置。并将政策从无更改为全部。哪个正确应用了配置更改。

apache-kafka kafka-producer-api apache-kafka-connect

2
推荐指数
1
解决办法
4225
查看次数

如何在Kafka连接器中正确连接Elastic Operator部署的Elasticsearch?

我在 Kafka 中有一些 CDC 数据。现在我正在尝试从Kafka下沉到Elasticsearch。这是我到目前为止所做的:

第 1 步 - 在 Kubernetes 中部署 Elasticsearch(成功)

我按照本教程使用 Elastic Operator 在 Kubernetes 中部署了 Elasticsearch:

  1. 在 Kubernetes 集群中部署 ECK:https://www.elastic.co/guide/en/cloud-on-k8s/current/k8s-deploy-eck.html
  2. 部署 Elasticsearch 集群:https://www.elastic.co/guide/en/cloud-on-k8s/current/k8s-deploy-elasticsearch.html
apiVersion: elasticsearch.k8s.elastic.co/v1
kind: Elasticsearch
metadata:
  name: hm-elasticsearch
  namespace: elastic
spec:
  version: 7.14.0
  nodeSets:
    - name: default
      count: 1
      config:
        node.store.allow_mmap: false
Run Code Online (Sandbox Code Playgroud)

根据教程,我可以通过在标题中提供用户名elastic和密码来成功调用passw0rd

curl -u "elastic:passw0rd" -k "https://hm-elasticsearch-es-http.elastic:9200"
Run Code Online (Sandbox Code Playgroud)

返回

{
    "name": "hm-elasticsearch-es-default-0",
    "cluster_name": "hm-elasticsearch",
    "cluster_uuid": "TWgIk0YGR_GVr7IJZcW62g",
    "version": {
        "number": "7.14.0",
        "build_flavor": "default",
        "build_type": "docker",
        "build_hash": "dd5a0a2acaa2045ff9624f3729fc8a6f40835aa1",
        "build_date": "2021-07-29T20:49:32.864135063Z",
        "build_snapshot": …
Run Code Online (Sandbox Code Playgroud)

elasticsearch apache-kafka kubernetes apache-kafka-connect

2
推荐指数
1
解决办法
3372
查看次数

Kafka Connect 错误:com.aerospike.connect.inbound.aerospike.exception.ConvertToAerospikeException:记录中缺少用户密钥

我正在尝试将卡夫卡中的数据摄取到 Aerospike 中。我在发送的卡夫卡消息中缺少什么?

我将以下数据发送到 kafka 以推送到 aerospike:

ubuntu@ubuntu-VirtualBox:/opt/kafka_2.13-2.8.1$ bin/kafka-console-producer.sh --topic phone --bootstrap-server localhost:9092
>{"schema":{"type":"struct","optional":false,"version":1,"fields":[{"field":"name","type":"string","optional":true}]},"payload":{"name":"Anuj"}}
Run Code Online (Sandbox Code Playgroud)

Kafka 连接出现以下错误:

com.aerospike.connect.inbound.aerospike.exception.ConvertToAerospikeException:记录中缺少用户密钥

[2021-12-13 21:33:34,747] ERROR failed to put record SinkRecord{kafkaOffset=13, timestampType=CreateTime} ConnectRecord{topic='phone', kafkaPartition=0, key=null, keySchema=null, value=Struct{name=Anuj}, valueSchema=Schema{STRUCT}, timestamp=1639411413702, headers=ConnectHeaders(headers=)} (com.aerospike.connect.kafka.inbound.AerospikeSinkTask:288)
com.aerospike.connect.inbound.aerospike.exception.ConvertToAerospikeException: user key missing from record
    at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractUserKey(AerospikeRecordConverter.kt:131)
    at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractKey(AerospikeRecordConverter.kt:68)
    at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractRecord(AerospikeRecordConverter.kt:41)
    at com.aerospike.connect.kafka.inbound.KafkaInboundDefaultMessageTransformer.transform(KafkaInboundDefaultMessageTransformer.kt:69)
    at com.aerospike.connect.kafka.inbound.KafkaInboundDefaultMessageTransformer.transform(KafkaInboundDefaultMessageTransformer.kt:25)
    at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.applyTransform(AerospikeSinkTask.kt:341)
    at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.toAerospikeOperation(AerospikeSinkTask.kt:315)
    at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.putRecord(AerospikeSinkTask.kt:239)
    at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.access$putRecord(AerospikeSinkTask.kt:47)
    at com.aerospike.connect.kafka.inbound.AerospikeSinkTask$put$2$2.invokeSuspend(AerospikeSinkTask.kt:220)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2021-12-13 21:33:35,458] INFO 1 errors for topic phone (com.aerospike.connect.kafka.inbound.AerospikeSinkTask:552) …
Run Code Online (Sandbox Code Playgroud)

apache-kafka aerospike apache-kafka-connect

2
推荐指数
1
解决办法
368
查看次数

Kafka Mirror Maker:消费者与消费者的线程编号和生产者编号

我想清楚地指出一个Mirror Maker的Kafka Parallelism模型.

对于我在消费者方面的理解:

  • 消费者集团是一组消费者.该组的每个消费者都可以从一个或多个主题中读取.

  • 该组的消费者可以拥有多个流,即从主题中读取的线程数,最佳做法是将一个线程用于分区.

我的疑问是:我们是否将多个线程的消费者与单线程或一个消费者挂钩?消费者群体是指一组消费者还是一个多线程消费者?

我发现很难从文档中指出这些问题,我想知道我是不是错了.

即使在生产者方面,这些考虑因素也是双向的吗?

apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-connect

1
推荐指数
1
解决办法
827
查看次数

JDBC Kafka Connector可以从多个数据库中提取数据吗?

我很乐意设置一个JDBC Kafka连接器集群,并将它们配置为从同一主机上运行的多个数据库中提取.

我一直在查看Kafka Connect文档,看来在配置JDBC连接器后它只能从单个数据库中提取数据.

谁能证实这一点?

apache-kafka apache-kafka-connect

1
推荐指数
1
解决办法
722
查看次数

如何使用debezium更改数据捕获在mysql中捕获数据并使用kafka connect中的jdbc接收器消耗?

我有使用debezium更改数据捕获在mysql中捕获数据并使用kafka connect jdbc sink将其消耗给另一个mysql的问题.

因为debezium对kafka主题产生的模式和有效负载与kafka connect jdbc sink期望的模式不兼容.

当jdbc接收器想要使用数据并在另一个mysql中创建记录时,我得到异常.

我该如何解决这个问题?

change-data-capture apache-kafka apache-kafka-connect debezium

1
推荐指数
1
解决办法
1030
查看次数

重新启动群集时,连接使用者作业将被删除

关于更改与kafka相关的一些属性并重新启动集群,我面临以下问题.

In kafka Consumer, there were 5 consumer jobs are running . 
Run Code Online (Sandbox Code Playgroud)

如果我们进行了一些重要的属性更改,并且在重新启动集群时,部分/全部现有的使用者作业无法启动.

Ideally all the consumer jobs should start , 
Run Code Online (Sandbox Code Playgroud)

因为它将从以下系统主题获取元数据信息.

config.storage.topic
offset.storage.topic
status.storage.topic
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect

1
推荐指数
1
解决办法
587
查看次数

从kafka转换数据的最简单方法

我正在开发一个项目,使用kafka connect从多个数据库源中提取数据.我希望能够将数据转换为指定的json格式,然后最终将最终的json推送到S3存储桶,最好使用kafka connect来保持我的开销.

以下是数据目前进入kafka(以avro格式)的示例:

{"tableName":"TABLE1","SchemaName{"string":"dbo"},"tableID":1639117030,"columnName":{"string":"DATASET"},"ordinalPosition":{"int":1},"isNullable":{"int":1},"dataType":{"string":"varchar"},"maxLength":{"int":510},"precision":{"int":0},"scale":{"int":0},"isPrimaryKey":{"int":0},"tableSizeKB":{"long":72}}
{"tableName":"dtproperties","SchemaName":{"string":"dbo"},"tableID":1745441292,"columnName":{"string":"id"},"ordinalPosition":{"int":1},"isNullable":{"int":0},"dataType":{"string":"int"},"maxLength":{"int":4},"precision":{"int":10},"scale":{"int":0},"isPrimaryKey":{"int":1},"tableSizeKB":{"long":24}}
Run Code Online (Sandbox Code Playgroud)

转换为JSON时看起来如此:

{
      "tablename" : "AS_LOOKUPS",
      "tableID": 5835333,
      "columnName": "SVALUE",
      "ordinalPosition": 6,
      "isNullable": 1,
      "dataType": "varchar",
      "maxLength": 4000,
      "precision": 0,
      "scale": 0,
      "isPrimaryKey": 0,
      "tableSize": 0,
      "sizeUnit": "GB"
},
{
      "tablename" : "AS_LOOKUPS",
      "tableID": 5835333,
      "columnName": "SORT_ORDER",
      "ordinalPosition": 7,
      "isNullable": 1,
      "dataType": "int",
      "maxLength": 4,
      "precision": 10,
      "scale": 0,
      "isPrimaryKey": 0,
      "tableSize": 0,
      "sizeUnit": "GB"
}
Run Code Online (Sandbox Code Playgroud)

我的目标是让数据看起来像这样:

{
  "header": "Database Inventory",
  "DBName": "DB",
  "ServerName": "server@server.com",
  "SchemaName": "DBE",
  "DB Owner": "Name",
  "DB Guardian" : "Name/Group",
  "ASV" …
Run Code Online (Sandbox Code Playgroud)

bigdata apache-kafka apache-kafka-streams apache-kafka-connect

1
推荐指数
1
解决办法
3814
查看次数

MS SQL CDC与Kafka Connect和Apache Kafka

在我目前的用例中,我使用Spark核心从MS SQL Server读取数据并对数据进行一些处理并每隔1分钟将其发送到Kafka,我使用Spark和Phoenix来维护HBase表中的CDC信息.

但是这种设计存在一些问题,例如,如果MS SQL记录激增,Spark处理比批处理间隔花费更多时间,并且spark最终会向Kafka发送重复记录.

作为替代方案,我正在考虑使用Kafka Connect从MS SQL读取消息并将记录发送到Kafka主题并在Kafka中维护MS SQL CDC.Spark Streaming将从Kafka主题中读取记录,并将记录和存储处理到HBase并发送到其他Kafka主题.

我有几个问题要实现这个架构:

  1. 我可以使用开源Kafka连接器和Apache Kafka 0.9版本来实现这种架构.

  2. 如果是的话,请你推荐一个GitHub项目,它可以为我提供这样的连接器,我可以使用SQL查询CDC MS SQL表SELECT * FROM SOMETHING WHERE COLUMN > ${lastExtractUnixTime}),并将记录存储到Kafka主题中.

  3. Kafka connect是否支持Kerberos Kafka设置.

sql-server apache-kafka apache-spark apache-kafka-connect

1
推荐指数
1
解决办法
7427
查看次数

Kafka Connect和Streams

所以我最近开始阅读Kafka,我对Kafka Connect和Kafka Streams之间的区别感到有些困惑.根据定义,Kafka Streams可以从Kafka主题收集数据,处理它并将输出推送到另一个Kafka主题.而Kafka Connect将大型数据集移入和移出Kafka.

我的问题是为什么我们需要Kafka Connect几乎可以读取数据,处理数据并将其推送到主题?为什么要增加一个组件 如果有人可以解释差异,那将是很棒的,在此先感谢:)

apache-kafka apache-kafka-streams apache-kafka-connect

1
推荐指数
1
解决办法
403
查看次数