在 confluence-5.5.0 中 - 我无法更改 max.request.size ,它始终默认为 ProducerConfig 中的 max.request.size = 1048576 。
以下是我已经用 noluck 尝试过的参数:
confluence-5.5.0/etc/kafka/ Producer.properties
max.request.size=15728640
producer.max.request.size=15728640
Run Code Online (Sandbox Code Playgroud)
confluence-5.5.0/etc/kafka/server.properties
message.max.bytes=15728640
replica.fetch.max.bytes=15728640
max.request.size=15728640
fetch.message.max.bytes=15728640
Run Code Online (Sandbox Code Playgroud)
/data/confluence-5.5.0/etc/kafka/consumer.properties
max.partition.fetch.bytes=15728640
Run Code Online (Sandbox Code Playgroud)
confluence-5.5.0/etc/kafka-rest/kafka-rest.properties
max.request.size=15728640
Run Code Online (Sandbox Code Playgroud)
注意:这些值都没有在 connect.log 中更新
我错过了什么吗?
在评论信息后我也尝试了以下内容:
/data/confluence-5.5.0/etc/kafka/connect-standalone.properties
生产者.override.max.request.size=15728640 消费者.override.max.partition.fetch.bytes=15728640
/data/confluence-5.5.0/etc/kafka/connect-distributed.properties
生产者.override.max.request.size=15728640 消费者.override.max.partition.fetch.bytes=15728640
max.request.size 仍然没有改变。
(已解决)基于输入:
我在connect或configuration中添加了上述配置。并将政策从无更改为全部。哪个正确应用了配置更改。
我在 Kafka 中有一些 CDC 数据。现在我正在尝试从Kafka下沉到Elasticsearch。这是我到目前为止所做的:
我按照本教程使用 Elastic Operator 在 Kubernetes 中部署了 Elasticsearch:
apiVersion: elasticsearch.k8s.elastic.co/v1
kind: Elasticsearch
metadata:
name: hm-elasticsearch
namespace: elastic
spec:
version: 7.14.0
nodeSets:
- name: default
count: 1
config:
node.store.allow_mmap: false
Run Code Online (Sandbox Code Playgroud)
根据教程,我可以通过在标题中提供用户名elastic和密码来成功调用passw0rd
curl -u "elastic:passw0rd" -k "https://hm-elasticsearch-es-http.elastic:9200"
Run Code Online (Sandbox Code Playgroud)
返回
{
"name": "hm-elasticsearch-es-default-0",
"cluster_name": "hm-elasticsearch",
"cluster_uuid": "TWgIk0YGR_GVr7IJZcW62g",
"version": {
"number": "7.14.0",
"build_flavor": "default",
"build_type": "docker",
"build_hash": "dd5a0a2acaa2045ff9624f3729fc8a6f40835aa1",
"build_date": "2021-07-29T20:49:32.864135063Z",
"build_snapshot": …Run Code Online (Sandbox Code Playgroud) 我正在尝试将卡夫卡中的数据摄取到 Aerospike 中。我在发送的卡夫卡消息中缺少什么?
我将以下数据发送到 kafka 以推送到 aerospike:
ubuntu@ubuntu-VirtualBox:/opt/kafka_2.13-2.8.1$ bin/kafka-console-producer.sh --topic phone --bootstrap-server localhost:9092
>{"schema":{"type":"struct","optional":false,"version":1,"fields":[{"field":"name","type":"string","optional":true}]},"payload":{"name":"Anuj"}}
Run Code Online (Sandbox Code Playgroud)
Kafka 连接出现以下错误:
com.aerospike.connect.inbound.aerospike.exception.ConvertToAerospikeException:记录中缺少用户密钥
[2021-12-13 21:33:34,747] ERROR failed to put record SinkRecord{kafkaOffset=13, timestampType=CreateTime} ConnectRecord{topic='phone', kafkaPartition=0, key=null, keySchema=null, value=Struct{name=Anuj}, valueSchema=Schema{STRUCT}, timestamp=1639411413702, headers=ConnectHeaders(headers=)} (com.aerospike.connect.kafka.inbound.AerospikeSinkTask:288)
com.aerospike.connect.inbound.aerospike.exception.ConvertToAerospikeException: user key missing from record
at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractUserKey(AerospikeRecordConverter.kt:131)
at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractKey(AerospikeRecordConverter.kt:68)
at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractRecord(AerospikeRecordConverter.kt:41)
at com.aerospike.connect.kafka.inbound.KafkaInboundDefaultMessageTransformer.transform(KafkaInboundDefaultMessageTransformer.kt:69)
at com.aerospike.connect.kafka.inbound.KafkaInboundDefaultMessageTransformer.transform(KafkaInboundDefaultMessageTransformer.kt:25)
at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.applyTransform(AerospikeSinkTask.kt:341)
at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.toAerospikeOperation(AerospikeSinkTask.kt:315)
at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.putRecord(AerospikeSinkTask.kt:239)
at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.access$putRecord(AerospikeSinkTask.kt:47)
at com.aerospike.connect.kafka.inbound.AerospikeSinkTask$put$2$2.invokeSuspend(AerospikeSinkTask.kt:220)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2021-12-13 21:33:35,458] INFO 1 errors for topic phone (com.aerospike.connect.kafka.inbound.AerospikeSinkTask:552) …Run Code Online (Sandbox Code Playgroud) 我想清楚地指出一个Mirror Maker的Kafka Parallelism模型.
对于我在消费者方面的理解:
消费者集团是一组消费者.该组的每个消费者都可以从一个或多个主题中读取.
该组的消费者可以拥有多个流,即从主题中读取的线程数,最佳做法是将一个线程用于分区.
我的疑问是:我们是否将多个线程的消费者与单线程或一个消费者挂钩?消费者群体是指一组消费者还是一个多线程消费者?
我发现很难从文档中指出这些问题,我想知道我是不是错了.
即使在生产者方面,这些考虑因素也是双向的吗?
apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-connect
我很乐意设置一个JDBC Kafka连接器集群,并将它们配置为从同一主机上运行的多个数据库中提取.
我一直在查看Kafka Connect文档,看来在配置JDBC连接器后它只能从单个数据库中提取数据.
谁能证实这一点?
我有使用debezium更改数据捕获在mysql中捕获数据并使用kafka connect jdbc sink将其消耗给另一个mysql的问题.
因为debezium对kafka主题产生的模式和有效负载与kafka connect jdbc sink期望的模式不兼容.
当jdbc接收器想要使用数据并在另一个mysql中创建记录时,我得到异常.
我该如何解决这个问题?
change-data-capture apache-kafka apache-kafka-connect debezium
关于更改与kafka相关的一些属性并重新启动集群,我面临以下问题.
In kafka Consumer, there were 5 consumer jobs are running .
Run Code Online (Sandbox Code Playgroud)
如果我们进行了一些重要的属性更改,并且在重新启动集群时,部分/全部现有的使用者作业无法启动.
Ideally all the consumer jobs should start ,
Run Code Online (Sandbox Code Playgroud)
因为它将从以下系统主题获取元数据信息.
config.storage.topic
offset.storage.topic
status.storage.topic
Run Code Online (Sandbox Code Playgroud) 我正在开发一个项目,使用kafka connect从多个数据库源中提取数据.我希望能够将数据转换为指定的json格式,然后最终将最终的json推送到S3存储桶,最好使用kafka connect来保持我的开销.
以下是数据目前进入kafka(以avro格式)的示例:
{"tableName":"TABLE1","SchemaName{"string":"dbo"},"tableID":1639117030,"columnName":{"string":"DATASET"},"ordinalPosition":{"int":1},"isNullable":{"int":1},"dataType":{"string":"varchar"},"maxLength":{"int":510},"precision":{"int":0},"scale":{"int":0},"isPrimaryKey":{"int":0},"tableSizeKB":{"long":72}}
{"tableName":"dtproperties","SchemaName":{"string":"dbo"},"tableID":1745441292,"columnName":{"string":"id"},"ordinalPosition":{"int":1},"isNullable":{"int":0},"dataType":{"string":"int"},"maxLength":{"int":4},"precision":{"int":10},"scale":{"int":0},"isPrimaryKey":{"int":1},"tableSizeKB":{"long":24}}
Run Code Online (Sandbox Code Playgroud)
转换为JSON时看起来如此:
{
"tablename" : "AS_LOOKUPS",
"tableID": 5835333,
"columnName": "SVALUE",
"ordinalPosition": 6,
"isNullable": 1,
"dataType": "varchar",
"maxLength": 4000,
"precision": 0,
"scale": 0,
"isPrimaryKey": 0,
"tableSize": 0,
"sizeUnit": "GB"
},
{
"tablename" : "AS_LOOKUPS",
"tableID": 5835333,
"columnName": "SORT_ORDER",
"ordinalPosition": 7,
"isNullable": 1,
"dataType": "int",
"maxLength": 4,
"precision": 10,
"scale": 0,
"isPrimaryKey": 0,
"tableSize": 0,
"sizeUnit": "GB"
}
Run Code Online (Sandbox Code Playgroud)
我的目标是让数据看起来像这样:
{
"header": "Database Inventory",
"DBName": "DB",
"ServerName": "server@server.com",
"SchemaName": "DBE",
"DB Owner": "Name",
"DB Guardian" : "Name/Group",
"ASV" …Run Code Online (Sandbox Code Playgroud) bigdata apache-kafka apache-kafka-streams apache-kafka-connect
在我目前的用例中,我使用Spark核心从MS SQL Server读取数据并对数据进行一些处理并每隔1分钟将其发送到Kafka,我使用Spark和Phoenix来维护HBase表中的CDC信息.
但是这种设计存在一些问题,例如,如果MS SQL记录激增,Spark处理比批处理间隔花费更多时间,并且spark最终会向Kafka发送重复记录.
作为替代方案,我正在考虑使用Kafka Connect从MS SQL读取消息并将记录发送到Kafka主题并在Kafka中维护MS SQL CDC.Spark Streaming将从Kafka主题中读取记录,并将记录和存储处理到HBase并发送到其他Kafka主题.
我有几个问题要实现这个架构:
我可以使用开源Kafka连接器和Apache Kafka 0.9版本来实现这种架构.
如果是的话,请你推荐一个GitHub项目,它可以为我提供这样的连接器,我可以使用SQL查询CDC MS SQL表SELECT * FROM SOMETHING WHERE COLUMN > ${lastExtractUnixTime}),并将记录存储到Kafka主题中.
Kafka connect是否支持Kerberos Kafka设置.
所以我最近开始阅读Kafka,我对Kafka Connect和Kafka Streams之间的区别感到有些困惑.根据定义,Kafka Streams可以从Kafka主题收集数据,处理它并将输出推送到另一个Kafka主题.而Kafka Connect将大型数据集移入和移出Kafka.
我的问题是为什么我们需要Kafka Connect几乎可以读取数据,处理数据并将其推送到主题?为什么要增加一个组件 如果有人可以解释差异,那将是很棒的,在此先感谢:)
apache-kafka ×10
aerospike ×1
apache-spark ×1
bigdata ×1
debezium ×1
kubernetes ×1
sql-server ×1