我们将在几周内开始新的Spring 4应用程序.我们想使用一些事件驱动的架构.今年我在这里和那里读到关于"Reactor"的信息,在网上搜索时,我偶然发现了"Akka".
所以现在我们有3个选择:
ApplicationEvent:http://docs.spring.io/spring/docs/4.0.0.RELEASE/javadoc-api/org/springframework/context/ApplicationEvent.htmlReactor:https://github.com/reactor/reactor#reactorAkka:http://akka.io/我无法找到真正的比较.
现在我们只需要:
X 注册听 Event EY 注册听 Event EZ 发送一个 Event E然后X,Y将接收并处理该事件.
我们很可能会以异步方式使用它,但肯定会有一些同步方案.我们很可能总是将一个类作为事件发送.(Reactor样本主要使用字符串和字符串模式,但它也支持对象).
据我所知,ApplicationEvent默认情况下同步Reactor工作并以异步方式工作.并且Reactor还允许使用该await()方法使其有点同步.Akka提供或多或少相同Reactor,但也支持Remoting.
关于Reactor的await()方法:它可以等待多个线程完成吗?或者甚至可能是这些线程的一部分?如果我们从上面举例:
X 注册听 Event EY 注册听 Event EZ 发送一个 Event E可以通过说:等待X 并 Y完成来使其同步.它是否有可能让它等待X,但不是为了Y?
也许还有一些替代品?例如JMS呢?
很多问题,但希望你能提供一些答案!
谢谢!
spring multithreading event-driven-design akka project-reactor
我正在按照本指南学习如何使用spring-rabbitRabbitMQ.但是在本指南中,RabbitMQ配置是默认配置(localhost服务器,凭证为guest/guest).如果我想用ip地址和凭证连接到远程RabbitMQ,我该怎么办?我不知道在我的应用程序中将这些信息设置在何处.
rabbitmq spring-rabbit spring-amqp spring-boot spring-rabbitmq
我正在 Spring Boot 应用程序中使用 spring-kaka-2.2.0 编写集成测试,我几乎成功了,我的测试用例仍然返回 true,但在那之后我仍然看到多个错误。
2019-02-21 11:12:35.434 ERROR 5717 --- [ Thread-7] kafka.server.ReplicaManager : [ReplicaManager broker=0] Error while writing to highwatermark file in directory /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645
org.apache.kafka.common.errors.KafkaStorageException: Error while writing to checkpoint file /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint
Caused by: java.io.FileNotFoundException: /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint.tmp (No such file or directory)
Run Code Online (Sandbox Code Playgroud)
测试配置
@EnableKafka
@TestConfiguration
public class KafkaProducerConfigTest {
@Bean
public EmbeddedKafkaBroker embeddedKafkaBroker() {
return new EmbeddedKafkaBroker(1,false,2,"test-events");
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker().getBrokersAsString());
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); …Run Code Online (Sandbox Code Playgroud) java apache-kafka spring-boot spring-kafka spring-kafka-test
如何在不使用自动确认的情况下手动确认消息.是否有与一起使用的方式@RabbitListener和@EnableRabbit风格的配置.大多数文档告诉我们SimpleMessageListenerContainer一起使用ChannelAwareMessageListener.但是使用它会失去注释提供的灵活性.我已经配置了我的服务如下:
@Service
public class EventReceiver {
@Autowired
private MessageSender messageSender;
@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order) throws Exception {
// code for processing order
}
Run Code Online (Sandbox Code Playgroud)
@EnableRabbit
public class RabbitApplication implements RabbitListenerConfigurer {
public static void main(String[] args) {
SpringApplication.run(RabbitApplication.class, args);
}
@Bean
public MappingJackson2MessageConverter jackson2Converter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
return converter;
@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setMaxConcurrentConsumers(5);
factory.setMessageConverter((MessageConverter) jackson2Converter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory; …Run Code Online (Sandbox Code Playgroud) 我更改了侦听 Kafka 主题的 Web 服务的消费者组 ID。现在,旧的组ID仍然注册到主题,但是没有具有该组ID的消费者。因此,它是滞后的。如何从特定主题中删除特定消费者组?
我试过这个:
kafka-consumer-groups --bootstrap-server kafka01.myserver.com:9092 --topic notification-topic --delete --group old-consumer-group --execute
但它返回:“消费者不支持从消费者组删除特定于主题的偏移量。 ”
我应该完全删除消费者组吗?我使用相同的组ID收听其他主题,它们会受到影响吗?
我是Spring AMQP/Rabbit MQ的新手.
我在我的项目中使用Spring AMQP/Rabbit MQ.运行tomcat后我遇到以下错误:
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - 消费者引发异常,如果连接工厂支持,则处理可以重新启动.
异常摘要:org.springframework.amqp.AmqpConnectException:java.net.ConnectException:连接被拒绝:连接
以下是配置文件:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:connection-factory id="connectionFactory" host="127.0.0.1"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template connection-factory="connectionFactory" id="rabbitTemplate" channel-transacted="true"/>
<rabbit:queue name="proposalQueue" />
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="listener" queue-names="proposalQueue"/>
</rabbit:listener-container>
<bean id="rabbitMQTransactionManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<rabbit:direct-exchange name="myExchange">
<rabbit:bindings>
<rabbit:binding queue="proposalQueue" key="userMesssage" />
</rabbit:bindings>
</rabbit:direct-exchange>
<bean id="listener" class="com.xxx.xxxx.rabbitmq.QueueServer"/>
</beans>
Run Code Online (Sandbox Code Playgroud)
@Override
public void onMessage(Message message) {
Map<String, Object> result = new HashMap<>();
MessageProperties props = message.getMessageProperties();
BasicProperties replyProps = new BasicProperties.Builder().correlationId(new …Run Code Online (Sandbox Code Playgroud) 我看到有些人将 @EnableKafka 添加到他们的 Spring Boot 应用程序中,我想知道为什么。我有一个正在工作的 Spring Boot kafka 生产者和消费者,但我没有使用 @EnableKafka。那么,为什么人们需要明确添加它呢?
谢谢你。
我目前正在开发一个rabbit-amqp实现项目,并使用spring-rabbit以编程方式设置我的所有队列,绑定和交换.(spring-rabbit-1.3.4和spring-framework版本3.2.0)
在我看来,javaconfiguration类或基于xml的配置中的声明都是非常静态的.我知道如何为这样的队列,交换或绑定设置更动态的值(例如名称):
@Configuration
public class serverConfiguration {
private String queueName;
...
@Bean
public Queue buildQueue() {
Queue queue = new Queue(this.queueName, false, false, true, getQueueArguments());
buildRabbitAdmin().declareQueue(queue);
return queue;
}
...
}
Run Code Online (Sandbox Code Playgroud)
但我想知道是否有可能创建一个未定义的Queue数量实例并将它们注册为bean,就像注册其所有实例的工厂一样.
我并不熟悉Spring @Bean注释及其局限性,但我尝试过
@Configuration
public class serverConfiguration {
private String queueName;
...
@Bean
@Scope("prototype")
public Queue buildQueue() {
Queue queue = new Queue(this.queueName, false, false, true, getQueueArguments());
buildRabbitAdmin().declareQueue(queue);
return queue;
}
...
}
Run Code Online (Sandbox Code Playgroud)
并查看是否已注册Queue的多个Bean实例,我调用:
Map<String, Queue> queueBeans = ((ListableBeanFactory) applicationContext).getBeansOfType(Queue.class);
Run Code Online (Sandbox Code Playgroud)
但这只会返回1个映射:
name of the method := the last created …Run Code Online (Sandbox Code Playgroud) 在我的spring启动服务中尝试启动kafka使用者时看到NoSuchBeanDefinitionException并且无法启动服务本身.
下面是我的bean类,它包含为Kafka配置创建的所有必需bean
Spring Boot版本:1.5.2.RELEASE
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import com.ns.kafka.gateway.dtos.GatewayCallBackMessage;
@Configuration
@EnableKafka
public class GatewayCallbackToPNConsumerConfig {
@Bean
public Map < String, Object > consumerProps() {
Map < String, Object > props = new HashMap < > ();
props.put(null, null);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "gatewaycallbacktopngroup");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
return props;
}
@Bean
public Deserializer < String > …Run Code Online (Sandbox Code Playgroud) 是否可以根据某些条件重试?如果我用Retryable注释,它会根据一些异常重试,但我想在捕获该异常并且满足相应条件时重试。例子:
@Retryable(value={MyException.class},maxAttempts=2)
public myMethod(Request request){
try{
doSomething();
} Catch(Exception ex){
throw new MyException();
}
}
Run Code Online (Sandbox Code Playgroud)
在上面的请求中,我有一个标志isRetryRequired如果这是 true 并且 MyException 被捕获那么我想重试
apache-kafka ×4
java ×4
rabbitmq ×4
spring ×4
spring-amqp ×4
spring-boot ×3
spring-kafka ×3
akka ×1
consumer ×1
retry-logic ×1
spring-bean ×1
spring-retry ×1