卡夫卡高级制作人选出领导者的时间和频率是多少?是在发送每条消息之前还是在创建连接时只执行一次?
考虑卡夫卡生产者:
const { Kafka, logLevel } = require('kafkajs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['kafka1:9092', 'kafka2:9092'],
logLevel: logLevel.ERROR,
});
const run = async () => {
const producer = kafka.producer();
await producer.connect();
await producer.send({
topic: 'test-topic',
messages: [{ value: 'Hello KafkaJS user!' }],
});
await producer.disconnect();
};
run();
Run Code Online (Sandbox Code Playgroud)
每当我点击:node Producer.js时,响应是:
C:\Development-T410\Kafka>node producer
{"level":"ERROR","timestamp":"2020-10-09T08:24:07.646Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka1:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2020-10-09T08:24:07.674Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":325}
{"level":"ERROR","timestamp":"2020-10-09T08:24:09.004Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka2:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2020-10-09T08:24:09.006Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to …Run Code Online (Sandbox Code Playgroud) javascript apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-connect
我启动了一个 kafka-connect 分布式工作集群,它使用该主题connect-offset进行偏移存储:
offset.storage.topic=connect-offset
Run Code Online (Sandbox Code Playgroud)
由于代理提供了默认策略“cleanup.policy=delete”,因此当使用“cleanup.policy=compact”创建主题时,我最终将获得该主题的“cleanup.policy=compact,delete”。这会导致 kafka-connect 工作进程抛出异常:
org.apache.kafka.common.config.ConfigException:通过“offset.storage.topic”属性提供的主题“slpe-connect-offset”需要具有“cleanup.policy=compact”以保证源连接器的一致性和持久性偏移量,但发现该主题当前有“cleanup.policy=compact,delete”。继续可能会导致最终丢失源连接器偏移,并在将来重新启动此 Connect 集群时出现问题。更改 Connect Worker 配置中的“offset.storage.topic”属性,以使用带有“cleanup.policy=compact”的主题。
问题:是否有任何 kafka-connect 工作配置允许吃此异常以保留工作进程?虽然这是一个风险,但delete只有达到保留或大小限制才会发生。
我需要从 Kafka 主题中获取消息并通过基于 HTTP 的 API 通知其他系统。也就是说,从主题获取消息,映射到第 3 方 API 并调用它们。我打算为此编写一个 Kafka Sink 连接器。
对于这个用例,Kafka Connect 是正确的选择还是我应该使用 Kafka Client。
我试图弄清楚如何最初从查询中获取所有数据,然后仅使用 kafka 连接器增量更改。这样做的原因是我想将所有数据加载到弹性搜索中,然后使 es 与我的 kafka 流同步。目前,我首先使用带有模式 = 批量的连接器来执行此操作,然后将其更改为时间戳。这工作正常。
但是,如果我们想将所有数据重新加载到 Streams 和 ES,这意味着我们必须编写一些脚本来以某种方式清理或删除 kafka 流和 es 索引数据,修改连接 ini 以将模式设置为批量,重新启动所有内容,给出是时候加载所有数据,然后再次将脚本修改为时间戳模式,然后再次重新启动所有内容(需要这样一个脚本的原因是偶尔,批量更新会通过我们尚无法控制的 etl 过程来纠正历史数据,并且此过程不会更新时间戳)
有没有人做类似的事情并找到了更优雅的解决方案?
elasticsearch apache-kafka apache-kafka-connect confluent-platform
我正在使用 Confluent JDBCSourceConnector 从 Oracle 表中读取数据。我正在尝试使用 SMT 生成由 3 个连接字段组成的密钥。
transforms=createKey
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=BUS_OFC_ID_CD,SO_TYPE,SO_NO
Run Code Online (Sandbox Code Playgroud)
使用上面的转换,我得到了这样的东西:
{"BUS_OFC_ID_CD":"111","SO_TYPE":"I","SO_NO":"55555"}
Run Code Online (Sandbox Code Playgroud)
我想要类似的东西:
111I55555
Run Code Online (Sandbox Code Playgroud)
关于如何仅连接值的任何想法?
oracle jdbc apache-kafka apache-kafka-connect confluent-platform
我想知道 kafka 中消息的压缩大小。
我使用 kafka 1.1.0 和 java kafka-connect 1.1.0 将消息从我的生产者发送到主题。
如果消息对我的制作人来说太大,我会收到
该消息在序列化时为 xxx 字节,大于您使用 max.request.size 配置配置的最大请求大小。
将 max.request.size 设置为合适的值会导致来自代理的错误消息,因为 message.max.bytes 也必须在代理配置中进行相应调整。不幸的是,错误消息不包括代理收到的消息的大小。我调整了 message.max.bytes。到现在为止还挺好。
如果我在生产者端激活压缩,max.request.size 仍然必须与没有压缩的大小相同,因为不幸的是,代码在压缩之前比较了未压缩消息的大小(请参阅https://issues.apache .org/jira/browse/KAFKA-4169 )
但是通过压缩,我将能够减少代理中的 message.max.bytes。问题是我在任何时候都无法确定此压缩消息的大小。有没有办法在发送消息之前或稍后在日志文件中在生产者代码中弄清楚这一点?
在我使用压缩的情况下,message.max.bytes 的默认值 1MB 就足够了,所以我不必更改默认配置。但我想知道我的压缩消息是远低于 1MB 还是只有 0.99MB。在这种情况下,我可能会在生产中增加 message.max.bytes 以避免出现问题。
提前感谢您的支持。
我需要从具有约 2000 个模式的 PostgreSQL 数据库中获取数据。所有模式都包含相同的表(它是一个多租户应用程序)。
连接器配置如下:
{
"name": "postgres-source",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"timestamp.column.name": "updated",
"incrementing.column.name": "id",
"connection.password": "********",
"tasks.max": "1",
"mode": "timestamp+incrementing",
"topic.prefix": "postgres-source-",
"connection.user": "*********",
"poll.interval.ms": "3600000",
"numeric.mapping": "best_fit",
"connection.url": "jdbc:postgresql://*******:5432/*******",
"table.whitelist": "table1",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false"
}
Run Code Online (Sandbox Code Playgroud)
使用此配置,我收到此错误:
“连接器使用非限定表名作为主题名,并检测到重复的非限定表名。这可能导致主题中的混合数据类型和下游处理错误。为防止此类处理错误,JDBC Source 连接器在执行时无法启动检测重复的表名配置”
显然,连接器不想将多个同名表中的数据发布到单个主题。
这对我来说无关紧要,它可以转到单个主题或多个主题(每个模式一个)。
作为附加信息,如果我添加:
"schema.pattern": "schema1"
Run Code Online (Sandbox Code Playgroud)
到配置,连接器工作并且来自指定模式和表的数据被复制。
有没有办法复制包含同名表的多个模式?
谢谢
postgresql jdbc apache-kafka apache-kafka-connect confluent-platform
我想使用 Kafka Connector 将数据从 Kafka 流式传输到 MongoDB。我找到了这个https://github.com/hpgrahsl/kafka-connect-mongodb。但是没有步骤可做。
谷歌搜索后,它似乎导致了我不想使用的Confluent Platform。
任何人都可以分享我的文档/指南,如何在不使用 Confluent 平台或其他 Kafka 连接器的情况下使用kafka-connect-mongodb将数据从 Kafka 流式传输到 MongoDB?
先感谢您。
我试过的
Step1:我mongo-kafka-connect-0.1-all.jar从maven central下载
Step2:将jar文件复制到一个新文件夹plugins里面kafka(我在windows上用的是Kafka,所以目录是D:\git\1.libraries\kafka_2.12-2.2.0\plugins)
步骤 3:connect-standalone.properties通过添加新行来
编辑文件plugin.path=/git/1.libraries/kafka_2.12-2.2.0/plugins
步骤 4:我为 mongoDB 接收器添加新的配置文件 MongoSinkConnector.properties
name=mongo-sink
topics=test
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
key.ignore=true
# Specific global MongoDB Sink Connector configuration
connection.uri=mongodb://localhost:27017,mongo1:27017,mongo2:27017,mongo3:27017
database=test_kafka
collection=transaction
max.num.retries=3
retries.defer.timeout=5000
type.name=kafka-connect
Run Code Online (Sandbox Code Playgroud)
第五步:运行命令 bin\windows\connect-standalone.bat config\connect-standalone.properties config\MongoSinkConnector.properties
但是,我得到了错误
[2019-07-09 10:19:09,466] WARN The configuration 'offset.flush.interval.ms' was supplied but isn't …Run Code Online (Sandbox Code Playgroud)
我试图设置 Kafka Connect 以运行 ElasticsearchSinkConnector。
Kafka 设置,由 3 个使用 Kerberos、SSL 和 ACL 保护的代理组成。
到目前为止,我一直在尝试使用 docker/docker-compose(Confluent docker-image 5.4 with Kafka 2.4)连接到远程 kafka 安装(Kafka 2.0.1 - 实际上是我们的生产环境)运行连接框架和 elasticserch-server 本地)。
KAFKA_OPTS: -Djava.security.krb5.conf=/etc/kafka-connect/secrets/krb5.conf
CONNECT_BOOTSTRAP_SERVERS: srv-kafka-1.XXX.com:9093,srv-kafka-2.XXX.com:9093,srv-kafka-3.XXX.com:9093
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: user-grp
CONNECT_CONFIG_STORAGE_TOPIC: test.internal.connect.configs
CONNECT_OFFSET_STORAGE_TOPIC: test.internal.connect.offsets
CONNECT_STATUS_STORAGE_TOPIC: test.internal.connect.status
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_ZOOKEEPER_CONNECT: srv-kafka-1.XXX.com:2181,srv-kafka-2.XXX.com:2181,srv-kafka-3.XXX.com:2181
CONNECT_SECURITY_PROTOCOL: SASL_SSL
CONNECT_SASL_KERBEROS_SERVICE_NAME: "kafka"
CONNECT_SASL_JAAS_CONFIG: com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKey=true \
keyTab="/etc/kafka-connect/secrets/kafka-connect.keytab" \
principal="<principal>;
CONNECT_SASL_MECHANISM: GSSAPI
CONNECT_SSL_TRUSTSTORE_LOCATION: <path_to_truststore.jks>
CONNECT_SSL_TRUSTSTORE_PASSWORD: <PWD> …Run Code Online (Sandbox Code Playgroud)