具有作业亲和力的作业队列

Gol*_*den 9 distributed design-patterns message-queue dispatcher job-queue

我目前面临一个问题,我可以肯定有一个正式名称,但是我不知道该在网上搜索什么。我希望如果我描述问题和解决方案时想到的,有人能够告诉我设计模式的名称(如果有一个与我要描述的内容相匹配的设计模式)。

基本上,我想拥有一个工作队列:我有多个创建工作的客户端(发布者),以及许多处理这些工作的工人(消费者)。现在,我想将发布者创建的作业分发给各个使用者,这基本上可以使用几乎任何消息队列并在整个队列中实现负载均衡,例如使用RabbitMQ甚至MQTT 5。

但是,现在事情变得复杂了……每个工作都指向一个外部实体,例如一个用户。我想要的是按顺序处理单个用户的作业,但并行处理多个用户的作业。我不要求用户X的作业始终去工作者Y,因为无论如何它们都应按顺序处理。

现在,我可以使用RabbitMQ及其一致的哈希交换来解决此问题,但是当新的工作人员进入集群时,我就会进行数据竞赛,因为RabbitMQ不支持重新定位已经在队列中的作业。

MQTT 5也不支持:在这里,这个想法被称为“粘性共享订阅”,但这不是官方的。它可能是MQTT 6的一部分,也可能不是。谁知道。

我也看过NSQ,NATS和其他一些经纪人。他们中的大多数甚至都不支持这种非常特定的情况,而那些确实使用一致哈希的情况,则存在前面提到的数据竞速问题。

现在,如果代理在作业到达后不将作业分类到队列中,而是跟踪某个特定用户的作业是否已经在处理中,则问题将消失:如果这样,它将延迟所有其他作业该用户,但其他用户的所有作业仍应处理。使用RabbitMQ等人无法做到这一点。

我很确定我不是唯一拥有用例的人。例如,我可以想到用户将视频上传到视频平台,尽管上传的视频是并行处理的,但单个用户上传的所有视频都是按顺序处理的。

因此,简而言之:我所形容的名字是用一个普通名字吗?诸如分布式作业队列之类的东西?具有任务相似性的任务调度程序?还是其他?我尝试了很多术语,但没有成功。这可能意味着没有解决方案,但是如上所述,很难想象我是这个问题上唯一的人。

有什么想法我可以寻找吗?并且:是否有实现此目的的工具?有协议吗?

PS:仅使用预定义的路由密钥是不可行的,因为用户ID(我在这里只是作为一个示例)基本上是UUID,因此可以有数十亿个,因此我需要更多动态的东西。因此,一致性哈希基本上是正确的方法,但是如上所述,为了避免数据争用,分发必须逐个进行而不是预先进行。

Max*_*eev 19

临时工作流能够以最少的努力支持您的用例。

这是一个满足您要求的稻草人设计:

  • 使用 userID 作为工作流 ID 向用户工作流发送 signalWithStart 请求。它要么将信号传递给工作流,要么首先启动工作流并将信号传递给它。
  • 对该工作流的所有请求都由它缓冲。Temporal 提供了一种硬性保证,即只有一个具有给定 ID 的工作流可以处于打开状态。所以所有的信号(事件)都保证在属于用户的工作流中被缓冲。在出现任何流程或基础设施故障时,Temporal 会保留工作流中的所有数据(包括堆栈跟踪和局部变量)。所以不需要taskQueue显式地持久化变量。
  • 内部工作流事件循环将这些请求一一分派。
  • 当缓冲区为空时,工作流可以完成。

这是在 Java 中实现它的工作流代码(也支持 Go 和 PHP SDK,NodeJS 处于 alpha 阶段):

@WorkflowInterface
public interface SerializedExecutionWorkflow {

    @WorkflowMethod
    void execute();

    @SignalMethod
    void addTask(Task t);
}

@ActivityInterface
public interface TaskProcessorActivity {
    void process(Task poll);
}

public class SerializedExecutionWorkflowImpl implements SerializedExecutionWorkflow {

    private final Queue<Task> taskQueue = new ArrayDeque<>();
    private final TaskProcesorActivity processor = Workflow.newActivityStub(TaskProcesorActivity.class);

    @Override
    public void execute() {
        while(!taskQueue.isEmpty()) {
            processor.process(taskQueue.poll());
        }
    }

    @Override
    public void addTask(Task t) {
        taskQueue.add(t);
    }
}
Run Code Online (Sandbox Code Playgroud)

然后是通过信号方法将该任务排入工作流的代码:

private void addTask(WorkflowClient cadenceClient, Task task) {
    // Set workflowId to userId
    WorkflowOptions options = WorkflowOptions.newBuilder()
       .setTaskQueue(TASK_QUEUE)
       .setWorkflowId(task.getUserId())
       .build();
    // Use workflow interface stub to start/signal workflow instance
    SerializedExecutionWorkflow workflow = temporalClient.newWorkflowStub(SerializedExecutionWorkflow.class, options);
    BatchRequest request = temporalClient.newSignalWithStartRequest();
    request.add(workflow::execute);
    request.add(workflow::addTask, task);
    temporalClient.signalWithStart(request);
}
Run Code Online (Sandbox Code Playgroud)

与使用队列进行任务处理相比,Temporal 提供了许多其他优势。

  • 以无限的到期间隔构建指数重试
  • 故障处理。例如,如果在配置的时间间隔内两个更新都无法成功,它允许执行通知另一个服务的任务。
  • 支持长时间运行的心跳操作
  • 能够实现复杂的任务依赖关系。例如,在不可恢复的故障(SAGA)的情况下实现调用链或补偿逻辑
  • 提供对更新当前状态的完整可见性。例如,当使用队列时,您知道队列中是否有一些消息,并且您需要额外的数据库来跟踪整体进度。使用 Temporal 记录每个事件。
  • 能够取消正在进行的更新。
  • 分布式 CRON 支持

请参阅介绍Temporal 编程模型的演示文稿。它提到了 Temporal 的前身 Cadence 项目。


ami*_*che 3

我想要的是一个作业队列:我有多个创建作业的客户端(发布者),以及一些处理这些作业的工作人员(消费者)。现在我想将发布者创建的作业分发给各个消费者,这基本上可以使用几乎任何具有跨队列负载平衡的消息队列来实现,例如使用 RabbitMQ 甚至 MQTT 5。

然而,现在事情变得复杂了......每个作业都引用一个外部实体,比如说用户。我想要的是单个用户的作业按顺序处理,但多个用户的作业则并行处理。我不要求用户 X 的作业始终交给工人 Y,因为无论如何它们都应该按顺序处理。

即使不是这个特定的用例,我在几个月前对(动态)任务调度 [ 0 ][ 1 ] 进行了调查,但没有出现类似的情况。

我读到的每个调度算法都具有一些所有其他任务共有的属性,例如优先级、年龄、排队时间、任务名称(以及扩展的平均处理时间)。如果您的任务全部链接到一个用户,您可以构建一个调度程序,考虑user_id从队列中选择任务。

但我想,您不想构建自己的调度程序,无论如何这都是浪费,因为根据此类需求的经验,现有的消息队列允许实现您的要求。

总结您的要求,您需要:

每个用户同时仅运行一个任务的调度程序。

解决方案是使用分布式锁,例如REDIS distlock,在任务启动之前获取锁,并在任务执行期间定期刷新它。如果同一用户的新任务进入并尝试执行,它将无法获取锁并重新排队。

这是一个伪代码:

def my_task(user_id, *args, **kwargs):
    if app.distlock(user_id, blocking=False):
        exec_my_task(user_id, *args, **kwargs)
    else:
        raise RetryTask()
Run Code Online (Sandbox Code Playgroud)

不要忘记刷新释放

采用类似的方法来强制robots.txt爬虫中每个请求之间的延迟。