Mar*_*ina 5 apache-kafka kafka-consumer-api
我正在使用Kafka 0.8.2.1 SimpleConsumer.有人可以澄清SimpleConsumer和FetchRequestBuilder的一些配置参数的含义吗?没有阅读KAfka的源代码,我当时找不到任何文档.(我尝试将此问题发布到kafka用户组 - 但没有运气):
- Q1:在SimpleConsumer构造函数的签名中,我看到了Int'soTimeout'参数 - 这个超时的含义是什么?这是连接到Kafka经纪人的超时吗?从任何[或特定??]请求到Kafka的响应超时(如FetchRequest)?别的什么?
kafka.javaapi.consumer.SimpleConsumer
(val host: String,
val port: Int,
val soTimeout: Int,
val bufferSize: Int,
val clientId: String)
Run Code Online (Sandbox Code Playgroud)
- Q2:同样,SimpleConsumer构造函数采用Int'mufferSize'参数.它是什么意思?这是在发出fetchRequest时SimpleConsumer会读取多少字节?或者它是每次从Kafka获取时读取的最大字节数 - 如果有更多数据可用,则会发生多次提取?
- 通过FetchRequestBuilder构建FetchRequest时(见下文),我还需要指定' fetchSize ':
FetchRequest req= newFetchRequestBuilder ()
.clientId(kafkaGroupId)
.addFetch(topic, partition, offset, fetchSizeInBytes)
.build();
Run Code Online (Sandbox Code Playgroud)
查看FetchRequestBuilder的源代码,我认为(我不是Scala pro)这些调用转换为下面的方法调用 - 并且传递给FetchRequest的最终参数称为' minBytes ',暗示这不是确切的提取大小,可能吗?.它是否意味着它甚至不会取任何东西,除非至少有'minBytes'数据可用?
class FetchRequestBuilder():
def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int)
def build() = {
val fetchRequest= FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
correlationId: Int = FetchRequest.DefaultCorrelationId,
clientId: String = ConsumerConfig.DefaultClientId,
replicaId: Int = Request.OrdinaryConsumerId,
maxWait: Int = FetchRequest.DefaultMaxWait,
**minBytes: Int = FetchRequest.DefaultMinBytes**,
...)
Run Code Online (Sandbox Code Playgroud)
所以,我的最后一个问题是:
- 问题3:' bufferSize '和' fetchSize/minBytes '有什么关系?他们到底定义了什么?我是否必须确保一个比另一个更小或更大?
谢谢,
码头
soTimeout 是等待连接到给定代理的时间(以毫秒为单位)。我不知道连接会发生什么特殊情况,除了您得到验证,那里有一个代理准备好执行一些后续操作。
我相信构造函数中使用的 bufferSize 是客户端套接字用于接收代理发送的数据的缓冲区的大小。
对于您的最后一个问题,如果由于任何原因由获取请求返回的字节总数大于请求的套接字缓冲区大小,则需要多个较低级别的调用来检索所有数据,即使有一个单个更高级别的获取调用。
| 归档时间: |
|
| 查看次数: |
1333 次 |
| 最近记录: |