KafKa分区器类,使用密钥为主题内的分区分配消息

Hil*_*ild 27 apache-kafka

我是kafka的新手,如果我听起来很愚蠢,但我到目前为止所理解的是......消息流可以被定义为一个主题,就像一个类别.并且每个主题分为一个或多个分区(每个分区可以有多个副本).所以他们并行行事

他们说,从卡夫卡主要网站

生产者能够选择将哪个消息分配给主题中的哪个分区.这可以通过循环方式完成,只是为了平衡负载,或者可以根据一些语义分区功能(例如基于消息中的某些键)来完成.

这是否意味着消费我将能够从特定分区中选择消息偏移量?运行多个分区时,是否可以从一个特定分区中选择,即分区0?

他们说,在卡夫卡0.7 快速启动

发送带分区键的消息.具有相同密钥的消息将发送到同一分区.

并且可以在创建生产者时提供密钥,如下所示

    ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-key", "test-message");
    producer.send(data);
Run Code Online (Sandbox Code Playgroud)

现在我如何根据此密钥使用消息?在Kafka制作时使用此键的实际影响是什么?

在0.8beta中创建生产者时,我们可以通过配置文件提供分区器类属性.可以创建自定义分区器类来实现kafka分区器接口.但有点混淆它是如何工作的.0.8 doc也没有解释太多.有什么建议或mi遗失的东西?

Hil*_*ild 16

这是我到目前为止所发现的.

通过实现kafka Partitioner接口来定义我们自己的自定义分区器类.实现的方法将有两个参数,首先是我们从生产者提供的密钥,然后是可用的分区数.因此,我们可以定义自己的逻辑来设置消息的哪个键进入哪个分区.

现在,在创建生成器时,我们可以使用"partitioner.class"属性指定我们自己的分区器类

    props.put("partitioner.class", "path.to.custom.partitioner.class");
Run Code Online (Sandbox Code Playgroud)

如果我们不提及它,那么Kafka将使用其默认类并尝试在可用的分区之间均匀地分发消息.

另请告知Kafka如何序列化密钥

    props.put("key.serializer.class", "kafka.serializer.StringEncoder");
Run Code Online (Sandbox Code Playgroud)

现在,如果我们使用生产者中的密钥发送一些消息,消息将被传递到特定分区(基于我们在自定义分区器类上编写的逻辑),而在消费者(SimpleConsumer)级别,我们可以指定要检索的分区具体信息.

如果我们需要传递一个String作为键,那么应该在自定义分区器类中处理相同的内容(获取键的哈希值,然后取前两位数等)


jav*_*eek 16

Kafka中的每个主题都分为许多分区.分区允许并行消耗增加吞吐量.

生产者使用Kafka生产者客户端库将消息发布到主题,该库使用分区程序在可用分区之间平衡消息.生产者连接到的代理负责使用zookeeper中的分区所有者信息将消息发送到作为该分区的领导者的代理.消费者使用Kafka的高级消费者库(处理经纪人领导者变更,管理zookeeper中的偏移信息并隐含地计算分区所有者信息等)来使用来自流中分区的消息; 根据消费者选择创建消息流的方式,每个流可以映射到几个分区.

例如,如果一个主题有10个分区,并且3个消费者实例(C1,C2,C3按此顺序启动)都属于同一个消费者组,我们可以使用不同的消费模型,允许读取并行度,如下所示

每个消费者使用单个流. 在此模型中,当C1启动时,主题的所有10个分区都映射到同一个流,并且C1开始从该流中消耗.当C2启动时,Kafka重新平衡两个流之间的分区.因此,每个流将分配到5个分区(取决于重新平衡算法,它可能也是4对6)并且每个消费者从其流消耗.类似地,当C3启动时,分区再次在3个流之间重新平衡.请注意,在此模型中,当从分配给多个分区的流中进行消费时,消息的顺序将在分区之间混乱.

每个消费者使用多个流(比如C1使用3,C2使用3,C3使用4). 在此模型中,当C1启动时,所有10个分区都分配给3个流,C1可以使用多个线程同时从3个流中消耗.当C2启动时,分区在6个流之间重新平衡,类似地,当C3启动时,分区在10个流之间重新平衡.每个消费者可以同时从多个流中消费.请注意,此处的流和分区数相等.如果流的数量超过分区,则某些流将不会获得任何消息,因为它们不会被分配任何分区.

  • 假设所有消费者都属于同一个消费者群体,是的.每个流将映射到1个分区 (2认同)

小智 6

这是否意味着在使用时我将能够从特定分区中选择消息偏移量?在运行多个分区时,是否可以从一个特定分区(即分区0)中进行选择?

是的,您可以从使用者的一个特定分区中选择消息,但是如果希望动态地标识该消息,则取决于您在生产者中实现Partitioner类的逻辑。

现在我如何使用基于此密钥的消息?在Kafka中制作时使用此密钥的实际影响是什么?

消费消息有两种方式。一种是使用Zookeeper主机,另一种是静态主机。Zookeper主机使用所有分区中的消息。但是,如果您使用的是静态主机,则可以为代理提供需要使用的分区号。

请检查下面的Kafka 0.8示例

制片人

KeyedMessage<String, String> data = new KeyedMessage<String, String>(<<topicName>>, <<KeyForPartition>>, <<Message>>);
Run Code Online (Sandbox Code Playgroud)

分区类

   public int partition(Object arg0, int arg1) {
        // arg0 is the key given while producing, arg1 is the number of
        // partition the broker has
        long organizationId = Long.parseLong((String) arg0);
        // if the given key is less than the no of partition available then send
        // it according to the key given Else send it to the last partition
        if (arg1 < organizationId) {

            return (arg1 - 1);
        }
        // return (int) (organizationId % arg1);
        return Integer.parseInt((String) arg0);
    }
Run Code Online (Sandbox Code Playgroud)

因此,partiotioner类根据您的逻辑决定将消息发送到哪里。

消费者(PN:我已使用Storm Kafka 0.8集成)

        HostPort hosts = new HostPort("10.**.**.***",9092);

        GlobalPartitionInformation gpi = new GlobalPartitionInformation();
        gpi.addPartition(0, hosts);
        gpi.addPartition(2, hosts);

        StaticHosts statHost = new StaticHosts(gpi);

        SpoutConfig spoutConf = new SpoutConfig(statHost, <<topicName>>, "/kafkastorm", <<spoutConfigId>>);
Run Code Online (Sandbox Code Playgroud)