标签: kafka-consumer-api

Kafka高级消费者:分区是否可以有多个线程在使用它?

可以将给定分区中的消息划分为多个线程吗?假设我有一个分区,每个进程有一百个进程,每个进程有一百个线程-我的单个分区中的消息是否仅提供给这10000个线程中的一个?

multithreading apache-kafka kafka-consumer-api

1
推荐指数
1
解决办法
2877
查看次数

从kafka 0.9读取消费者偏差的工具

我正在使用Kafka 0.9消费者API.我需要检查给定使用者组和主题的消费者的当前偏移量,并且没有找到任何列出此信息的命令行工具.我尝试了以下命令

bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group <group_name> 
Run Code Online (Sandbox Code Playgroud)

但这并未列出消费者,即使我有消费者在运行/投票.

基于Kafka 0.9新的消费者api ---如何仅仅观看消费者抵消似乎有这方面的命令行工具,但我不知道这是否适合Kafka 0.9消费者.

任何帮助,将不胜感激.

apache-kafka kafka-consumer-api

1
推荐指数
1
解决办法
7535
查看次数

Kafka Mirror Maker:消费者与消费者的线程编号和生产者编号

我想清楚地指出一个Mirror Maker的Kafka Parallelism模型.

对于我在消费者方面的理解:

  • 消费者集团是一组消费者.该组的每个消费者都可以从一个或多个主题中读取.

  • 该组的消费者可以拥有多个流,即从主题中读取的线程数,最佳做法是将一个线程用于分区.

我的疑问是:我们是否将多个线程的消费者与单线程或一个消费者挂钩?消费者群体是指一组消费者还是一个多线程消费者?

我发现很难从文档中指出这些问题,我想知道我是不是错了.

即使在生产者方面,这些考虑因素也是双向的吗?

apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-connect

1
推荐指数
1
解决办法
827
查看次数

卡夫卡消费者再平衡状况

如果我创建一个消费者C1与组consumerGroup从主题读取数据A。不久之后C2,在同一组中创建消费者,以从topic中读取数据B

创造消费者会C2引发再平衡吗?作为一个更普遍的问题,kafka何时会执行重新平衡?

apache-kafka kafka-consumer-api

1
推荐指数
1
解决办法
1677
查看次数

Kafka Scala消费者代码-打印消耗的记录

当我通过使用url创建以下简单的kafka消费者时:https : //gist.github.com/akhil/6dfda8a04e33eff91a20

在该链接中,要打印消耗的记录,请使用未识别的单词“ asScala”。好的,告诉我如何迭代返回类型:ConsumerRecord [String,String],它是poll()方法的返回类型。

import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}

 
object KafkaConsumerEx extends App {

  val topic_name = "newtopic55"
  val consumer_group = "KafkaConsumerBatch"

  val prot = new Properties()
  prot.put("bootstrap.servers","localhost:9092")
  prot.put("group.id",consumer_group)
  prot.put("key.deserializer",  "org.apache.kafka.common.serialization.StringDeserializer")
  prot.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")

  val kfk_consumer = new KafkaConsumer[String,String](prot)
  kfk_consumer.subscribe(util.Collections.singleton(topic_name))
  println("here")

   while(true){
    val consumer_record : ConsumerRecords[String, String]  = kfk_consumer.poll(100)
    println("records count : " + consumer_record.count())
    println("records partitions: " + consumer_record.partitions())
    consumer_record.iterator().


  }

}
Run Code Online (Sandbox Code Playgroud)

感谢在广告中。

scala kafka-consumer-api

1
推荐指数
1
解决办法
2517
查看次数

Kafka Streams:一条记录​​到多条记录

鉴于:我在Kafka中有两个主题让我们说主题A和主题B.Kafka Stream从主题A读取记录,处理它并产生与消费记录对应的多个记录(比如说记录A和记录B).现在,问题是如何使用Kafka Streams实现这一目标.

KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() {
        @Override
        public List<Message> apply(final Message message) {
          return consumerRecordHandler.process(message);
        }
    }).*someFunction*()
Run Code Online (Sandbox Code Playgroud)

这里,读取的记录是Message; 处理完毕后,返回Message列表.如何将此列表分成两个生产者流?任何帮助将不胜感激.

apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams

1
推荐指数
1
解决办法
4488
查看次数

如何测试 Kafka 消费者

我有一个 Kafka Consumer(内置于 Scala),它从 Kafka 中提取最新记录。消费者看起来像这样:

val consumerProperties = new Properties()
consumerProperties.put("bootstrap.servers", "localhost:9092")
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProperties.put("group.id", "something")
consumerProperties.put("auto.offset.reset", "latest")

val consumer = new KafkaConsumer[String, String](consumerProperties)
consumer.subscribe(java.util.Collections.singletonList("topic"))
Run Code Online (Sandbox Code Playgroud)

现在,我想为它编写一个集成测试。有没有测试 Kafka 消费者的方法或最佳实践?

scala apache-kafka kafka-consumer-api

1
推荐指数
1
解决办法
7381
查看次数

Kafka与Scala一起流

我试图使用下面scala的kafka流是我的Java代码,它完全正常:

KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> textLines = builder.stream("TextLinesTopic");
    textLines.foreach((key,values) -> {
        System.out.println(values);
    });

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();
Run Code Online (Sandbox Code Playgroud)

我的scala代码如下:

  val builder = new KStreamBuilder
  val textLines:KStream[String, String]  = builder.stream("TextLinesTopic")
  textLines.foreach((key,value)-> {
   println(key)
  })

  val streams = new KafkaStreams(builder, config)
  streams.start()
Run Code Online (Sandbox Code Playgroud)

scala代码抛出编译错误.期望类型不匹配:找不到ForEachAction [ > String, > String],Actual((any,any),Unit):找不到值键:值value

有没有人知道如何在scala中使用流API

scala apache-kafka kafka-consumer-api apache-kafka-streams

1
推荐指数
1
解决办法
1197
查看次数

Spring Kafka Consumer - 打印 Kafka 滞后信息

我创建了一个从主题读取的 spring kafka 消费者。有没有办法打印类似于我们打印分区信息的滞后信息?

java kafka-consumer-api spring-kafka

1
推荐指数
1
解决办法
2496
查看次数

即使我在生产者配置中指定了压缩类型,kafka 代理也没有压缩我更大尺寸的消息

下面是我的生产者配置,如果您看到它们的压缩类型为 gzip ,即使我提到了压缩类型,为什么消息没有发布并且它失败了

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, edi856KafkaConfig.getBootstrapServersConfig());
        props.put(ProducerConfig.RETRIES_CONFIG, edi856KafkaConfig.getRetriesConfig());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, edi856KafkaConfig.getBatchSizeConfig());
        props.put(ProducerConfig.LINGER_MS_CONFIG, edi856KafkaConfig.getIntegerMsConfig());
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, edi856KafkaConfig.getBufferMemoryConfig());
        ***props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");***
        props.put(Edi856KafkaProducerConstants.SSL_PROTOCOL, edi856KafkaConfig.getSslProtocol());
        props.put(Edi856KafkaProducerConstants.SECURITY_PROTOCOL, edi856KafkaConfig.getSecurityProtocol());
        props.put(Edi856KafkaProducerConstants.SSL_KEYSTORE_LOCATION, edi856KafkaConfig.getSslKeystoreLocation());
        props.put(Edi856KafkaProducerConstants.SSL_KEYSTORE_PASSWORD, edi856KafkaConfig.getSslKeystorePassword());
        props.put(Edi856KafkaProducerConstants.SSL_TRUSTSTORE_LOCATION, edi856KafkaConfig.getSslTruststoreLocation());
        props.put(Edi856KafkaProducerConstants.SSL_TRUSTSTORE_PASSWORD, edi856KafkaConfig.getSslTruststorePassword());
        **props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");** 
Run Code Online (Sandbox Code Playgroud)

错误如下

org.apache.kafka.common.errors.RecordTooLargeException: The message is 1170632 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
2017-12-07_12:34:10.037 [http-nio-8080-exec-1] ERROR c.tgt.trans.producer.Edi856Producer - Exception while writing mesage to topic= '{}'
org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 1170632 bytes …
Run Code Online (Sandbox Code Playgroud)

jms apache-kafka kafka-consumer-api kafka-producer-api spring-kafka

1
推荐指数
1
解决办法
1239
查看次数