Perl Advent Calendar 2010-12-15

Parallel::Scoreboard

by cnhackTNT

多进程的问题之一是如何汇总各个进程当前的的状态,就像 Apache 服务器的 mod_status 模块所实现的功能那样。

首先浮现在脑海中的方法可能是通过管道或者共享变量,这些方法要么需要锁,要么相对太复杂。

本篇要用到的 Parallel::Scoreboard 提供了基于文件系统来实现跟踪多进程状态的方法。

Parallel::Scoreboard 模块

Scoreboard 原本指的是记分牌,用来分别记录多个分值。如 NBA 比赛场上那面大的电子记分牌能分别记录各个队员的分值情况一样,Parallel::Scoreboard 针对每个进程会提供一个单独的文件来记录其状态,因此使用者不需要考虑锁的问题就可以通过该模块来汇总各个进程当前的状态。

下面我们来了解一下 Parallel::Scoreboard 提供的几个简单方法:

new()

new() 方法用来初始化该模块对象,它有一个 base_dir 参数,指向状态信息的保存目录,如:

use Parallel::Scoreboard;

my $scoreboard = Parallel::Scoreboard->new(
    base_dir => '/tmp/scoreboard'
);

以上代码指定该模块用来保存状态信息的目录为 /tmp/scoreboard,如果该目录不存在,会被自动创建。

update()

update() 方法是该模块最常被用到的方法之一,一般是在多进程程序的多个子进程中分别调用该方法来保存所需的状态信息,如:

# ... in a child process
$scoreboard->update(scalar localtime);

上面的代码属于某个多进程程序的一个子进程,在该子进程运行时会调用 update() 方法记录一段自定义的信息,上面代码中我们记录的信息是当前的时间。$scoreboard 对象会将该信息更新至 /tmp/scoreboard 目录中与该进程关联的文件中。

read_all()

read_all() 方法用来获取所有进程记录的信息,通常我们会在父进程中调用该方法来汇总子进程记录的信息,如:

# ... in parent process

while (1) {
    my $status = $scoreboard->read_all();

    for my $pid (sort {$a <=> $b} keys %$status) {
        print "$pid - ".$status->{$pid}, "\n\n";
    }

    sleep 1;


}

上面的代码是属于父进程的一段代码,它通过一个永远为真的 while() 循环不断地读取所有子进程记录的信息,并将他们打印出来。

cleanup()

cleanup() 方法用来清理信息记录文件,通常当某个子进程退出后,与它相关联的记录文件就无用了,会被自动清理掉。然而有时候整个进程有可能会被突然杀掉,于是就会留下未清理的记录文件,通过该方法可以将所有无效的记录文件清理掉。

另外,其实当我们调用 read_all() 的时候,也会自动清理无效记录文件。

示例

下面我们举个简单的例子:

#!/usr/bin/env perl
# cnhackTNT

use strict;
use IO::File; # avoid autoflush error
use Parallel::Scoreboard;

# set CHLD signal to IGNORE, let OS to reap the child process.
$SIG{'CHLD'} = 'IGNORE';

# scoreboard object.
my $scoreboard = Parallel::Scoreboard->new(
    base_dir => "scoreboard",
);

my @pids;

 # 3 child processes
for (1..3) {

    # fork multi processes
    my $pid = fork();

    if ($pid) { # if $pid > 0, then we are in the parent process
        
        push @pids, $pid # push current child pid to @pids 
        
    } elsif ($pid == 0) { # if $pid == 0, then we are in the child process

        while (1) {

            # update the scoreboard with a random number 1~10
            $scoreboard->update(int(rand(9)) + 1);
            sleep 1;
        
        }
    
    } else {
        # some thing wrong, fork failed.    
        die "Can't fork: $!\n";
    }
}


# set INT signal to handle "ctrl+c"
$SIG{'INT'} = sub {
    print "Daddy asked me to quit...\n";
    kill 'TERM', $_ for @pids; # kill all child processes
    sleep 3;
    $scoreboard->cleanup(); # clean scoreboard
    print "THE END.\n";
    exit;
};

print "I am the parent process, my pid is: $$\n";
print "my children are: @pids\n\n";

while (1) {
    my $status = $scoreboard->read_all(); 

    # get status, sorted by pid
    for my $pid (sort {$a <=> $b} keys %$status) {
        print "child $pid says ".$status->{$pid}."\n";
    }

    print "\n";
    sleep 3;
}

上面的例子会 fork 出三个子进程,每个子进程里面都是一个死循环,并随机 sleep 一段时间后会往 $scoreboard 中记录下一个随机的数字。

父进程初始化并处理好信号后,也会进入一个死循环,它会定期通过 $scoreboardread_all() 方法来获取所有子进程各自记录的那个随机数字,并打印出来。

如果收到用户 ctrl+c 导致的 INT 中断信号,则程序会在打印一段话,杀掉所有子进程并通过 cleanup() 方法清理 $scoreboard 后退出。

文中的代码可以在我的 Github 仓库中的 scoreboard 目录中找到。仓库中还有个 websockets 的示例代码,也用到了该模块,更有意思。

有问题请给 cnhacktnt 在 gmail 的邮箱发信,或者 @cnhacktnt(你懂的) :-)

谢谢!

View Source (POD)