根据配置向不同的Kafka主题发送消息

Pet*_*zov 4 java apache-kafka

我想根据配置将数据发送到不同的 Kafka 消息:

ResponseFactory processingPeply = null;

        switch(endpointType)
        {
            case "email":
                ProducerRecord<String, Object> record = new ProducerRecord<>("tp-email.request", tf);
                RequestReplyFuture<String, Object, Object> replyFuture = processingTransactionEmailReplyKafkaTemplate.sendAndReceive(record);
                SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
                ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);

                processingPeply = (ResponseFactory) consumerRecord.value();
              break;
            case "sms":
                ProducerRecord<String, Object> record = new ProducerRecord<>("tp-sms.request", tf);
                RequestReplyFuture<String, Object, Object> replyFuture = processingTransactionSmsReplyKafkaTemplate.sendAndReceive(record);
                SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
                ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);

                processingPeply = (ResponseFactory) consumerRecord.value();
              break;
            case "network":
                ProducerRecord<String, Object> record = new ProducerRecord<>("tp-network.request", tf);
                RequestReplyFuture<String, Object, Object> replyFuture = processingTransactionNetworkReplyKafkaTemplate.sendAndReceive(record);
                SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
                ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);

                processingPeply = (ResponseFactory) consumerRecord.value();
              break;
              
            default:
                processingPeply = ResponseFactory.builder().status("error").build();
        } 
Run Code Online (Sandbox Code Playgroud)

我目前得到:

  • 变量“记录”已在范围内定义
  • 变量“sendResult”已在范围内定义
  • 变量 'consumerRecord' 已在作用域中定义

您知道我如何以更好的方式重新设计代码以便解决问题吗?

ara*_*ran 7

在这里建议 4 种可能的方法,以避免核心代码中的一些开关块并遵守DRY的原则之一,即避免重复代码。(DRY 代表了一个比不重复代码更大的概念)。


1- GeneralHandler 和端点类型孩子

这里有点像分层类的树,不同的端点是抽象/一般父亲的扩展。

                      [GeneralKafkaHandler] - core/common logic
               _______________ | ________________
              |                |                |
              v                v                v
         {SmsHandler}    {EmailHandler}   {NetworkHandler}  -- specific params/methods
Run Code Online (Sandbox Code Playgroud)

例如,getTopic()getFuture()可以abstract在父亲身上,由每个孩子用自己的逻辑实现。另一种选择是制作getKafkaTemplate()另一种抽象方法(getFuture()或之间进行选择getKafkaTemplate()。这是层次结构的简化,从构造函数中检索主题。

Abstract father

abstract class GeneralKafkaHandler 
{
   public abstract RequestReplyFuture<String, Object, Object> 
                   getFuture(ProducerRecord<>r);
   public abstract String getName();

   protected String topic;
   protected int id;
   ResponseFactory processingPeply = null;

   public GeneralKafkaHandler(String topic, int id) 
   {
       this.topic = topic; 
       this.id = id;
   }

   public void handle(Object tf) //the main/common logic is implemented here
   {
       ProducerRecord<String, Object> record = new ProducerRecord<>(topic, tf);
       RequestReplyFuture<String, Object, Object> rf = getFuture(record);  
       SendResult<String, Object> sr = rf.getSendFuture().get(10, TimeUnit.SECONDS);
       ConsumerRecord<String, Object> consumerRecord = rf.get(10,TimeUnit.SECONDS);
       processingPeply = (ResponseFactory) consumerRecord.value();
   }

   //...
}
Run Code Online (Sandbox Code Playgroud)

SmsHandler

class SmsKafkaHandler extends GeneralKafkaHandler 
{
   //Sms specific variables, methods,..
    
   public SmsKafkaHandler(String topic, int id) 
   {
      super(topic, id);
      //sms code
   }

   @Override
   public String getName() 
   {
      return "SMSHandler_"+topic+"_"+id);
   }

   @Override
   public RequestReplyFuture<String, Object, Object> getFuture(ProducerRecord<> r)
   {
      //sms code
      return processingTransactionSmsReplyKafkaTemplate.sendAndReceive(r);
   }

   //...
}
Run Code Online (Sandbox Code Playgroud)

Main只是一个例子

Map<String, GeneralKafkaHandler> handlerMap = new HashMap<>();
handlerMap.put("sms", new SmsKafkaHandler("tp-sms.request",1));
handlerMap.put("smsplus", new SmsKafkaHandler("tp-sms-plus.request",2));
handlerMap.put("email", new EmailKafkaHandler("tp-email.request",1));
//...

handlerMap.get(endpointType.toLowerCase()).handle(tf);
Run Code Online (Sandbox Code Playgroud)

这里有不同的选择;例如,sendAndReceive 也是所有类型的通用方法,因此getFuture()可以仅通过一个getTemplate()方法来更改。这里有很多选择。

如果您需要/希望更多地管理每个端点,这种方法将是一个好主意;如果您认为不同的管理是值得的,或者将来会值得,您可以考虑;由于核心机制是相同的,不同的扩展可以让你快速实现不同的端点类型。


2- 自定义实体

本质上,端点类型只有 2 个不同的元素:

  1. Topic
  2. ReplyingKafkaTemplate

您可以将它们包装成一个对象。例如:

public class TopicEntity
{
  public final String topic;
  public final ReplyingKafkaTemplate<String,Object,Object> template;

  public TopicEntity(String topic, ReplyingKafkaTemplate<String,Object,Object> template)
  {
     this.topic = topic;
     this.template = template;
  }    
}
Run Code Online (Sandbox Code Playgroud)

那么你可以在不修改当前代码的情况下获得这个(这里我假设你的模板已经初始化):

TopicEntity smsE = new TopicEntity("tp-sms.request",
                                   processingTransactionSmsReplyKafkaTemplate);
TopicEntity mailE = new TopicEntity("tp-email.request",
                                   processingTransactionEmailReplyKafkaTemplate);

Map<String, TopicEntity> handlerMap = new HashMap<>();
handlerMap.put("sms", smsE);
handlerMap.put("email",mailE);
//...

TopicEntity te = handlerMap.get(endpointType.toLowerCase()); 
//Based on endpoint
ProducerRecord<String, Object> record = new ProducerRecord<>(te.topic, tf);
RequestReplyFuture<String, Object, Object> rf = te.template.sendAndReceive(record);
//Common regardless of endpoint
SendResult<String, Object> sr = rf.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, Object> consumerRecord = rf.get(10,TimeUnit.SECONDS);
processingPeply = (ResponseFactory) consumerRecord.value();
Run Code Online (Sandbox Code Playgroud)

非常简单,也避免了重复代码;该实体还允许您为每个端点定义特定的特征。


3- 吸气剂方法

更简单的方法,只是为了让主代码看起来更干净。

ProducerRecord<String, Object> record = new ProducerRecord<>(getTopic(endpointType),tf);
RequestReplyFuture<String, Object, Object> replyFuture = getFuture(endpointType,record);
/*rest of the code here (common regardless type)*/
Run Code Online (Sandbox Code Playgroud)

和吸气剂:

String getTopic(String e)
{
   switch(e.toLowerCase())
   {
      case "email"  : return "tp-email.request"; 
      case "sms"    : return "tp-sms.request";
      case "network": return "tp-network.request";
      default : /*handle error*/ return null; 
                /*kafka's response - "topic cannot be null");*/
    }
}

RequestReplyFuture<String, Object, Object> getFuture(String e, ProducerRecord<> r)
{
  switch(e.toLowerCase())
  {
     case "email": 
          return processingTransactionEmailReplyKafkaTemplate.sendAndReceive(r);
     case "sms" :
           return processingTransactionSmsReplyKafkaTemplate.sendAndReceive(r);
     case "network": 
           return processingTransactionNetworkReplyKafkaTemplate.sendAndReceive(r);
     default : /*handle error*/ return null;
  }            /*this one should never be executed*/
}
Run Code Online (Sandbox Code Playgroud)

4- 单人二传手

好吧,也许这是一种更简单的方法……这将是方法 3 和方法 4 之间的斗争。

ReplyingKafkaTemplate template;
String topic;
//...

void setParameters(String e)
{
  switch(e.toLowerCase())
  {
    case "email"  : 
          topic = "tp-email.request"; 
          template = processingTransactionEmailReplyKafkaTemplate;
          break;         
    case "sms"    :       
          topic = "tp-sms.request"; 
          template = processingTransactionSmsReplyKafkaTemplate;
          break;         
     //...
   }
}
//...

setParameters(endpointType);

ProducerRecord<String, Object> r= new ProducerRecord<>(topic,tf);
RequestReplyFuture<String, Object, Object> replyFuture = template.sendAndReceive(r);
SendResult<String, Object> sr = rf.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, Object> consumerRecord = rf.get(10,TimeUnit.SECONDS);
processingPeply = (ResponseFactory) consumerRecord.value();
Run Code Online (Sandbox Code Playgroud)

1.a)- Spring 和 GeneralHandler

剧透:我不知道 sh#!关于 Spring,所以这可能完全不正确。

从我在这里读到的,抽象类不需要任何注释,只需要孩子们可能访问的字段@Autowired

abstract class GeneralKafkaHandler 
{
   public abstract RequestReplyFuture<String, Object, Object> 
                   getFuture(ProducerRecord<>r);
   public abstract String getName();

   @Autowired
   protected String topic;
   @Autowired
   protected int id;

   ResponseFactory processingPeply = null;

   public GeneralKafkaHandler(String topic, int id) 
   {
       this.topic = topic; 
       this.id = id;
   }

   public void handle(Object tf) //the main/common logic is implemented here
   {
       ProducerRecord<String, Object> record = new ProducerRecord<>(topic, tf);
       RequestReplyFuture<String, Object, Object> rf = getFuture(record);  
       SendResult<String, Object> sr = rf.getSendFuture().get(10, TimeUnit.SECONDS);
       ConsumerRecord<String, Object> consumerRecord = rf.get(10,TimeUnit.SECONDS);
       processingPeply = (ResponseFactory) consumerRecord.value();
   }

   //...
}
Run Code Online (Sandbox Code Playgroud)

并且孩子们应该有@Component注释,以及@Autowired在构造函数中;我不太确定最后一个,因为我看到的示例还包括也在 child.h 中定义的字段。

@Component
class SmsKafkaHandler extends GeneralKafkaHandler 
{
   //Sms specific variables, methods,..
    
   @Autowired  //not sure about this..
   public SmsKafkaHandler(String topic, int id) 
   {
      super(topic, id);
      //sms code
   }

   @Override
   public String getName() 
   {
      return "SMSHandler_"+topic+"_"+id);
   }

   @Override
   public RequestReplyFuture<String, Object, Object> getFuture(ProducerRecord<> r)
   {
      //sms code
      return processingTransactionSmsReplyKafkaTemplate.sendAndReceive(r);
   }

   //...
}
Run Code Online (Sandbox Code Playgroud)

真的,我不知道我在说什么关于这个 Spring 解决方案,我什至不知道那些注释是什么,看电脑的狗的模因代表了此时的我。所以请慎重对待...


DRY 是给失败者的