小编cri*_*007的帖子

配置Kafka Connect分布式连接器日志(connectDistributed.out)

目前Kafka Connect正在收集两种类型的日志。

  • connect-rest.log.2018-07-01-21, connect-rest.log.2018-07-01-22...
  • connectDistributed.out

问题是我不知道如何connectDistributed.out在 Kafka Connect 中配置文件。以下是该文件的示例输出:

[2018-07-11 08:42:40,798] INFO WorkerSinkTask{id=elasticsearch-sink- 
connector-0} Committing offsets asynchronously using sequence number 
216: {test-1=OffsetAndMetadata{offset=476028, metadata=‘’}, 
test-0=OffsetAndMetadata{offset=478923, metadata=‘’}, 
test-2=OffsetAndMetadata{offset=477944, metadata=‘’}} 
(org.apache.kafka.connect.runtime.WorkerSinkTask:325)
[2018-07-11 08:43:40,798] INFO WorkerSinkTask{id=elasticsearch-sink-connector0} 
Committing offsets asynchronously using sequence number 217: 
{test-1=OffsetAndMetadata{offset=476404, metadata=‘’}, 
test-0=OffsetAndMetadata{offset=479241, metadata=‘’}, 
test-2=OffsetAndMetadata{offset=478316, metadata=‘’}} 
(org.apache.kafka.connect.runtime.WorkerSinkTask:325)
Run Code Online (Sandbox Code Playgroud)

没有配置任何日志记录选项,文件大小随着时间的推移变得越来越大。今天,它达到了 20GB,我不得不手动清空文件。所以我的问题是如何配置它connectDistributed.out?我正在为其他组件(例如 kafka 代理日志)配置日志选项。


以下是confluent-4.1.0/etc/kafka我正在使用的一些与 Kafka Connect 相关的日志配置。

log4j.properties

log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender …
Run Code Online (Sandbox Code Playgroud)

logging log4j apache-kafka apache-kafka-connect

3
推荐指数
1
解决办法
7892
查看次数

低于 1GB 的 Kafka broker JVM 设置

我在 EC2 4GB RAM 实例上运行独立的 kafka 代理。默认设置中,Kafka配置为使用1GB内存 -Xmx1G -Xms1G

由于VM只有4GB内存,是否可以将JVM设置配置为使用512MB?我该怎么做?Kafka 会在 512MB 内存下正常运行还是 1GB 最低要求?

java heap apache-kafka

3
推荐指数
1
解决办法
2967
查看次数

如何从docker中提取项目代码到本地系统

我想将 sailsjs 项目代码(来自 docker)提取到我的本地系统(Linux 机器)中。

我运行以下命令

docker exec -it containername_or_ID /bin/bash
Run Code Online (Sandbox Code Playgroud)

我可以使用此命令查看我的代码。但我无法将此代码输入我的本地系统。

将代码放入本地系统的任何想法

docker

3
推荐指数
1
解决办法
2万
查看次数

如何仅提取多行字符串的第一行

我正在使用 split() 来解析直到冒号。我的文本中有几个冒号,但我只需要第一行的字符串。我需要做什么才能获得第一行?

line = """Hello : 
          This is a test ......:

          Testpath: C:\\...
          blablablabla
          123:"""

if ' :' in line: 
  av = line.split(" :",1)[0]
  print av
Run Code Online (Sandbox Code Playgroud)

是否可以在不使用正则表达式的情况下访问第一行??

python python-3.x

3
推荐指数
1
解决办法
3518
查看次数

Kafka:按需消费所有消息

目标:读取来自主题的所有消息,然后终止进程。

我能够连续阅读以下消息:

props.put("bootstrap.servers", kafkaBootstrapSrv);
props.put("group.id", group_id);
props.put("max.poll.records", 1); // Only get one record at a time. I understand that to read all messages this will need to be increased
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("MY_TOPIC"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(500);

    for (ConsumerRecord<String, String> record : records) {
        process_record(record);
    }

    consumer.commitSync();
}
Run Code Online (Sandbox Code Playgroud)

但在这种情况下,进程永远不会终止。当我摆脱

