Commit 53d49137 by PotatoGim

some Coro + AnyEvent examples

parent 7dbab981
#!/usr/bin/env perl
our $AUTHORITY = 'cpan:potatogim';
use strict;
use warnings;
use utf8;
use Coro;
use Coro::AnyEvent;
use Guard;
my $cv = AnyEvent->condvar;
async {
my $count = 0;
until (++$count > 30)
{
Coro::AnyEvent::sleep rand(0.5);
print $Coro::current, " : $count\n";
cede;
}
$cv->send;
};
async {
while (1)
{
print $Coro::current, "\n";
Coro::AnyEvent::sleep 1;
cede;
}
};
$cv->recv;
async {
my $guard = Guard::guard { print "destroyed\n" };
schedule while 1;
};
cede;
=encoding utf8
=head1 NAME
coro.pl -
=head1 SYNOPSIS
=head1 DESCRIPTION
=head1 AUTHOR
Ji-Hyeon Gim E<lt>potatogim@gluesys.comE<gt>
=head1 CONTRIBUTORS
=head1 COPYRIGHT AND LICENSE
Copyright 2018. Gluesys Co., Ltd. All rights reserved.
=head1 SEE ALSO
=head1 DATE
07/24/2018 01:37:49 AM
=cut
#!/usr/bin/env perl
use strict;
use Coro;
use Coro::Channel;
use Coro::Semaphore;
use Coro::Timer;
use URI;
use FurlX::Coro;
use Web::Query;
use Try::Tiny;
my @done;
my @fail;
sub done { push @done, $_[0] }
sub fail { push @fail, $_[0] }
my %queue;
sub queue {
my $name = shift;
$queue{$name} ||= Coro::Channel->new;
}
sub logger {
my $msg = shift;
# 時刻と現在のスレッド名を出力
warn localtime . sprintf ": %s %s\n", Coro::current->desc, $msg;
}
# 全体の同時接続数制限
my $global_lock = Coro::Semaphore->new(5);
sub global_lock {
$global_lock->guard;
}
# ホストごとの接続数制限
my %lock;
my $use_sleep = 1;
sub host_lock {
my $url = shift;
my $host = URI->new($url)->host;
my $sem = $lock{$host} ||= Coro::Semaphore->new(1);
# 最後の接続から一定時間ウェイトを入れる
if ($use_sleep && $sem->count == 0) {
my $guard = $sem->guard;
Coro::Timer::sleep 3;
return $guard;
}
$sem->guard;
}
1
sub fetcher {
my $url = queue("fetch")->get;
my $lock = host_lock($url);
my $glock = global_lock();
my $ua = FurlX::Coro->new;
logger($url);
my $res = $ua->get($url);
queue("parse")->put([$url, $res]);
}
2
sub parser {
my $data = queue("parse")->get;
my ($url, $res) = @$data;
logger($url);
# タイトルを抜き出す場合
my $title = Web::Query->new_from_html($res->content)
->find("title")->text;
queue("update")->put([$url, $title])
}
3
sub updater {
my $data = queue("update")->get;
my ($url, $res) = @$data;
logger($url);
warn $res;
done($res);
}
sub create_worker {
my ( $name, $code, $num ) = @_;
for ( 0 .. $num ) {
my $desc = $name . "_" . $_;
async_pool {
Coro::current->desc($desc);
while (1) {
try {
$code->()
} catch {
warn $_;
fail($_);
}
}
}
}
}
create_worker( fetcher => \&fetcher, 1000 );
create_worker( parser => \&parser, 1 );
create_worker( updater => \&updater, 1 );
# 取得するURL
my @list = qw(
http://localhost/1
http://localhost/2
http://local.example.com/2
http://local.example.com/3
);
my $stop_flag = 0;
my $force_exit = 0;
my $total = scalar @list;
my $main = Coro::current;
# シグナルを受け取って「終了」フラグを立てる
$SIG{INT} = sub {
if ($stop_flag == 1) { $force_exit = 1 }
$stop_flag = 1;
if ($force_exit) {
die "exit";
}
};
# manager
async {
while (1) {
Coro::Timer::sleep 1;
# 作業途中のデータが失われないようにするとよい
# 新規のジョブは受け取らないようにして
# 実行中のジョブが終了するのを待つ,など
if ($stop_flag) {
warn "signal recieved!!! ";
async {
Coro::Timer::sleep 10;
$stop_flag = 0
}
}
warn sprintf "Task: %s/%s Fail: %s",
scalar @done, $total, scalar @fail;
my $done = scalar @done + scalar @fail;
if ($done == $total) {
warn "All task done!";
$main->ready;
}
}
};
queue("fetch")->put($_) for @list;
schedule;
http_fetch($url, sub {
my ($url, $res) = @_;
parse_response($res, sub {
my $res = shift;
update_databse($res, sub {
warn "done!"
})
})
});
#!/usr/bin/env perl
use strict;
use Coro;
use Coro::Socket;
use Coro::Handle;
use Coro::PatchSet;
my @clients;
pipe(my $reader, my $writer);
defined( my $child = fork )
or die 'fork: ', $!;
if ($child == 0) {
close $reader;
sub blocking_receive_function {
sleep 1;
return "test\n";
}
while (1) {
my $str = blocking_receive_function();
syswrite($writer, $str);
}
exit;
}
close $writer;
$reader = unblock $reader; # make it Coro aware
async_pool {
# thread for reading blocking_receive_function results
# and write it to clients
while (1) {
my $line = $reader->readline();
for (my $i=$#clients; $i>=0; $i--) {
unless (defined $clients[$i]->syswrite($line)) {
# some error
# we need to do smth
# let's remove this client
warn "Error on $clients[$i]: $!";
splice @clients, $i, 1;
}
}
}
}
my $server = Coro::Socket->new(Listen => 1024, LocalPort => 8000)
or die "Can't create server", $@;
while (1) {
my $client = $server->accept()
or next;
push @clients, $client;
}
# printf "Hello World!" | (exec 3<>/dev/tcp/localhost/8000; cat >&3; cat <&3; exec 3<&-)
# test
# test
# test
# test
#!/usr/bin/env perl
BEGIN {
no strict 'refs';
use Symbol ();
use Coro::AIO;
use v5.10;
#
# http://perldoc.perl.org/perlsub.html#Prototypes
# > If subroutine signatures are enabled (see Signatures), then the shorter
# > PROTO syntax is unavailable, because it would clash with signatures. In
# > that case, a prototype can only be declared in the form of an
# > attribute.
#
*CORE::GLOBAL::open = sub : prototype(*;$@) {
#
# http://perldoc.perl.org/functions/open.html
# > If FILEHANDLE is an undefined scalar variable (or array or hash
# > element), a new filehandle is autovivified, meaning that the
# > variable is assigned a reference to a newly allocated anonymous
# > filehandle. Otherwise if FILEHANDLE is an expression, its value is
# > the real filehandle. (This is considered a symbolic reference, so
# > use strict "refs" should not be in effect.)
#
# http://perldoc.perl.org/Symbol.html
# > Symbol::qualify turns unqualified symbol names into qualified
# > variable names (e.g. "myvar" -> "MyPackage::myvar"). If it is given
# > a second parameter, qualify uses it as the default package;
# > otherwise, it uses the package of its caller. Regardless, global
# > variable names (e.g. "STDOUT", "ENV", "SIG") are always qualified
# > with "main::".
#
# http://perldoc.perl.org/functions/caller.html
# > Returns the context of the current pure perl subroutine call. In
# > scalar context, returns the caller's package name if there is
# > a caller (that is, if we're in a subroutine or eval or require) and
# > the undefined value otherwise.
#
if (defined($_[0]))
{
unshift(@_, Symbol::qualify(shift, scalar(caller)));
}
#
# open FILEHANDLE
#
# http://perldoc.perl.org/CORE.html#DESCRIPTION
# > For all Perl keywords, a CORE:: prefix will force the built-in
# > function to be used, even if it has been overridden or would
# > normally require the feature pragma. Despite appearances, this has
# > nothing to do with the CORE package, but is part of Perl's syntax.
#
if (@_ == 1)
{
return CORE::open($_[0]);
}
#
# open FILEHANDLE,EXPR
#
elsif (@_ == 2)
{
return CORE::open($_[0], $_[1]);
}
#
# open FILEHANDLE,MODE,EXPR
# open FILEHANDLE,MODE,REFERENCE
#
# http://perldoc.perl.org/functions/open.html
# > As a special case the three-argument form with a read/write mode
# > and the third argument being undef:
# >
# > open(my $tmp, "+>", undef) or die ...
# >
# > opens a filehandle to an anonymous temporary file. Also using +<
# > works for symmetry, but you really should consider writing
# > something to the temporary file first. You will need to seek() to
# > do the reading.
#
if (@_ == 3 && defined($_[2]))
{
return $_[0] = aio_open($_[2], O_RDONLY, 0);
# return CORE::open($_[0], $_[1], $_[2]);
}
elsif (@_ == 3)
{
return CORE::open($_[0], $_[1], undef);
}
#
# open FILEHANDLE,MODE,REFERENCE
#
# http://perldoc.perl.org/functions/open.html
# > In the form of pipe opens taking three or more arguments, if LIST
# > is specified (extra arguments after the command name) then LIST
# > becomes arguments to the command invoked if the platform supports
# > it. The meaning of open with more than three arguments for non-pipe
# > modes is not yet defined, but experimental "layers" may give extra
# > LIST arguments meaning.
#
return CORE::open($_[0], $_[1], @_[2..$#_]);
};
state %TABLE;
*CORE::GLOBAL::readline = sub {
my $rbuf = '';
while (1) {
my $char;
aio_read($_[0], $TABLE{$_[0]}++, 1, $char, 0);
return undef if ($char eq '');
last if ($char eq "\n");
$rbuf .= $char;
}
return $_ = "$rbuf\n";
};
}
use Coro;
use AnyEvent;
use Coro::Multicore;
use Coro::AnyEvent;
use AnyEvent::HTTP;
use Data::Dumper;
my @coro;
my $count = $ARGV[0] // 2;
my $done = 0;
my $condvar = AnyEvent->condvar;
for my $num (1..$count)
{
push(@coro, async {
$|++;
open(my $fh, '<', 'coro_test.pl');
http_get(
'http://www.daum.net/',
sub { print "$num: HTTP: $_[1]->{Status}\n"; }
);
my $rbuf = '';
while (<$fh>)
{
$rbuf .= $_;
}
close($fh);
print "$num: FILE: ${\length($rbuf)}\n";
$done++;
if ($done >= $count)
{
$condvar->send;
}
});
}
$condvar->recv;
#!/usr/bin/env perl
our $AUTHORITY = 'cpan:potatogim';
use strict;
use warnings;
use utf8;
use Coro;
use Coro::AnyEvent;
use AnyEvent;
use AnyEvent::HTTP;
use JSON qw/from_json/;
use Try::Tiny;
use Data::Dumper;
my %table;
my $coro = async
{
print "Starting watcher...\n";
my $idx = 0;
while (1)
{
my $retval = try
{
my $url = sprintf('http://%s:%d/v2/keys%s?wait=true&waitIndex=%d&recursive=true'
, '192.168.2.37'
, 4001
, '/'
, $idx);
print "Waiting: $url\n";
http_get($url, Coro::rouse_cb);
my ($change, $header) = Coro::rouse_wait;
if ($header->{Status} == 401)
{
$idx = $header->{'x-etcd-index'};
next;
}
$idx = $change->{node}->{modifiedIndex} + 1;
print "Changing: ${\Dumper($change)}";
$change = from_json($change);
my $target = \%table;
my @path = split(/\/+/, substr($change->{node}->{key}, 1));
for (my $i=0; $i<$#path; $i++)
{
# create a hash for new key
if (!defined($target->{$path[$i]}))
{
$target->{$path[$i]} = {};
}
$target = $target->{$path[$i]};
}
if ($change->{action} eq 'delete')
{
delete($target->{$path[$#path]});
}
else
{
$target->{$path[$#path]} = $change->{node}->{value};
}
return 0;
}
catch
{
warn @_;
return -1;
};
}
print "Watcher is stopped...\n";
};
while (1)
{
# CPU throttling
#Coro::AnyEvent::idle_upto 5;
# CPU throttling
#Coro::AnyEvent::idle;
# Why does not working?
#Coro::AnyEvent::poll;
# It working gracefully and not caused CPU throttling
Coro::AnyEvent::sleep 5;
}
=encoding utf8
=head1 NAME
etcd_watching.pl -
=head1 SYNOPSIS
=head1 DESCRIPTION
=head1 AUTHOR
Ji-Hyeon Gim E<lt>potatogim@gluesys.comE<gt>
=head1 CONTRIBUTORS
=head1 COPYRIGHT AND LICENSE
Copyright 2018. Gluesys Co., Ltd. All rights reserved.
=head1 SEE ALSO
=head1 DATE
2018년 07월 24일 01시 46분 50초
=cut
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment