Ben*_*ess 6 mysql perl multithreading fork message-queue
我正在重新开发一个系统,它将通过http向多个供应商之一发送消息.原来是perl脚本,重新开发也可能使用perl.
在旧系统中,有许多perl脚本同时运行,每个供应商有5个.当消息被放入数据库时,选择随机线程号(1-5)和供应商以确保没有消息被处理两次,同时避免必须锁定表/行.此外,数据库中还有一个"公平队列位置"字段,以确保发送大邮件不会延迟发送大邮件时发生的小邮件.
在某些时候,每分钟只会有几条消息,但在其他时候会有数十万条消息被转储.在我看来,如果有所有脚本一直运行并且一直在检查消息,那么资源就像浪费一样,所以如果有更好的方法,或者如果旧方法可以接受的话,我正在尝试解决问题.
我现在的想法是想到让一个脚本运行并根据需要的流量分配尽可能多的子进程(达到一个限制),但我不确定如何以最佳方式实现它消息只处理一次,同时保持公平排队.
我现在最好的猜测是父脚本更新数据库以指示哪个子进程应该处理它,但是我担心这最终会比原始方法效率低.我几乎没有编写分叉代码的经验(上次我做的时间大约是15年前).
任何有关如何最好地处理消息队列的指南的想法或链接表示赞赏!
小智 8
你可以使用Thread :: Queue或其他任何一个:Perl有多处理模块吗?
如果旧系统是用Perl编写的,那么你可以重用它的大部分内容.
非工作示例:
use strict;
use warnings;
use threads;
use Thread::Queue;
my $q = Thread::Queue->new(); # A new empty queue
# Worker thread
my @thrs = threads->create(sub {
while (my $item = $q->dequeue()) {
# Do work on $item
}
})->detach() for 1..10;#for 10 threads
my $dbh = ...
while (1){
#get items from db
my @items = get_items_from_db($dbh);
# Send work to the thread
$q->enqueue(@items);
print "Pending items: "$q->pending()."\n";
sleep 15;#check DB in every 15 secs
}
Run Code Online (Sandbox Code Playgroud)
我建议使用像RabbitMQ这样的消息队列服务器.
一个进程将工作提供给队列,您可以让多个工作进程使用队列.
这种方法的优点:
要动态扩展或缩小数字工作者,您可以实现以下内容:
| 归档时间: |
|
| 查看次数: |
488 次 |
| 最近记录: |