将select(2)和缓冲IO组合成文件句柄是否安全?

Lom*_*mky 5 io perl buffer

我正在使用IO :: Select来跟踪可变数量的文件句柄以供阅读.我遇到的文档强烈建议不要将select语句与<>(readline)结合起来从文件句柄中读取.

我的情况:

我只会使用一次文件句柄,即当select为我提供文件句柄时,它将被完全使用,然后从select中删除.我将收到一个哈希值和一个可变数量的文件.我不介意这会阻塞一段时间.

有关更多上下文,我是一个发送信息以供我的服务器处理的客户端.每个文件句柄都是我正在与之交谈的不同服务器.服务器完成后,哈希结果将从每个服务器发回给我.在该哈希内部是一个数字,表示要遵循的文件数.

我希望使用readline以便与现有项目代码集成以传输Perl对象和文件.

示例代码:

my $read_set = IO::Select()->new;
my $count = @agents_to_run; #array comes as an argument

for $agent ( @agents_to_run ) { 
    ( $sock, my $peerhost, my $peerport ) 
        = server($config_settings{ $agent }->
            { 'Host' },$config_settings{ $agent }->{ 'Port' };
    $read_set->add( $sock );

}

while ( $count > 0) {
    my @rh_set = IO::Select->can_read();

    for my $rh ( @{ $rh_set } ) {

            my %results = <$rh>;
            my $num_files = $results{'numFiles'};
            my @files = ();
            for (my i; i < $num_files; i++) {
                    $files[i]=<$rh>;
            }                 
            #process results, close fh, decrement count, etc
    }
}
Run Code Online (Sandbox Code Playgroud)

ike*_*ami 10

使用readline(又名<>)是完全错误的,原因有二:它是缓冲的,它是阻塞的.


缓冲很糟糕

更准确地说,使用无法检查的缓冲区进行缓冲是不好的.

系统可以完成它想要的所有缓冲,因为你可以使用它来查看它的缓冲区select.

Perl的IO系统不允许进行任何缓冲,因为您无法查看其缓冲区.

让我们看readline一个select循环中可能发生的事情的例子.

  • "abc\ndef\n" 到达手柄.
  • select 通知您有要读取的数据.
  • readline 将尝试从句柄中读取一个块.
  • "abc\ndef\n" 将被放置在Perl的句柄缓冲区中.
  • readline会回来的"abc\n".

此时,您select再次呼叫,并且您希望它让您知道还有更多要读("def\n").但是,select报告没有什么可读的,因为select是系统调用,并且数据已经从系统中读取.这意味着你必须等待更多才能进入才能阅读"def\n".

以下程序说明了这一点:

use IO::Select qw( );
use IO::Handle qw( );

sub producer {
    my ($fh) = @_;
    for (;;) {
        print($fh time(), "\n") or die;
        print($fh time(), "\n") or die;
        sleep(3);
    }
}

sub consumer {
    my ($fh) = @_;
    my $sel = IO::Select->new($fh);
    while ($sel->can_read()) {
        my $got = <$fh>;
        last if !defined($got);
        chomp $got;
        print("It took ", (time()-$got), " seconds to get the msg\n");
    }
}

pipe(my $rfh, my $wfh) or die;
$wfh->autoflush(1);
fork() ? producer($wfh) : consumer($rfh);
Run Code Online (Sandbox Code Playgroud)

输出:

It took 0 seconds to get the msg
It took 3 seconds to get the msg
It took 0 seconds to get the msg
It took 3 seconds to get the msg
It took 0 seconds to get the msg
...
Run Code Online (Sandbox Code Playgroud)

这可以使用非缓冲IO修复:

sub consumer {
    my ($fh) = @_;
    my $sel = IO::Select->new($fh);
    my $buf = '';
    while ($sel->can_read()) {
        sysread($fh, $buf, 64*1024, length($buf)) or last;
        while ( my ($got) = $buf =~ s/^(.*)\n// ) {
            print("It took ", (time()-$got), " seconds to get the msg\n");
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

输出:

It took 0 seconds to get the msg
It took 0 seconds to get the msg
It took 0 seconds to get the msg
It took 0 seconds to get the msg
It took 0 seconds to get the msg
It took 0 seconds to get the msg
...
Run Code Online (Sandbox Code Playgroud)

封锁很糟糕

让我们看readline一个select循环中可能发生的事情的例子.

  • "abc\ndef\n" 到达手柄.
  • select 通知您有要读取的数据.
  • readline 将尝试从套接字读取一个块.
  • "abc\ndef\n" 将被放置在Perl的句柄缓冲区中.
  • readline 没有收到换行符,所以它试图从套接字中读取另一个块.
  • 目前没有更多数据可用,因此它会阻止.

这违背了使用目的select.

[演示代码即将发布]


您必须实现readline不阻止的版本,并且只使用您可以检查的缓冲区.第二部分很简单,因为您可以检查您创建的缓冲区.

  • 为每个句柄创建一个缓冲区.
  • 当数据从句柄到达时,请阅读但不能再阅读.当数据等待时(正如我们所知select),sysread将返回可用的内容,而无需等待更多内容到达.这sysread完美地完成了这项任务.
  • 将读取的数据附加到适当的缓冲区.
  • 对于缓冲区中的每个完整消息,将其解压缩并处理.

添加句柄:

$select->add($fh);
$clients{fileno($fh)} = {
    buf  => '',
    ...
};
Run Code Online (Sandbox Code Playgroud)

select 环:

while (my @ready = $select->can_read) {
    for my $fh (@ready) {
        my $client = $clients{fileno($fh)};
        our $buf; local *buf = \($client->{buf});  # alias $buf = $client->{buf};

        my $rv = sysread($fh, $buf, 64*1024, length($buf));
        if (!$rv) {
            if (!defined($rv)) {
                ... # Handle error
            }
            elsif (length($buf)) {
                ... # Handle eof with partial message
            }
            else {
                ... # Handle eof
            }

            delete $clients{fileno($fh)};
            $sel->remove($fh);
            next;
        }

        while ( my ($msg) = $buf =~ s/^(.*)\n// )
            ... # Process message.
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

顺便说一下,使用线程更容易做到,这甚至不能处理作者!


请注意,IPC :: Run可以为您完成所有艰苦的工作,并且可以使用异步IO作为替代方案select.