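I have seen and used the KafkaTemplate.send(TOPIC, message) method, which relies on the default partitioner class.
But there I am not passing a key. I have a simple custom partitioner class, and I also want to send to the Kafka server with a call like KafkaTemplate.send(TOPIC, key, message), where I have set the custom partitioner class for partitioning in ProducerConfig.
If I provide a custom partitioner, will KafkaTemplate's send(topic, key, message) method call the partition method? I have read that it does, but I did not fully understand it.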
    import java.util.Map;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    public class CustomPartitionar implements Partitioner {

        private final PartitionMapper newMapper;

        public CustomPartitionar() {
            newMapper = new PartitionMapper();
        }

        @Override
        public void configure(Map<String, ?> configs) {
            // No configuration needed.
        }

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            // Default partition when there is no usable key or no matching user.
            int partition = 0;
            if (key instanceof String) {
                // Find the id of the current user based on the username from another mapper class.
                Integer userId = newMapper.findUserId((String) key);
                // Note: the returned value must be a valid partition index for the topic.
                if (userId != null) {
                    partition = userId;
                }
            }
            return partition;
        }

        @Override
        public void close() {
            // Nothing to release.
        }
    }
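The partitioner above returns the user id directly, so any id greater than or equal to the topic's partition count would not be a valid partition. A minimal sketch of one way to fold an id into the valid range is shown below. The class `PartitionMath` and the method `toPartition` are hypothetical names introduced only for illustration, and the partition count is assumed to be supplied by the caller (inside a real partitioner it would have to come from the `Cluster` argument).

```java
// Hypothetical helper, not part of Kafka or of the original post.
final class PartitionMath {
    private PartitionMath() {}

    /** Maps a possibly-null user id onto a partition index in [0, numPartitions). */
    static int toPartition(Integer userId, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (userId == null) {
            return 0; // same fallback as the partitioner in the question
        }
        // floorMod keeps the result non-negative even for negative ids.
        return Math.floorMod(userId.intValue(), numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(toPartition(7, 3));
        System.out.println(toPartition(null, 3));
        System.out.println(toPartition(-1, 3));
    }
}
```

The trade-off is that several user ids now share a partition, which is usually acceptable because ordering is still preserved per user.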
The partitioner is registered in the producer config:

    config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitionar.class);
And the message is sent like this:

    kafkaTemplate.send(message.getTopic(), message.getReceiver(), message);
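Can this be achieved in a simple way, or am I missing something?

KafkaTemplate has the following send methods. The overloads that take no partition argument, such as send(String topic, K key, V data), leave the record's partition unset, so the producer falls back to the Partitioner configured via ProducerConfig.PARTITIONER_CLASS_CONFIG and calls its partition() method with the key. The overloads that take an Integer partition bypass the partitioner whenever that value is non-null.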
/**
* Send the data to the default topic with no key or partition.
* @param data The data.
* @return a Future for the {@link SendResult}.
*/
ListenableFuture<SendResult<K, V>> sendDefault(V data);
/**
* Send the data to the default topic with the provided key and no partition.
* @param key the key.
* @param data The data.
* @return a Future for the {@link SendResult}.
*/
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
/**
* Send the data to the default topic with the provided key and partition.
* @param partition the partition.
* @param key the key.
* @param data the data.
* @return a Future for the {@link SendResult}.
*/
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
/**
* Send the data to the default topic with the provided key and partition.
* @param partition the partition.
* @param timestamp the timestamp of the record.
* @param key the key.
* @param data the data.
* @return a Future for the {@link SendResult}.
* @since 1.3
*/
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
/**
* Send the data to the provided topic with no key or partition.
* @param topic the topic.
* @param data The data.
* @return a Future for the {@link SendResult}.
*/
ListenableFuture<SendResult<K, V>> send(String topic, V data);
/**
* Send the data to the provided topic with the provided key and no partition.
* @param topic the topic.
* @param key the key.
* @param data The data.
* @return a Future for the {@link SendResult}.
*/
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
/**
* Send the data to the provided topic with the provided key and partition.
* @param topic the topic.
* @param partition the partition.
* @param key the key.
* @param data the data.
* @return a Future for the {@link SendResult}.
*/
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
/**
* Send the data to the provided topic with the provided key and partition.
* @param topic the topic.
* @param partition the partition.
* @param timestamp the timestamp of the record.
* @param key the key.
* @param data the data.
* @return a Future for the {@link SendResult}.
* @since 1.3
*/
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
/**
* Send the provided {@link ProducerRecord}.
* @param record the record.
* @return a Future for the {@link SendResult}.
* @since 1.3
*/
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
/**
* Send a message with routing information in message headers. The message payload
* may be converted before sending.
* @param message the message to send.
* @return a Future for the {@link SendResult}.
* @see org.springframework.kafka.support.KafkaHeaders#TOPIC
* @see org.springframework.kafka.support.KafkaHeaders#PARTITION_ID
* @see org.springframework.kafka.support.KafkaHeaders#MESSAGE_KEY
*/
ListenableFuture<SendResult<K, V>> send(Message<?> message);
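The difference between the overloads with and without a partition argument can be illustrated with a small model. This is a simplified sketch, not Kafka's or Spring's actual code: `PartitionResolution` and `resolve` are hypothetical names, and the lambda stands in for a custom partitioner such as the one in the question.

```java
import java.util.function.ToIntFunction;

// Simplified, hypothetical model of the choice a producer makes per record.
// It is NOT Kafka's code; the names here are invented for illustration.
final class PartitionResolution {
    private PartitionResolution() {}

    /**
     * Returns the explicit partition when one was supplied; otherwise asks the
     * configured partitioner, passing it the record key.
     */
    static int resolve(Integer explicitPartition, String key, ToIntFunction<String> partitioner) {
        if (explicitPartition != null) {
            return explicitPartition; // the partitioner is not consulted
        }
        return partitioner.applyAsInt(key);
    }

    public static void main(String[] args) {
        // Stand-in for a custom partitioner: "alice" maps to 2, anything else to 0.
        ToIntFunction<String> custom = key -> "alice".equals(key) ? 2 : 0;

        // Analogous to send(topic, key, data): no partition, so the partitioner decides.
        System.out.println(resolve(null, "alice", custom));
        // Analogous to send(topic, partition, key, data): the explicit partition wins.
        System.out.println(resolve(5, "alice", custom));
    }
}
```

In other words, calling send(topic, key, data) with the custom partitioner configured should be enough for the key to reach partition().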