我正在学习Reactor,并且想知道如何实现某种行为。假设我有一堆传入消息。每个消息都与某个实体相关联,并包含一些数据。
interface Message {
String getEntityId();
Data getData();
}
Run Code Online (Sandbox Code Playgroud)
与不同实体有关的消息可以并行处理。但是,与任何单个实体有关的消息必须一次处理一次,即,对于实体的消息2的处理要"abc"
等到针对实体的消息1的处理"abc"
完成后才能开始。在处理消息的过程中,应缓存该整段消息。其他实体的消息可以不受阻碍地进行。可以认为它是每个实体上运行这样的代码的线程:
public void run() {
for (;;) {
// Blocks until there's a message available
Message msg = messageQueue.nextMessageFor(this.entityId);
// Blocks until processing is finished
processMessage(msg);
}
}
Run Code Online (Sandbox Code Playgroud)
我该如何使用React做到这一点而又不会受到阻碍?总消息速率可能很高,但是每个实体的消息速率会非常低。实体集可能非常大,因此不一定事先知道。
我猜可能看起来像这样,但我不知道。
{
incomingMessages()
.groupBy(Message::getEntityId)
.flatMap(entityStream -> entityStream
/* ... */
.map(msg -> /* process the message */)))
/* ... */
}
public static Stream<Message> incomingMessages() { /* ... */ }
Run Code Online (Sandbox Code Playgroud) 当我应该使用持久的Actor时,我对Akka Persistence和持久性actor的适用性感到困惑?
例如,从给定购物应用程序的Cart模块中,每个用户的Cart Session将是一个具有各自唯一persistenceId的持久actor吗?
实际应用程序的可用性是什么?查询端如何处理持久化actor的状态?当持久化actor在实际应用程序中没用时?
存储状态或存储消息是一回事吗?是不是?当我应该使用每一个时,有什么区别?
有人可以给我一些例子吗?
但我无法理解event driven architectures
和之间的核心差异message driven architectures
.结果,我也无法理解为什么reactive manifesto
更喜欢Message Driven systems
而不是那Event Driven
个人的原因.
我还看了一下这次访谈,其中Martin Thompson
讨论了反应性宣言.
但是,我无法清楚地区分这两种架构,它们的优点和用例.
我正在阅读这篇文章,称为事件驱动架构中的变体,它们演示了中介和代理拓扑.
根据文章,中介拓扑看起来有点像这样:
事件流开始于客户端将事件发送到事件队列,该事件队列用于将事件传输到介体.该事件介体接收初始事件,并通过发送附加异步事件编排该事件的事件通道以执行该过程的每个步骤.监听事件通道的事件处理器从偶数调解器接收事件并执行特定的业务逻辑来处理事件[...]重要的是要注意事件调解器实际上并不执行必要的业务逻辑处理初始事件,而不是它知道处理事件所需的步骤[...]事件通道可以是消息队列o消息主题.
所以,我正在研究这个图,试图理解中介如何确定给定处理器何时完成处理给定事件,以便它可以协调下一步的过程.
它说的文章不够清楚
对于每个初始事件步骤,事件中介器创建处理事件,发送该处理事件并等待相应事件处理器处理处理事件.此过程将继续,直到处理完初始事件中的所有步骤.
现在,文章清楚地表明通信是异步的,并且事件消息将通过消息队列传播,但该图不显示从事件处理器发出并返回到调解器的任何事件.
文章说调解员等待事件处理器完成,但目前尚不清楚这应该如何在架构方面发生.
它是异步的,基于队列的RPC(例如Rabbit RPC),还是有另一个侦听器在某处等待异步响应?
有关如何从架构的角度实现这一点的任何想法?
events event-driven-design event-driven orchestration microservices
让我们假设一个简化的场景是这样的:
Kafka有两个主题,用户和订单,三个微服务 user-service,order-service和shipping-service。
通过订单服务下订单时,会将OrderCreated事件添加到订单主题,并由货运服务监听。该服务需要获取用户信息才能发送订单。根据我的要求,我无法对用户服务进行REST调用,而是使用有状态方法。也就是说,运输服务是一个Kafka Streams应用程序,该应用程序侦听用户主题,并具有由本地商店支持的KTable以及完整的用户表信息。因此,在处理订单时,它已经具有本地可用的用户信息。
但是,此方法的一个问题是运输服务中本地用户信息的一致性,例如:
用户在用户服务中更新其收货地址,它更新其本地SQL数据库,并通过此更改在用户主题中发布事件。
用户下订单,因此订单服务将其发布在订单主题中。
无论出于何种原因,运输服务都可以在从用户主题读取UserUpdated信息之前处理订单主题中的OrderCreated事件,因此它将使用不再有效的地址。
在这种事件承载状态转移方案中,我如何保证运输服务始终具有更新的用户信息?
event-driven-design event-driven apache-kafka apache-kafka-streams
"基于事件"是否与"异步"相同?
什么是事件驱动编程,事件驱动编程与线程有什么关系?我来到这个问题阅读有关服务器以及它们如何处理用户请求和管理数据的问题.如果用户发送请求,服务器将开始处理数据并将状态写入表中.为什么会这样?服务器是否停止为该用户处理数据并开始为另一个用户处理数据,或者为每个用户处理是否在另一个线程(多线程服务器)中运行?
我正在尝试学习一些事件驱动编程的基础知识.因此,对于练习,我正在尝试编写一个程序,该程序读取大型二进制文件并使用它执行某些操作但不进行阻塞调用.我想出了以下内容:
var fs = require('fs');
var BUFFER_SIZE = 1024;
var path_of_file = "somefile"
fs.open(path_of_file, 'r', (error_opening_file, fd) =>
{
if (error_opening_file)
{
console.log(error_opening_file.message);
return;
}
var buffer = new Buffer(BUFFER_SIZE);
fs.read(fd, buffer, 0, BUFFER_SIZE, 0, (error_reading_file, bytesRead, buffer) =>
{
if (error_reading_file)
{
console.log(error_reading_file.message);
return;
}
// do something e.g. print or write to another file
})
})
Run Code Online (Sandbox Code Playgroud)
我知道我需要设置一个while循环以便读取完整的文件,但在上面的代码中我只读取文件的前1024个字节,并且不能制定如何在不使用阻塞循环的情况下继续读取文件.我们怎么能这样做?
我想知道socket.io方法如何用于发出某个事件,我已经读过它不像长轮询方法,而是一个可以在所有不同浏览器上工作的不同方法...客户端如何保持 - 没有长轮询请求与服务器联系?
我是node.js的新手,我想为eventdriven服务器实现我自己的系统(是的......重新发明轮子!)因为我想用手触摸这个如此简单的"socket.io - emit ()" 方法.
谢谢你的帮助!
我过去曾听过不同人的术语Data Driven
和 Event Driven
模型。我曾经用过google,但是这些术语对我来说仍然很模糊,因为它们看起来都和我相似
数据驱动编程是一种编程模型,其中数据本身控制程序的流程(而不是程序逻辑),在事件驱动编程的情况下,它是事件而不是数据本身控制程序的流程。
每个地雷的理解事件也是数据。例如,在基于员工的Web应用程序中-如果用户单击“创建员工”按钮,则这里的事件是创建员工(也仅是一种数据),而数据是与员工相关的信息。
现在,首先在服务器上将是事件,它将决定程序的流程,然后数据(与员工相关的信息)也将控制执行流程,例如是否将执行永久雇员不同的方法以及是否将临时雇员不同的方法
那么,不是每件事都是数据驱动的体系结构吗?如果没有,它们之间有什么区别?任何基于网络的示例都将有所帮助
event-driven ×10
node.js ×2
akka ×1
apache-kafka ×1
asynchronous ×1
cqrs ×1
data-driven ×1
distributed ×1
events ×1
java ×1
paradigms ×1
process ×1
reactor ×1
socket.io ×1
synchronous ×1
typesafe ×1