小编Gar*_*ell的帖子

在Spring中使用什么样的"EventBus"?内置,Reactor,Akka?

我们将在几周内开始新的Spring 4应用程序.我们想使用一些事件驱动的架构.今年我在这里和那里读到关于"Reactor"的信息,在网上搜索时,我偶然发现了"Akka".

所以现在我们有3个选择:

我无法找到真正的比较.


现在我们只需要:

  • X 注册听 Event E
  • Y 注册听 Event E
  • Z 发送一个 Event E

然后X,Y将接收并处理该事件.

我们很可能会以异步方式使用它,但肯定会有一些同步方案.我们很可能总是将一个类作为事件发送.(Reactor样本主要使用字符串和字符串模式,但它也支持对象).


据我所知,ApplicationEvent默认情况下同步Reactor工作并以异步方式工作.并且Reactor还允许使用该await()方法使其有点同步.Akka提供或多或少相同Reactor,但也支持Remoting.

关于Reactor的await()方法:它可以等待多个线程完成吗?或者甚至可能是这些线程的一部分?如果我们从上面举例:

  • X 注册听 Event E
  • Y 注册听 Event E
  • Z 发送一个 Event E

可以通过说:等待X Y完成来使其同步.它是否有可能让它等待X,但不是为了Y


也许还有一些替代品?例如JMS呢?

很多问题,但希望你能提供一些答案!

谢谢!


编辑:示例用例 …

spring multithreading event-driven-design akka project-reactor

40
推荐指数
2
解决办法
2万
查看次数

如何用spring-rabbit配置RabbitMQ连接?

我正在按照本指南学习如何使用spring-rabbitRabbitMQ.但是在本指南中,RabbitMQ配置是默认配置(localhost服务器,凭证为guest/guest).如果我想用ip地址和凭证连接到远程RabbitMQ,我该怎么办?我不知道在我的应用程序中将这些信息设置在何处.

rabbitmq spring-rabbit spring-amqp spring-boot spring-rabbitmq

19
推荐指数
1
解决办法
2万
查看次数

Spring Kafka集成测试写入高水印文件时出错

我正在 Spring Boot 应用程序中使用 spring-kaka-2.2.0 编写集成测试,我几乎成功了,我的测试用例仍然返回 true,但在那之后我仍然看到多个错误。

2019-02-21 11:12:35.434 ERROR 5717 --- [       Thread-7] kafka.server.ReplicaManager              : [ReplicaManager broker=0] Error while writing to highwatermark file in directory /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645

org.apache.kafka.common.errors.KafkaStorageException: Error while writing to checkpoint file /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint
Caused by: java.io.FileNotFoundException: /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint.tmp (No such file or directory)
Run Code Online (Sandbox Code Playgroud)

测试配置

@EnableKafka
@TestConfiguration
public class KafkaProducerConfigTest {

@Bean
public EmbeddedKafkaBroker embeddedKafkaBroker() {
    return new EmbeddedKafkaBroker(1,false,2,"test-events");
}


@Bean
public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker().getBrokersAsString());
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-boot spring-kafka spring-kafka-test

16
推荐指数
2
解决办法
4408
查看次数

Spring RabbitMQ - 在具有@RabbitListener配置的服务上使用手动通道确认

如何在不使用自动确认的情况下手动确认消息.是否有与一起使用的方式@RabbitListener@EnableRabbit风格的配置.大多数文档告诉我们SimpleMessageListenerContainer一起使用ChannelAwareMessageListener.但是使用它会失去注释提供的灵活性.我已经配置了我的服务如下:

@Service
public class EventReceiver {

@Autowired
private MessageSender messageSender;

@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order) throws Exception {

  // code for processing order
}
Run Code Online (Sandbox Code Playgroud)

我的RabbitConfiguration如下

@EnableRabbit
public class RabbitApplication implements RabbitListenerConfigurer {

public static void main(String[] args) {
    SpringApplication.run(RabbitApplication.class, args);
}

@Bean


public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new  MappingJackson2MessageConverter();
        return converter;
    @Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factory.setConnectionFactory(rabbitConnectionFactory());
      factory.setMaxConcurrentConsumers(5);
      factory.setMessageConverter((MessageConverter) jackson2Converter());
      factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
      return factory; …
Run Code Online (Sandbox Code Playgroud)

java rabbitmq spring-rabbit spring-amqp spring-rabbitmq

14
推荐指数
2
解决办法
2万
查看次数

如何从特定主题中删除 Kafka 消费者组?

我更改了侦听 Kafka 主题的 Web 服务的消费者组 ID。现在,旧的组ID仍然注册到主题,但是没有具有该组ID的消费者。因此,它是滞后的。如何从特定主题中删除特定消费者组?

我试过这个:

kafka-consumer-groups --bootstrap-server kafka01.myserver.com:9092 --topic notification-topic --delete --group old-consumer-group --execute

但它返回:“消费者不支持从消费者组删除特定于主题的偏移量。

我应该完全删除消费者组吗?我使用相同的组ID收听其他主题,它们会受到影响吗?

consumer apache-kafka kafka-consumer-api

14
推荐指数
1
解决办法
2万
查看次数

无法连接Spring AMQP/Rabbit MQ:org.springframework.amqp.AmqpConnectException:java.net.ConnectException:连接被拒绝:连接

我是Spring AMQP/Rabbit MQ的新手.

我在我的项目中使用Spring AMQP/Rabbit MQ.运行tomcat后我遇到以下错误:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - 消费者引发异常,如果连接工厂支持,则处理可以重新启动.

异常摘要:org.springframework.amqp.AmqpConnectException:java.net.ConnectException:连接被拒绝:连接

以下是配置文件:

弹簧amqp.xml

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit        http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <rabbit:connection-factory id="connectionFactory" host="127.0.0.1"/>
    <rabbit:admin connection-factory="connectionFactory"/>

