Fri, 10 Oct 2003

Weblogs.com Ping Gateway to はてなアンテナ (POE版)

前回紹介した、はてなアンテナ Ping ゲートウェイには、1点問題がありました。Ping を受け取った後、HTTPリクエストを発行してレスポンスを待っているため、Ping 受信に非常に時間がかかることがあるという点です。ここに時間がかかると、Blog ツールのエントリビルド処理が待たされることになり、好ましくありません。Ping を受けとったら即座にレスポンスを返し、非同期にアンテナへのリクエストを投げる方がよいでしょう。

今回のレシピでは、POE と呼ばれるフレームワークを使用して、この処理の非同期化を行ってみます。

Perl の非同期処理

一般に、外部へのネットワーク接続(この場合は、はてなアンテナへのHTTP接続)などによるブロッキング(*1)を避けるためには、一連の処理を個別に非同期で(asynchronously)実行する必要があります。Perl で非同期処理を行うにはいくつかの選択肢が用意されており、 といった方法があります。それぞれ簡単に概要を解説しておきましょう。

fork() は UNIX では一般によく利用される手法で、プロセスの複製をつくって、親プロセスと子プロセスで処理を分岐させます。Perl でも関数 fork を利用すれば実装は容易です。ただし、今回の場合は Web サーバ (httpd) から CGI 経由で XML-RPC サーバを動作させている都合上、あまり fork を利用するのは好ましくない(*2)気もします。

perl のスレッドは非常に実験的ですが、5.8.1 からはより安定して使用できるようになりました。5.8 でのスレッド実装は、スレッド1つ毎に perl インタプリタを作成する実装(*3)となっており、5.6 以前のネイティブなスレッド実装とはまったく異なります。ネイティブなスレッドに比べると速度やメモリ消費の点で劣りますが、より安全に利用/実装できる点が特長です。ただ、5.8.1 でより安定したとはいえ、インタプリタ間で変数がデフォルトでは共有されない(*4)など、注意すべき点も多いため、今回は threads の利用は避けておきます。

POE は Persistent Object Environment (など様々なモノ)の略で(*5)、Perl でイベントドリブンなプログラミングを行うためのフレームワークです。POE は内部で selectalarm を用いてノンブロッキングな処理を実行する仕組みを持っており、ユーザはイベントを非同期にキューイングして実行することができ、ユーザレベルでのスレッドプログラミングを簡単に行うことができます。CPAN には様々な POE Component(*6) が登録されていて、マルチタスクなアプリケーションを組むには最適といえます。コーディングのルールには多少クセがありますが、ハマるとなかなか抜け出せない魅力も同時に持っています。

サンプルコード

今回は、POE を利用して、XML-RPC で Ping を受けとるタスクと、HTTP で はてなアンテナにリクエストを送信するタスクの2つを非同期に実行させます。スクリプトの実装は List 1 のようになります。

use POE qw(Component::Server::XMLRPC
           Component::Client::HTTP
           Component::TSTP);
POE モジュールと、関連する Component モジュールを use しています。これは

use POE;
use POE::Component::Server::XMLRPC;
...

と書いても同じことですが、POE モジュールを use する際の import 引数に追加することにより、同時にロードすることができます。これは POE を使用する際の慣習のようなものなので、そのまま利用してください。

use POE::Sugar::Args;
POE::Sugar::Args モジュールを use しています。このモジュールは、POE を使用するときに汚なくなりがちな表記を楽に書くことができる Syntactic Sugar です。よってとくに使う必要はないのですが、コードの見やすさをとるため使用してみました。

use HTTP::Request;
use Sys::Hostname;
use URI;
HTTP リクエストを生成する HTTP::Request、自身のホスト名を取得する Sys::Hostname、URI を生成する URI をロードしています。

our $VERSION    = "1.00";
our $HatenaURL  = "http://a.hatena.ne.jp/check";
後で User-Agent 文字列に使うスクリプトのバージョンと、はてなアンテナの API URL をグローバルに設定しています(*7)

my $port = shift || 10080;
XML-RPC サーバが listen するポート番号を指定します。コマンドラインの第1引数か、指定されていなければデフォルトで 10080 を使用します。

# session to trap Ctrl-Z
POE::Component::TSTP->create();
POE のプログラムは、無限ループで動作します。何も指定しない場合 Ctrl-C や Ctrl-Z を入力しても受け付けてくれません。この POE::Component::TSTP は、Ctrl-Z でサスペンドする機能をスクリプトに付加するコンポーネントです。

# session for XML-RPC server
POE::Component::Server::XMLRPC->new(
    alias => 'xmlrpc',
    port  => $port,
);
XML-RPC サーバ用の Session (Session は POE の用語で、スレッドのようなものと考えればOKです) を起動します。それぞれの Session には名前をつけることができ、Session 間でのコミュニケーションに使用します。ここでは名前を xmlrpc としました。

