Ror*_*ory 2 scala apache-kafka kafka-consumer-api
下面的 Scala kafka 消费者没有从 poll调用中。
但是,主题是正确的,我可以看到使用控制台使用者将事件发送到主题:
/opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic my_topic --from-beginning
Run Code Online (Sandbox Code Playgroud)
当我使用调试器逐步完成并调用时,我还在下面的 Scala 代码示例中看到了该主题 kafkaConsumer.listTopics()
此外,这是从单个单元测试中调用的,所以我只创建了这个特征和消费者的一个实例(即另一个消费者实例不能消费消息)。我也在使用随机 group_id。
下面的代码/配置有什么问题吗?
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.util.Random
trait KafkaTest {
val kafkaConsumerProperties = new Properties()
kafkaConsumerProperties.put("bootstrap.servers", "kafka:9092")
kafkaConsumerProperties.put("group.id", Random.alphanumeric.take(10).mkString)
kafkaConsumerProperties.put("key.deserializer", classOf[ByteArrayDeserializer])
kafkaConsumerProperties.put("value.deserializer", classOf[StringDeserializer])
val kafkaConsumer = new KafkaConsumer[String, String](kafkaConsumerProperties)
kafkaConsumer.subscribe(java.util.Collections.singletonList("my_topic"))
def checkKafkaHasReceivedEvent(): Assertion = {
val kafkaEvents = kafkaConsumer.poll(2000) // Always returns 0 events?
...
}
}
Run Code Online (Sandbox Code Playgroud)
增加轮询超时也无济于事。
要从头读取 AUTO_OFFSET_RESET_CONFIG 属性必须设置为最早,默认情况下为“最新”
kafkaConsumerProperties.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
OffsetResetStrategy.EARLIEST.toString().toLowerCase())
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1092 次 |
| 最近记录: |