    <rabbit:template connection-factory="connectionFactory" id="rabbitTemplate" channel-transacted="true"/>
    <rabbit:queue name="proposalQueue" />

    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="listener" queue-names="proposalQueue"/>
    </rabbit:listener-container>

    <bean id="rabbitMQTransactionManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

    <rabbit:direct-exchange name="myExchange">
        <rabbit:bindings>
             <rabbit:binding queue="proposalQueue" key="userMesssage" />
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <bean id="listener" class="com.xxx.xxxx.rabbitmq.QueueServer"/>
 </beans>
Run Code Online (Sandbox Code Playgroud)

QueueServer.java

@Override
    public void onMessage(Message message) {

    Map<String, Object> result = new HashMap<>();       
    MessageProperties props = message.getMessageProperties();
    BasicProperties replyProps = new BasicProperties.Builder().correlationId(new …
Run Code Online (Sandbox Code Playgroud)

rabbitmq spring-amqp spring-rabbitmq

13
推荐指数
2
解决办法
3万
查看次数

在 Spring Kafka 中,我是否需要将 @EnableKafka 注释添加到我的应用程序中?

我看到有些人将 @EnableKafka 添加到他们的 Spring Boot 应用程序中,我想知道为什么。我有一个正在工作的 Spring Boot kafka 生产者和消费者,但我没有使用 @EnableKafka。那么,为什么人们需要明确添加它呢?

谢谢你。

java apache-kafka spring-boot spring-kafka

13
推荐指数
2
解决办法
3048
查看次数

以bean的形式动态添加新队列,绑定和交换

我目前正在开发一个rabbit-amqp实现项目,并使用spring-rabbit以编程方式设置我的所有队列,绑定和交换.(spring-rabbit-1.3.4和spring-framework版本3.2.0)

在我看来,javaconfiguration类或基于xml的配置中的声明都是非常静态的.我知道如何为这样的队列,交换或绑定设置更动态的值(例如名称):

@Configuration
public class serverConfiguration {
   private String queueName;
   ...
   @Bean
   public Queue buildQueue() {
    Queue queue = new Queue(this.queueName, false, false, true, getQueueArguments());
    buildRabbitAdmin().declareQueue(queue);
    return queue;
   }
   ...
}
Run Code Online (Sandbox Code Playgroud)

但我想知道是否有可能创建一个未定义的Queue数量实例并将它们注册为bean,就像注册其所有实例的工厂一样.

我并不熟悉Spring @Bean注释及其局限性,但我尝试过

@Configuration
public class serverConfiguration {
   private String queueName;
   ...
   @Bean
   @Scope("prototype")
   public Queue buildQueue() {
    Queue queue = new Queue(this.queueName, false, false, true, getQueueArguments());
    buildRabbitAdmin().declareQueue(queue);
    return queue;
   }
   ...
}
Run Code Online (Sandbox Code Playgroud)

并查看是否已注册Queue的多个Bean实例,我调用:

Map<String, Queue> queueBeans = ((ListableBeanFactory) applicationContext).getBeansOfType(Queue.class);
Run Code Online (Sandbox Code Playgroud)

但这只会返回1个映射:

name of the method := the last created …
Run Code Online (Sandbox Code Playgroud)

spring rabbitmq spring-rabbit spring-amqp spring-bean

11
推荐指数
1
解决办法
5517
查看次数

Spring Boot Kafka:由于NoSuchBeanDefinitionException,无法启动使用者

在我的spring启动服务中尝试启动kafka使用者时看到NoSuchBeanDefinitionException并且无法启动服务本身.

下面是我的bean类,它包含为Kafka配置创建的所有必需bean

Spring Boot版本:1.5.2.RELEASE

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import com.ns.kafka.gateway.dtos.GatewayCallBackMessage;

@Configuration
@EnableKafka
public class GatewayCallbackToPNConsumerConfig {
 @Bean
 public Map < String, Object > consumerProps() {
  Map < String, Object > props = new HashMap < > ();
  props.put(null, null);
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "gatewaycallbacktopngroup");
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
  props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
  return props;
 }

 @Bean
 public Deserializer < String > …
Run Code Online (Sandbox Code Playgroud)

spring apache-kafka kafka-consumer-api spring-kafka

9
推荐指数
1
解决办法
2万
查看次数

Spring@Retryable用于特定条件

是否可以根据某些条件重试?如果我用Retryable注释,它会根据一些异常重试,但我想在捕获该异常并且满足相应条件时重试。例子:

@Retryable(value={MyException.class},maxAttempts=2)
public myMethod(Request request){

    try{
        doSomething();
    } Catch(Exception ex){
        throw new MyException();
    }

}
Run Code Online (Sandbox Code Playgroud)

在上面的请求中,我有一个标志isRetryRequired如果这是 true 并且 MyException 被捕获那么我想重试

java spring spring-retry retry-logic

9
推荐指数
1
解决办法
5070
查看次数