while (true)
Run Code Online (Sandbox Code Playgroud)

循环并运行程序,它不会从主题中获取一条记录(我希望有一条记录)。这是为什么?

java apache-kafka kafka-consumer-api

3
推荐指数
1
解决办法
3701
查看次数

卡夫卡经纪人花了太长时间才出现

最近,我们的 Kafka 经纪人之一(共 5 个)被错误关闭。现在我们再次启动它,有很多关于损坏的索引文件的警告消息,即使在 24 小时后,代理仍在启动。该代理中有超过 400 GB 的数据。

尽管其余代理已启动并正在运行,但某些分区将 -1 显示为它们的领导者,而将坏代理显示为唯一的 ISR。我没有看到其他副本被任命为新的领导者,也许是因为坏代理是这些分区唯一同步的代理。

Broker Properties: 
Replication Factor: 3
Min In Sync Replicas: 1
Run Code Online (Sandbox Code Playgroud)

我不知道如何处理这个。我应该等待经纪人自己解决所有问题吗?花这么多时间正常吗?

还有什么我可以做的吗?请帮忙。

apache-kafka

3
推荐指数
1
解决办法
1680
查看次数

Kafka节点和zookeeper如何相互通信?

我找不到节点和Zookeeper之间通信的任何细节.假设在Kafka节点中发生了更多垃圾收集,它会产生什么结果?

  1. Zookeeper会断开与各个节点的通信吗?
  2. 如果Zookeeper将断开相应的节点将会产生什么结果?

apache-kafka apache-zookeeper

3
推荐指数
1
解决办法
734
查看次数

如何通过JAAS配置kafka env变量kubernetes

我正在尝试使用 SASL 验证我的 Kafka Rest 代理,但在将本地 docker compose 中所做的配置传输到 Kubernetes 时遇到问题。

我正在使用 JAAS 配置来实现此目的。我的 JAAS 文件如下所示。

KafkaClient {
       org.apache.kafka.common.security.plain.PlainLoginModule required
       username="rest"
       password="rest-secret";
};

Client {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       username="rest"
       password="restsecret";
};
Run Code Online (Sandbox Code Playgroud)

然后在我的 docker compose 中我已经完成了:

KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/rest_jaas.conf

我如何将同样的逻辑转移到 Kubernetes?我尝试过像这样传递 env 变量:

env:
  - name: KAFKA_OPTS
    value: |
      KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="rest"
        password="rest-secret";
      };
      Client {
        org.apache.zookeeper.server.auth.DigestLoginModule required
        username="rest"
        password="rest-secret";
      };
Run Code Online (Sandbox Code Playgroud)

但它仍然失败。这是我的日志所说的:

Error: Could not find or load main class KafkaClient
/bin/sh: 3: org.apache.kafka.common.security.plain.PlainLoginModule: not found
/bin/sh: 6: Syntax error: "}" …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kubernetes

3
推荐指数
1
解决办法
4407
查看次数

Kafka 使用了哪些内部主题?

我们正在使用 kafka 流 api 进行聚合,其中我们也使用 group by。我们还使用状态存储来保存输入主题数据。

我注意到的是

Kafka内部创建了3种topic

  1. Changelog-<storeid>-<partition>
  2. Repartition-<storeid>-<partition>
  3. <topicname>-<partition>

我无法理解的是

  1. 为什么当我拥有所有数据时它会创建变更日志主题 <topic>-<partition>
  2. 重新分区主题是否包含分组后的数据。
  3. 我看到 Changelog 和 topicname-parition 的大小大约相同。

数据有什么不同,因此它必须为此保存不同的文件。

apache-kafka apache-kafka-streams

3
推荐指数
2
解决办法
4111
查看次数

使用zookeeper和模式注册表停止/启动kafka的正确顺序是什么

我们集群中有 3 个 kafka 经纪人

卡夫卡版本 - 1.0.0

每台 kafka 机器还包括:zookeper 服务器、模式注册表

停止/启动以下服务的正确顺序是什么:

1. kafka 
2  zookeeper 
3  schema registry 
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-zookeeper

3
推荐指数
1
解决办法
2817
查看次数