# session to send pings in HTTP
POE::Component::Client::HTTP->spawn(
    Alias => 'ua',
    Agent => "ping2hatena/$VERSION",
);
HTTP クライアントの Session を起動します。名前は ua (User-Agent の意) とし、HTTP リクエストの User-Agent: に使用する文字列 Agent にはスクリプト名とバージョン番号をいれています。

# main session
POE::Session->create(
    inline_states => {
        _start => \&setup_service,
        _stop  => \&shutdown_service,
        "weblogUpdates.ping" => \&ping_handler,
        got_response => \&response_handler,
    },
);
これらの Session に指令を出し、コントロールする Session を最後につくります。引数 inline_states は、ハッシュリファレンスで、key がイベント名、value がそのイベントで実行されるサブルーチンのリファレンスとなります。_start_stop は、先頭の _ が意味する通り、特殊なイベント名で、POE プログラムが起動するときおよび終了するときに実行されるイベントです。

$poe_kernel->run();
$poe_kernel は、POE のすべての処理をハンドルするカーネルです。カーネルに対して run() メソッドを呼び出すことで、すべての Session が動作を開始します。ここからプログラムは、カーネルが管理するイベントに応じて、各処理が非同期に呼び出されることになります。

sub setup_service {
    my $poe = sweet_args;
    $poe->kernel->alias_set("ping2hatena");
    $poe->kernel->post(xmlrpc  => 'publish',
                       ping2hatena => "weblogUpdates.ping");
    my $hostname = Sys::Hostname::hostname || 'localhost';
    warn "URI: http://$hostname:$port/?session=ping2hatena\n";
}
先ほど _start イベントに設定したサブルーチン setup_service が、まず最初に起動されます。

ここでは main のセッションに ping2hatena という別名をつけ、次に xmlrpc の publish() メソッドを呼び出す処理を POE カーネルにキューイングしています。publish メソッドの引数はそれぞれ、公開した XML-RPC API の処理を行う Session 名 (ping2hatena) と、公開するメソッド名 (= イベント名: weblogUpdates.ping) を表しています。

Sys::Hostname の hostname でホスト名を得た後、XML-RPC サーバの URL を warn で表示しています。URL には ?session={Session名} のクエリが必要なことに注意してください(*8)

sub shutdown_service {
    my $poe = sweet_args;
    $poe->kernel->post(xmlrpc  => 'rescind',
                       ping2hatena => "weblogUpdates.ping");
}
shutdown_service では、Server::XMLRPC の C メソッドで、API のクローズを行っています。POE::Component::Server::XMLRPC のドキュメントに記述されているため、ここにも書いておきましたが、実際には必要ないはずです。

sub ping_handler {
    my $poe = sweet_args;
    my $transaction = $poe->args->[0];
    my($blog_name, $blog_url) = @{$transaction->params()};
    my $url = URI->new($HatenaURL);
       $url->query_form(url => $blog_url);
    my $req = HTTP::Request->new(GET => $url);
    $poe->kernel->post(ua => request => got_response => $req);

    my %data = (flerror => XMLRPC::Data->type(boolean => 0));
    $transaction->return(\%data);
}
実際に XML-RPC サーバに対して API がリクエストされると、先ほど publish したイベント weblogUpdates.ping が起動され、イベントに対応するサブルーチン ping_handler が呼び出されます。$transaction は XML-RPC のトランザクションを表わすオブジェクトで params() メソッドで API への引数を取りだします。

URI.pm を使用して、はてなアンテナの更新通知用の URL を生成します(*9)。HTTP::Request オブジェクトを new して、Client::HTTP である ua に、リクエストを投げる依頼をします。メソッド request の引数は、リクエストに対するレスポンスを受け取った際に起動されるイベント名 (got_response)、HTTP::Request オブジェクトです。

そして前回と同様、weblogUpdates.ping の返り値である構造体を作成して $transaction 経由で return しています。実際には先ほどの $poe->kernel->post()$transaction->return() も、イベントがキューイングされるだけで、その後 POE カーネルにより非同期で実行されることに注意してください。

こうして、はてなアンテナへのリクエスト送信とレスポンス受信を待つことなく、Blog クライアントに Ping のレスポンスを返すことができます。

sub response_handler {
    my $poe = sweet_args;
    my $request  = $poe->args->[0]->[0];
    my $response = $poe->args->[1]->[0];
    if (defined($response->content)) {
        warn "successful request to ", $request->uri();
    } else {
        warn "error while requesting ", $request->uri();
    }
}
実際のリクエスト送信は ua が行います。レスポンスを受信すると got_response イベントが起動し、登録した response_handler が呼び出されます。ここでは HTTP::Response オブジェクトを取り出し、成功か失敗かを warn で表示しているだけです。

実行例

まずはサーバを起動します。

% ./ping_to_hatena_poe.pl
URI: http://localhost:10080/?session=ping2hatena

出力された URL を、Blog ツールの Ping 通知先に指定します。

ここでは単体での確認のため、XML-RPCのクライアントも自作して実行してみましょう。XMLRPC::Lite を使用したクライアントスクリプトは List 2 のような簡単なものになります。引数に XML-RPC の URL, Blog名 と Blog URL を指定して実行します。

