小编Phi*_*ann的帖子

Kafka Connect Distributed tasks.max配置设置的理想价值?

我期待产品电离并部署​​我的Kafka Connect应用程序.但是,我有两个关于tasks.max设置的问题,这是必需的并且具有很高的重要性,但细节对于实际设置此值的内容是模糊的.

我最简单的问题如下:如果我有一个带有n个分区的主题,我希望从中获取数据并写入某个接收器(在我的情况下,我写入S3),我应该将tasks.max设置为什么?我应该把它设置为n吗?我应该把它设置为2n吗?直观地说,似乎我想将值设置为n,这就是我一直在做的事情.

如果我更改我的Kafka主题并增加主题分区怎么办?如果我把它设置为n,我将不得不暂停我的Kafka连接器并增加tasks.max?如果我设置了2n的值,那么我的连接器应该自动增加它运行的并行度?

谢谢你的帮助!

amazon-s3 apache-kafka confluent apache-kafka-connect

15
推荐指数
1
解决办法
3409
查看次数

在 Gatling 中控制每秒请求数和超时阈值

我正在研究加特林模拟。在我的一生中,我无法让我的代码达到每秒 10000 个请求。我已经阅读了文档并且我一直在使用不同的方法等等,但是我的每秒请求似乎限制在每秒 5000 个请求。我附上了我当前的代码迭代。URL和路径信息被模糊了。假设我的模拟的 HTTP 部分没有问题。

package computerdatabase

import io.gatling.core.Predef._
import io.gatling.http.Predef._
import scala.concurrent.duration._
//import assertions._

class userSimulation extends Simulation {

  object Query {
    val feeder = csv("firstfileSHUF.txt").random
    val query = repeat(2000) {
                feed(feeder).
                exec(http("user")
                .get("/path/path/" + "${userID}" + "?fullData=true"))
    }
  }

  val baseUrl = "http:URL:7777"

  val httpConf = http
    .baseURL(baseUrl) // Here is the root for all relative URLs

  val scn = scenario("user") // A scenario is a chain of requests and pauses
    .exec(Query.query)

   setUp(scn.inject(rampUsers(1500) over (60 seconds))) …
Run Code Online (Sandbox Code Playgroud)

scala gatling

6
推荐指数
1
解决办法
8069
查看次数

Kafka Connect offset.storage.topic 未收到消息(即如何访问 Kafka Connect 偏移元数据?)

我正在设置一个 Kafka Connect 分布式模式应用程序,它将是一个 Kafka 到 S3 的管道。我正在使用 Kafka 0.10.1.0-1 和 Kafka Connect 3.1.1-1。到目前为止,事情进展顺利,但对我正在使用的更大系统很重要的一个方面需要知道 Kafka -> FileSystem 管道的偏移信息。根据文档,offset.storage.topic配置将是分布式模式应用程序用于存储偏移信息的位置。考虑到 Kafka 如何在“新”Kafka 中存储消费者偏移量,这是有道理的。但是,在对 FileStreamSinkConnector 进行了一些测试后,没有任何内容写入 myoffset.storage.topic默认值:connect-offsets

具体来说,我使用 Python Kafka 生产者将数据推送到主题,并使用 Kafka Connect 和 FileStreamSinkConnect 将数据从主题输出到文件。这可以正常工作并按照我期望的连接器行为运行。此外,当我停止连接器并启动连接器时,应用程序会记住主题中的状态并且没有数据重复。但是,当我去offset.storage.topic查看存储了哪些偏移元数据时,主题中没有任何内容。

这是我使用的命令:

kafka-console-consumer --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic connect-offsets --from-beginning

让此命令运行一分钟左右后,我收到此消息:

Processed a total of 0 messages

总而言之,我有两个问题:

  1. 为什么即使我的分布式应用程序正确保持状态,偏移元数据也没有写入应该存储它的主题?
  2. 如何访问 Kafka Connect 分布式模式应用程序的偏移元数据信息?这对于我团队的 Lambda 架构实现我们的系统是 100% 必要的。

谢谢您的帮助。

apache-kafka apache-kafka-connect

2
推荐指数
1
解决办法
3015
查看次数