--- trunk/experiments/protocol-v3/server.pl 2009/11/01 21:00:30 164 +++ trunk/lib/Sack/Server.pm 2009/11/08 13:44:10 189 @@ -1,5 +1,7 @@ #!/usr/bin/perl +package Sack::Server; + use warnings; use strict; @@ -8,20 +10,36 @@ use Data::Dump qw(dump); use Storable qw(); +use File::Slurp; +use Cwd qw(abs_path); + +my @cloud; +my $cloud_path = $ENV{CLOUD} || die "start with: CLOUD=etc/cloud perl -I/srv/Sack/lib $0\n"; +@cloud = read_file $cloud_path; +@cloud = map { chomp $_; $_ } @cloud; + +warn "# cloud ",dump( @cloud ); my $listen_port = 4444; -my $node_path = '/tmp/client.pl'; +my $node_path = abs_path $0; +$node_path =~ s{Server}{Client}; -my $lsn = IO::Socket::INET->new(Listen => 1, LocalPort => $listen_port, Reuse => 1) or exit; +my $lsn = IO::Socket::INET->new(Listen => 1, LocalPort => $listen_port, Reuse => 1) or die $!; my $sel = IO::Select->new($lsn); -sub fork_node { +my $info; +sub info { + my $port = shift; + push @{ $info->{$port} }, [ @_ ]; +} + +sub fork_ssh { my ( $port, $host ) = @_; if ( my $pid = fork ) { # parent - + info $port => 'forked', $pid; return $port; } elsif ( ! defined $pid ) { @@ -29,7 +47,7 @@ return; } else { # child - my $cmd = qq|ssh -R $port:127.0.0.1:$listen_port $host $node_path $port|; + my $cmd = qq|ssh -F $cloud_path.ssh -R $port:127.0.0.1:$listen_port $host $node_path $port|; warn "# exec: $cmd\n"; exec $cmd; } @@ -37,33 +55,54 @@ my $node_port = 4000; -foreach my $host ( qw/localhost tab.lan llin.lan/ ) { - system "scp client.pl $host:$node_path"; - fork_node( $node_port++, $host ); +foreach my $host ( @cloud ) { + system "find /srv/Sack/ | cpio --create --dereference | ssh -T -F $cloud_path.ssh $host cpio --extract --make-directories --unconditional"; + fork_ssh( $node_port++, $host ); } my $session; +sub to_all { + my $data = shift; + foreach my $port ( keys %{ $session->{port} } ) { + warn ">>>> [$port]\n"; + Storable::store_fd( $data, $session->{port}->{$port} ); + } +} + while (1) { for my $sock ($sel->can_read(1)) { if ($sock == $lsn) { my $new = $lsn->accept; $sel->add($new); - $session->{$new} = $new->peerhost; + $session->{peerport}->{ $new->peerport } = $new; warn "[socket] connect\n"; Storable::store_fd( { ping => 1 }, $new ); + info 0 => 'ping', $new->peerport; } else { - if ( $sock->connected ) { - my $data = Storable::fd_retrieve( $sock ); - warn "<<<< ", dump($data), $/; - if ( $data->{repl} ) { - Storable::store_fd( { repl => $data }, $sock ); - } - } else { + my $data = eval { Storable::fd_retrieve( $sock ) }; + if ( $@ ) { delete $session->{$sock}; - warn "[socket] disconnect\n"; + warn "[socket] disconnect: $@\n"; $sel->remove($sock); $sock->close; + } else { + warn "<<<< ", dump($data), $/; + if ( $data->{repl} ) { + my $response = { repl => $$ }; + if ( $data->{repl} =~ m/ping/ ) { + to_all { ping => 1 }; + } elsif ( $data->{repl} =~ m/info/ ) { + $response->{info} = $info; + } else { + $response->{error}->{unknown} = $data; + } + Storable::store_fd( $response, $sock ); + } elsif ( $data->{ping} ) { + my $port = $data->{port}; + info $port => 'ping', $port; + $session->{port}->{ $data->{port} } = $sock; + } } } }