% ./ping_client.pl "http://localhost:10080/?session=ping2hatena" \
  "Blog Developer's Cookbook" "http://blog.bulknews.net/cookbook/"

うまくいけば、まずクライアント側に

flerror=0

のように返り値が表示され、時間をおいてから、サーバ側のプロンプトに、

successful request to http://a.hatena.ne.jp/check?url=http%3A%2F%2F
blog.bulknews.net%2Fcookbook%2F at ./ping_to_hatena_poe.pl line 79.

と表示され、Ping へのレスポンスと、HTTP リクエストが非同期で行われていることが確認できます。

サーバのシャットダウンは Ctrl-Z でサスペンドし、 PID を指定して kill しましょう。

まとめ

POE は、とくにネットワークが絡むマルチタスクを非同期で処理するための魅力的なフレームワークです。今回は HTTP でリクエストを送信しましたが、インスタントメッセンジャーで通知したり、別の Blog にポストしたりと、様々な応用例が考えられます。2003/10/18 の Shibuya Perl Mongers テクニカルトーク #4 では、NDO::Weblog の naoya 氏が、受けとった Ping を IRC チャネルにポストする bot の紹介をする予定ですのでお楽しみに。

Listings

List 1: ping_to_hatena_poe.pl
#!/usr/local/bin/perl -w
# ping_to_hatena_poe
# - Weblogs.Com Ping Gateway to Hatena Antenna (POE)

use strict;
use POE qw(Component::Server::XMLRPC
           Component::Client::HTTP
           Component::TSTP);
use POE::Sugar::Args;
use HTTP::Request;
use Sys::Hostname;
use URI;

our $VERSION    = "1.00";
our $HatenaURL  = "http://a.hatena.ne.jp/check";

my $port = shift || 10080;

# session to trap Ctrl-Z
POE::Component::TSTP->create();

# session for XML-RPC server
POE::Component::Server::XMLRPC->new(
    alias => 'xmlrpc',
    port  => $port,
);

# session to send pings in HTTP
POE::Component::Client::HTTP->spawn(
    Alias => 'ua',
    Agent => "ping2hatena/$VERSION",
);

# main session
POE::Session->create(
    inline_states => {
        _start => \&setup_service,
        _stop  => \&shutdown_service,
        "weblogUpdates.ping" => \&ping_handler,
        got_response => \&response_handler,
    },
);

$poe_kernel->run();

sub setup_service {
    my $poe = sweet_args;
    $poe->kernel->alias_set("ping2hatena");
    $poe->kernel->post(xmlrpc  => 'publish',
                       ping2hatena => "weblogUpdates.ping");
    my $hostname = Sys::Hostname::hostname || 'localhost';
    warn "URI: http://$hostname:$port/?session=ping2hatena\n";
}

sub shutdown_service {
    my $poe = sweet_args;
    $poe->kernel->post(xmlrpc  => 'rescind',
                       ping2hatena => "weblogUpdates.ping");
}

sub ping_handler {
    my $poe = sweet_args;
    my $transaction = $poe->args->[0];
    my($blog_name, $blog_url) = @{$transaction->params()};
    my $url = URI->new($HatenaURL);
       $url->query_form(url => $blog_url);
    my $req = HTTP::Request->new(GET => $url);
    $poe->kernel->post(ua => request => got_response => $req);

    my %data = (flerror => XMLRPC::Data->type(boolean => 0));
    $transaction->return(\%data);
}

sub response_handler {
    my $poe = sweet_args;
    my $request  = $poe->args->[0]->[0];
    my $response = $poe->args->[1]->[0];
    if (defined($response->content)) {
        warn "successful request to ", $request->uri();
    } else {
        warn "error while requesting ", $request->uri();
    }
}
List 2: ping_client.pl
#!/usr/local/bin/perl -w
# ping_client - weblogUpdates.ping client

use strict;
use XMLRPC::Lite;
my($proxy, @args) = @ARGV;

my $result = XMLRPC::Lite
    ->proxy($proxy)
    ->call("weblogUpdates.ping", @args)
    ->result;
print "flerror=$result->{flerror}\n";
*1) ソケットの読み書きやファイルロックなど、外的要因で待たされる処理
*2) というより気持ちよくない
*3) interpreter based threads を略して ithreads と呼ばれます
*4) threads::shared を使用して明示的に共有する必要があります。
*5) 詳細は What POE Is を参照してください。
*6) しばしば POE::Component:: は PoCo:: と略されることがあります
*7) 前回は my で宣言して BEGIN { } で設定していました。
*8) POE::Component::Server::XMLRPC のドキュメントには書いていない (EXAMPLE にかろうじて書いてある) のでハマりやすいです
*9) 前回は URI::Escape と sprintf を使用していました。
Prev: Weblogs.com Ping Gateway to はてなアンテナ
Next: RSS Auto Discovery

About This Blog

Weblog まわりのテクノロジーをネタにして、プログラミングテクニックを Cookbook 形式で紹介しています。

Site Search


WWW This Site