Kafka分区键无法正常工作

Hao*_*ang 5 scala apache-kafka

我正在努力如何正确使用分区键机制.我的逻辑是将分区号设置为3,然后创建三个分区键为"0","1","2",然后使用分区键创建三个KeyedMessage,如

  • KeyedMessage(主题,"0",消息)
  • KeyedMessage(主题,"1",消息)
  • KeyedMessage(主题,"2",消息)

在此之后,创建一个生成器实例以发送所有KeyedMessage.

我希望每个KeyedMessage都应该根据不同的分区键进入不同的分区,这意味着

  • KeyedMessage(主题,"0",消息)转到分区0
  • KeyedMessage(主题,"1",消息)转到分区1
  • KeyedMessage(主题,"2",消息)转到分区2

我正在使用Kafka-web-console来观察主题状态,但结果并不像我期望的那样.KeyedMessage仍然随机进入分区,有时候两个KeyedMessage将进入同一个分区,即使它们有不同的分区键.

为了使我的问题更清楚,我想发布一些我现有的Scala代码,并且我正在使用Kafka 0.8.2-beta和Scala 2.10.4.

这是生产者代码,我没有使用自定义partitioner.class:

  val props = new Properties()

  val codec = if(compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec

  props.put("compression.codec", codec.toString)
  props.put("producer.type", if(synchronously) "sync" else "async")
  props.put("metadata.broker.list", brokerList)
  props.put("batch.num.messages", batchSize.toString)
  props.put("message.send.max.retries", messageSendMaxRetries.toString)
  props.put("request.required.acks",requestRequiredAcks.toString)
  props.put("client.id",clientId.toString)

  val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))

  def kafkaMesssage(message: Array[Byte], partition: Array[Byte]): KeyedMessage[AnyRef, AnyRef] = {
     if (partition == null) {
       new KeyedMessage(topic,message)
     } else {
       new KeyedMessage(topic,partition,message)
     }
  }

  def send(message: String, partition: String = null): Unit = send(message.getBytes("UTF8"), if (partition == null) null else partition.getBytes("UTF8"))

  def send(message: Array[Byte], partition: Array[Byte]): Unit = {
    try {
      producer.send(kafkaMesssage(message, partition))
    } catch {
      case e: Exception =>
        e.printStackTrace
        System.exit(1)
    }       
  }
Run Code Online (Sandbox Code Playgroud)

以下是我如何使用生产者,创建生成器实例,然后使用此实例发送三条消息.目前我将分区键创建为Integer,然后将其转换为Byte Arrays:

  val testMessage = UUID.randomUUID().toString
  val testTopic = "sample1"
  val groupId_1 = "testGroup"

  print("starting sample broker testing")
  val producer = new KafkaProducer(testTopic, "localhost:9092")

  val numList = List(0,1,2);
  for (a <- numList) {
    // Create a partition key as Byte Array
    var key = java.nio.ByteBuffer.allocate(4).putInt(a).array()
    //Here I give a Array[Byte] key
    //so the second "send" function of producer will be called
    producer.send(testMessage.getBytes("UTF8"), key)
  }
Run Code Online (Sandbox Code Playgroud)

不确定我的逻辑是不正确还是我没有正确理解分区键机制.任何人都可以提供一些示例代码或解释会很棒!

Vla*_*kov 9

人们通常认为分区是一种在业务类别上分离业务数据的方法,但这不是查看分区的正确角度.

直接划分影响这些主题:

-performance(每个分区可以与其他分区并行使用)

- 消息顺序(仅在分区级别保证的消息顺序)

我将举例说明如何创建分区:

你有一个话题,比如MyMessagesToWorld

您想将此主题(所有MyMessagesToWorld)转移给某个消费者.

你"重量"MyMessagesToWorld的整个"质量"并发现,这是10公斤.

您在"MyMessagesToWorld"中有以下"业务"类别:

- 爸爸的消息(D)

- 给妈妈的消息(M)

- sis(S)的消息

- 给奶奶的消息(G)

- 老师的问候(T)

- 给女朋友的消息(F)

