Hao*_*ang 5 scala apache-kafka
我正在努力如何正确使用分区键机制.我的逻辑是将分区号设置为3,然后创建三个分区键为"0","1","2",然后使用分区键创建三个KeyedMessage,如
在此之后,创建一个生成器实例以发送所有KeyedMessage.
我希望每个KeyedMessage都应该根据不同的分区键进入不同的分区,这意味着
我正在使用Kafka-web-console来观察主题状态,但结果并不像我期望的那样.KeyedMessage仍然随机进入分区,有时候两个KeyedMessage将进入同一个分区,即使它们有不同的分区键.
为了使我的问题更清楚,我想发布一些我现有的Scala代码,并且我正在使用Kafka 0.8.2-beta和Scala 2.10.4.
这是生产者代码,我没有使用自定义partitioner.class:
val props = new Properties()
val codec = if(compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
props.put("compression.codec", codec.toString)
props.put("producer.type", if(synchronously) "sync" else "async")
props.put("metadata.broker.list", brokerList)
props.put("batch.num.messages", batchSize.toString)
props.put("message.send.max.retries", messageSendMaxRetries.toString)
props.put("request.required.acks",requestRequiredAcks.toString)
props.put("client.id",clientId.toString)
val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
def kafkaMesssage(message: Array[Byte], partition: Array[Byte]): KeyedMessage[AnyRef, AnyRef] = {
if (partition == null) {
new KeyedMessage(topic,message)
} else {
new KeyedMessage(topic,partition,message)
}
}
def send(message: String, partition: String = null): Unit = send(message.getBytes("UTF8"), if (partition == null) null else partition.getBytes("UTF8"))
def send(message: Array[Byte], partition: Array[Byte]): Unit = {
try {
producer.send(kafkaMesssage(message, partition))
} catch {
case e: Exception =>
e.printStackTrace
System.exit(1)
}
}
Run Code Online (Sandbox Code Playgroud)
以下是我如何使用生产者,创建生成器实例,然后使用此实例发送三条消息.目前我将分区键创建为Integer,然后将其转换为Byte Arrays:
val testMessage = UUID.randomUUID().toString
val testTopic = "sample1"
val groupId_1 = "testGroup"
print("starting sample broker testing")
val producer = new KafkaProducer(testTopic, "localhost:9092")
val numList = List(0,1,2);
for (a <- numList) {
// Create a partition key as Byte Array
var key = java.nio.ByteBuffer.allocate(4).putInt(a).array()
//Here I give a Array[Byte] key
//so the second "send" function of producer will be called
producer.send(testMessage.getBytes("UTF8"), key)
}
Run Code Online (Sandbox Code Playgroud)
不确定我的逻辑是不正确还是我没有正确理解分区键机制.任何人都可以提供一些示例代码或解释会很棒!
人们通常认为分区是一种在业务类别上分离业务数据的方法,但这不是查看分区的正确角度.
直接划分影响这些主题:
-performance(每个分区可以与其他分区并行使用)
- 消息顺序(仅在分区级别保证的消息顺序)
我将举例说明如何创建分区:
你有一个话题,比如MyMessagesToWorld
您想将此主题(所有MyMessagesToWorld)转移给某个消费者.
你"重量"MyMessagesToWorld的整个"质量"并发现,这是10公斤.
您在"MyMessagesToWorld"中有以下"业务"类别:
- 爸爸的消息(D)
- 给妈妈的消息(M)
- sis(S)的消息
- 给奶奶的消息(G)
- 老师的问候(T)
- 给女朋友的消息(F)
您认为,谁是您的消费者,并发现您的消费者是侏儒,每个小时消耗1 Kg消息.
你最多可以雇佣2个这样的侏儒.
1个侏儒需要10个小时消耗10公斤的消息,2个侏儒需要5个小时.
因此,您决定使用所有可用的gnome来节省时间.
要为这两个侏儒创建2个"通道",您可以在Kafka上创建此主题的2个分区.如果您调用更多gnomes,请创建更多分区.
您有6个业务类别和2个连续的独立消费者 - gnomes(消费者线程).
该怎么办?
卡夫卡的方法如下:
假设您在群集中有2个kafka实例.(同样的例子OK,如果你在集群中有更多的实例)
您在Kafka上将分区号设置为2,例如(以Kafka 0.8.2.1为例):
您在Kafka中定义主题,告诉您,该主题有2个分区:
kafka-topics.sh(.bat) --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic MyMessagesToWorld
Run Code Online (Sandbox Code Playgroud)
现在主题MyMessagesToWorld有2个分区:P(0)和P(1).
你选择了2号(分区),因为你知道,你有(侵略)只有2个消耗侏儒.
您可以在以后添加更多分区,届时将使用更多的消费者侏儒.
不要将卡夫卡消费者与这样的侏儒混淆.
卡夫卡消费者可以使用N个侏儒.(N个并行线程)
现在,您可以为消息创建KEY.
您需要KEYS在分区之间分发您的消息.
密钥将是您之前定义的"业务类别"的字母:D,M,S,G,T,F,您认为这些字母可以是ID.
但在一般情况下,无论什么可以用作键:(复杂的对象和字节数组,任何......)
如果您创建NO分区程序,将使用默认分区程序.
默认分区程序有点愚蠢.
它需要每个KEY的哈希码并将其除以可用分区的数量,"提醒"将定义该密钥的分区数.
例:
KEY M, hashcode=12345, partition for M = 12345 % 2 = 1
Run Code Online (Sandbox Code Playgroud)
可以想象,在最好的情况下使用这样的分区程序,您可以在每个分区中登陆3个业务类别.
在更糟糕的情况下,您可以将所有业务类别登陆到1个分区.
如果您有100000个业务类别,则在统计上可以通过此类算法分发它们.
但只有少数类别,您可能没有非常公平的分配.
因此,您可以重写分区程序并更智能地分发您的业务类别.
有一个例子:
此分区程序在可用分区之间平均分配业务类别.
public class CustomPartitioner {
private static Map<String, Integer> keyDistributionTable = new HashMap<String, Integer>();
private static AtomicInteger sequence = new AtomicInteger();
private ReentrantLock lock = new ReentrantLock();
public int partition(ProducerRecord<String, Object> record, Cluster cluster) {
String key = record.key();
int seq = figureSeq(key);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(record.topic());
if (availablePartitions.size() > 0) {
int part = seq % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
int numPartitions = partitions.size();
// no partitions are available, give a non-available partition
return seq % numPartitions;
}
}
private int figureSeq(String key) {
int sequentualNumber = 0;
if(keyDistributionTable.containsKey(key)){
sequentualNumber = keyDistributionTable.get(key);
}else{//synchronized region
//used only for new Keys, so high waiting time for monitor expected only on start
lock.lock();
try{
if(keyDistributionTable.containsKey(key)){
sequentualNumber = keyDistributionTable.get(key);
}else{
int seq = sequence.incrementAndGet();
keyDistributionTable.put(key, seq);
sequentualNumber = seq;
}
}finally{
lock.unlock();
}
}
return sequentualNumber;
}
Run Code Online (Sandbox Code Playgroud)
}
有同样的问题 - 只需切换到 ByteArrayParitioner:
props.put("partitioner.class", "kafka.producer.ByteArrayPartitioner")
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
6450 次 |
最近记录: |