目前Kafka Connect
正在收集两种类型的日志。
connect-rest.log.2018-07-01-21
, connect-rest.log.2018-07-01-22
...connectDistributed.out
问题是我不知道如何connectDistributed.out
在 Kafka Connect 中配置文件。以下是该文件的示例输出:
[2018-07-11 08:42:40,798] INFO WorkerSinkTask{id=elasticsearch-sink-
connector-0} Committing offsets asynchronously using sequence number
216: {test-1=OffsetAndMetadata{offset=476028, metadata=‘’},
test-0=OffsetAndMetadata{offset=478923, metadata=‘’},
test-2=OffsetAndMetadata{offset=477944, metadata=‘’}}
(org.apache.kafka.connect.runtime.WorkerSinkTask:325)
[2018-07-11 08:43:40,798] INFO WorkerSinkTask{id=elasticsearch-sink-connector0}
Committing offsets asynchronously using sequence number 217:
{test-1=OffsetAndMetadata{offset=476404, metadata=‘’},
test-0=OffsetAndMetadata{offset=479241, metadata=‘’},
test-2=OffsetAndMetadata{offset=478316, metadata=‘’}}
(org.apache.kafka.connect.runtime.WorkerSinkTask:325)
Run Code Online (Sandbox Code Playgroud)
没有配置任何日志记录选项,文件大小随着时间的推移变得越来越大。今天,它达到了 20GB,我不得不手动清空文件。所以我的问题是如何配置它connectDistributed.out
?我正在为其他组件(例如 kafka 代理日志)配置日志选项。
以下是confluent-4.1.0/etc/kafka
我正在使用的一些与 Kafka Connect 相关的日志配置。
log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender …
Run Code Online (Sandbox Code Playgroud) 我在 EC2 4GB RAM 实例上运行独立的 kafka 代理。默认设置中,Kafka配置为使用1GB内存 -Xmx1G -Xms1G
由于VM只有4GB内存,是否可以将JVM设置配置为使用512MB?我该怎么做?Kafka 会在 512MB 内存下正常运行还是 1GB 最低要求?
我想将 sailsjs 项目代码(来自 docker)提取到我的本地系统(Linux 机器)中。
我运行以下命令
docker exec -it containername_or_ID /bin/bash
Run Code Online (Sandbox Code Playgroud)
我可以使用此命令查看我的代码。但我无法将此代码输入我的本地系统。
将代码放入本地系统的任何想法
我正在使用 split() 来解析直到冒号。我的文本中有几个冒号,但我只需要第一行的字符串。我需要做什么才能获得第一行?
line = """Hello :
This is a test ......:
Testpath: C:\\...
blablablabla
123:"""
if ' :' in line:
av = line.split(" :",1)[0]
print av
Run Code Online (Sandbox Code Playgroud)
是否可以在不使用正则表达式的情况下访问第一行??
目标:读取来自主题的所有消息,然后终止进程。
我能够连续阅读以下消息:
props.put("bootstrap.servers", kafkaBootstrapSrv);
props.put("group.id", group_id);
props.put("max.poll.records", 1); // Only get one record at a time. I understand that to read all messages this will need to be increased
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("MY_TOPIC"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);
for (ConsumerRecord<String, String> record : records) {
process_record(record);
}
consumer.commitSync();
}
Run Code Online (Sandbox Code Playgroud)
但在这种情况下,进程永远不会终止。当我摆脱
while (true)
Run Code Online (Sandbox Code Playgroud)
循环并运行程序,它不会从主题中获取一条记录(我希望有一条记录)。这是为什么?
最近,我们的 Kafka 经纪人之一(共 5 个)被错误关闭。现在我们再次启动它,有很多关于损坏的索引文件的警告消息,即使在 24 小时后,代理仍在启动。该代理中有超过 400 GB 的数据。
尽管其余代理已启动并正在运行,但某些分区将 -1 显示为它们的领导者,而将坏代理显示为唯一的 ISR。我没有看到其他副本被任命为新的领导者,也许是因为坏代理是这些分区唯一同步的代理。
Broker Properties:
Replication Factor: 3
Min In Sync Replicas: 1
Run Code Online (Sandbox Code Playgroud)
我不知道如何处理这个。我应该等待经纪人自己解决所有问题吗?花这么多时间正常吗?
还有什么我可以做的吗?请帮忙。
我找不到节点和Zookeeper之间通信的任何细节.假设在Kafka节点中发生了更多垃圾收集,它会产生什么结果?
我正在尝试使用 SASL 验证我的 Kafka Rest 代理,但在将本地 docker compose 中所做的配置传输到 Kubernetes 时遇到问题。
我正在使用 JAAS 配置来实现此目的。我的 JAAS 文件如下所示。
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="rest"
password="rest-secret";
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="rest"
password="restsecret";
};
Run Code Online (Sandbox Code Playgroud)
然后在我的 docker compose 中我已经完成了:
KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/rest_jaas.conf
我如何将同样的逻辑转移到 Kubernetes?我尝试过像这样传递 env 变量:
env:
- name: KAFKA_OPTS
value: |
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="rest"
password="rest-secret";
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="rest"
password="rest-secret";
};
Run Code Online (Sandbox Code Playgroud)
但它仍然失败。这是我的日志所说的:
Error: Could not find or load main class KafkaClient
/bin/sh: 3: org.apache.kafka.common.security.plain.PlainLoginModule: not found
/bin/sh: 6: Syntax error: "}" …
Run Code Online (Sandbox Code Playgroud) 我们正在使用 kafka 流 api 进行聚合,其中我们也使用 group by。我们还使用状态存储来保存输入主题数据。
我注意到的是
Kafka内部创建了3种topic
Changelog-<storeid>-<partition>
Repartition-<storeid>-<partition>
<topicname>-<partition>
我无法理解的是
<topic>-<partition>
数据有什么不同,因此它必须为此保存不同的文件。
我们集群中有 3 个 kafka 经纪人
卡夫卡版本 - 1.0.0
每台 kafka 机器还包括:zookeper 服务器、模式注册表
停止/启动以下服务的正确顺序是什么:
1. kafka
2 zookeeper
3 schema registry
Run Code Online (Sandbox Code Playgroud) apache-kafka ×8
java ×2
docker ×1
heap ×1
kubernetes ×1
log4j ×1
logging ×1
python ×1
python-3.x ×1