您认为,谁是您的消费者,并发现您的消费者是侏儒,每个小时消耗1 Kg消息.

你最多可以雇佣2个这样的侏儒.

1个侏儒需要10个小时消耗10公斤的消息,2个侏儒需要5个小时.

因此,您决定使用所有可用的gnome来节省时间.

要为这两个侏儒创建2个"通道",您可以在Kafka上创建此主题的2个分区.如果您调用更多gnomes,请创建更多分区.

您有6个业务类别和2个连续的独立消费者 - gnomes(消费者线程).

该怎么办?

卡夫卡的方法如下:

假设您在群集中有2个kafka实例.(同样的例子OK,如果你在集群中有更多的实例)

您在Kafka上将分区号设置为2,例如(以Kafka 0.8.2.1为例):

您在Kafka中定义主题,告诉您,该主题有2个分区:

kafka-topics.sh(.bat) --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic MyMessagesToWorld 
Run Code Online (Sandbox Code Playgroud)

现在主题MyMessagesToWorld有2个分区:P(0)和P(1).

你选择了2号(分区),因为你知道,你有(侵略)只有2个消耗侏儒.

您可以在以后添加更多分区,届时将使用更多的消费者侏儒.

不要将卡夫卡消费者与这样的侏儒混淆.

卡夫卡消费者可以使用N个侏儒.(N个并行线程)

现在,您可以为消息创建KEY.

您需要KEYS在分区之间分发您的消息.

密钥将是您之前定义的"业务类别"的字母:D,M,S,G,T,F,您认为这些字母可以是ID.

但在一般情况下,无论什么可以用作键:(复杂的对象和字节数组,任何......)

如果您创建NO分区程序,将使用默认分区程序.

默认分区程序有点愚蠢.

它需要每个KEY的哈希码并将其除以可用分区的数量,"提醒"将定义该密钥的分区数.

例:

KEY M, hashcode=12345, partition for M = 12345 % 2 = 1
Run Code Online (Sandbox Code Playgroud)

可以想象,在最好的情况下使用这样的分区程序,您可以在每个分区中登陆3个业务类别.

在更糟糕的情况下,您可以将所有业务类别登陆到1个分区.

如果您有100000个业务类别,则在统计上可以通过此类算法分发它们.

但只有少数类别,您可能没有非常公平的分配.

因此,您可以重写分区程序并更智能地分发您的业务类别.

有一个例子:

此分区程序在可用分区之间平均分配业务类别.

public class CustomPartitioner {

private static Map<String, Integer> keyDistributionTable = new HashMap<String, Integer>();
private static AtomicInteger sequence = new AtomicInteger();
private ReentrantLock lock = new ReentrantLock();

public int partition(ProducerRecord<String, Object> record, Cluster cluster) {

    String key = record.key();
    int seq = figureSeq(key);

    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(record.topic());

    if (availablePartitions.size() > 0) {
        int part = seq % availablePartitions.size();
        return availablePartitions.get(part).partition();
    } else {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
        int numPartitions = partitions.size();
        // no partitions are available, give a non-available partition
        return seq % numPartitions;
    }
}


private int figureSeq(String key) {
    int sequentualNumber = 0;
    if(keyDistributionTable.containsKey(key)){
        sequentualNumber = keyDistributionTable.get(key);
    }else{//synchronized region
        //used only for new Keys, so high waiting time for monitor expected only on start
        lock.lock();
        try{
            if(keyDistributionTable.containsKey(key)){
                sequentualNumber =  keyDistributionTable.get(key);
            }else{
                int seq = sequence.incrementAndGet();
                keyDistributionTable.put(key, seq);
                sequentualNumber =  seq;
            }
        }finally{
            lock.unlock();
        }
    }
    return sequentualNumber;
}
Run Code Online (Sandbox Code Playgroud)

}


stu*_*nik 1

有同样的问题 - 只需切换到 ByteArrayParitioner:

props.put("partitioner.class", "kafka.producer.ByteArrayPartitioner")
Run Code Online (Sandbox Code Playgroud)