可以将给定分区中的消息划分为多个线程吗?假设我有一个分区,每个进程有一百个进程,每个进程有一百个线程-我的单个分区中的消息是否仅提供给这10000个线程中的一个?
我正在使用Kafka 0.9消费者API.我需要检查给定使用者组和主题的消费者的当前偏移量,并且没有找到任何列出此信息的命令行工具.我尝试了以下命令
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group <group_name>
Run Code Online (Sandbox Code Playgroud)
但这并未列出消费者,即使我有消费者在运行/投票.
基于Kafka 0.9新的消费者api ---如何仅仅观看消费者抵消似乎有这方面的命令行工具,但我不知道这是否适合Kafka 0.9消费者.
任何帮助,将不胜感激.
我想清楚地指出一个Mirror Maker的Kafka Parallelism模型.
对于我在消费者方面的理解:
消费者集团是一组消费者.该组的每个消费者都可以从一个或多个主题中读取.
该组的消费者可以拥有多个流,即从主题中读取的线程数,最佳做法是将一个线程用于分区.
我的疑问是:我们是否将多个线程的消费者与单线程或一个消费者挂钩?消费者群体是指一组消费者还是一个多线程消费者?
我发现很难从文档中指出这些问题,我想知道我是不是错了.
即使在生产者方面,这些考虑因素也是双向的吗?
apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-connect
如果我创建一个消费者C1与组consumerGroup从主题读取数据A。不久之后C2,在同一组中创建消费者,以从topic中读取数据B。
创造消费者会C2引发再平衡吗?作为一个更普遍的问题,kafka何时会执行重新平衡?
当我通过使用url创建以下简单的kafka消费者时:https : //gist.github.com/akhil/6dfda8a04e33eff91a20。
在该链接中,要打印消耗的记录,请使用未识别的单词“ asScala”。好的,告诉我如何迭代返回类型:ConsumerRecord [String,String],它是poll()方法的返回类型。
import java.util
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
object KafkaConsumerEx extends App {
val topic_name = "newtopic55"
val consumer_group = "KafkaConsumerBatch"
val prot = new Properties()
prot.put("bootstrap.servers","localhost:9092")
prot.put("group.id",consumer_group)
prot.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
prot.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
val kfk_consumer = new KafkaConsumer[String,String](prot)
kfk_consumer.subscribe(util.Collections.singleton(topic_name))
println("here")
while(true){
val consumer_record : ConsumerRecords[String, String] = kfk_consumer.poll(100)
println("records count : " + consumer_record.count())
println("records partitions: " + consumer_record.partitions())
consumer_record.iterator().
}
}Run Code Online (Sandbox Code Playgroud)
感谢在广告中。
鉴于:我在Kafka中有两个主题让我们说主题A和主题B.Kafka Stream从主题A读取记录,处理它并产生与消费记录对应的多个记录(比如说记录A和记录B).现在,问题是如何使用Kafka Streams实现这一目标.
KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() {
@Override
public List<Message> apply(final Message message) {
return consumerRecordHandler.process(message);
}
}).*someFunction*()
Run Code Online (Sandbox Code Playgroud)
这里,读取的记录是Message; 处理完毕后,返回Message列表.如何将此列表分成两个生产者流?任何帮助将不胜感激.
apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams
我有一个 Kafka Consumer(内置于 Scala),它从 Kafka 中提取最新记录。消费者看起来像这样:
val consumerProperties = new Properties()
consumerProperties.put("bootstrap.servers", "localhost:9092")
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProperties.put("group.id", "something")
consumerProperties.put("auto.offset.reset", "latest")
val consumer = new KafkaConsumer[String, String](consumerProperties)
consumer.subscribe(java.util.Collections.singletonList("topic"))
Run Code Online (Sandbox Code Playgroud)
现在,我想为它编写一个集成测试。有没有测试 Kafka 消费者的方法或最佳实践?
我试图使用下面scala的kafka流是我的Java代码,它完全正常:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream("TextLinesTopic");
textLines.foreach((key,values) -> {
System.out.println(values);
});
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
Run Code Online (Sandbox Code Playgroud)
我的scala代码如下:
val builder = new KStreamBuilder
val textLines:KStream[String, String] = builder.stream("TextLinesTopic")
textLines.foreach((key,value)-> {
println(key)
})
val streams = new KafkaStreams(builder, config)
streams.start()
Run Code Online (Sandbox Code Playgroud)
scala代码抛出编译错误.期望类型不匹配:找不到ForEachAction [ > String, > String],Actual((any,any),Unit):找不到值键:值value
有没有人知道如何在scala中使用流API
我创建了一个从主题读取的 spring kafka 消费者。有没有办法打印类似于我们打印分区信息的滞后信息?
下面是我的生产者配置,如果您看到它们的压缩类型为 gzip ,即使我提到了压缩类型,为什么消息没有发布并且它失败了
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, edi856KafkaConfig.getBootstrapServersConfig());
props.put(ProducerConfig.RETRIES_CONFIG, edi856KafkaConfig.getRetriesConfig());
props.put(ProducerConfig.BATCH_SIZE_CONFIG, edi856KafkaConfig.getBatchSizeConfig());
props.put(ProducerConfig.LINGER_MS_CONFIG, edi856KafkaConfig.getIntegerMsConfig());
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, edi856KafkaConfig.getBufferMemoryConfig());
***props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");***
props.put(Edi856KafkaProducerConstants.SSL_PROTOCOL, edi856KafkaConfig.getSslProtocol());
props.put(Edi856KafkaProducerConstants.SECURITY_PROTOCOL, edi856KafkaConfig.getSecurityProtocol());
props.put(Edi856KafkaProducerConstants.SSL_KEYSTORE_LOCATION, edi856KafkaConfig.getSslKeystoreLocation());
props.put(Edi856KafkaProducerConstants.SSL_KEYSTORE_PASSWORD, edi856KafkaConfig.getSslKeystorePassword());
props.put(Edi856KafkaProducerConstants.SSL_TRUSTSTORE_LOCATION, edi856KafkaConfig.getSslTruststoreLocation());
props.put(Edi856KafkaProducerConstants.SSL_TRUSTSTORE_PASSWORD, edi856KafkaConfig.getSslTruststorePassword());
**props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");**
Run Code Online (Sandbox Code Playgroud)
错误如下
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1170632 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
2017-12-07_12:34:10.037 [http-nio-8080-exec-1] ERROR c.tgt.trans.producer.Edi856Producer - Exception while writing mesage to topic= '{}'
org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 1170632 bytes …Run Code Online (Sandbox Code Playgroud) jms apache-kafka kafka-consumer-api kafka-producer-api spring-kafka