--- trunk/experiments/protocol-v3/server.pl 2009/11/03 17:28:57 173 +++ trunk/lib/Sack/Server.pm 2009/11/08 14:12:38 192 @@ -1,5 +1,7 @@ #!/usr/bin/perl +package Sack::Server; + use warnings; use strict; @@ -9,10 +11,10 @@ use Data::Dump qw(dump); use Storable qw(); use File::Slurp; +use Cwd qw(abs_path); -my @cloud = qw(localhost tab.lan llin.lan); - -my $cloud_path = $ENV{CLOUD} || '/srv/Sack/etc/lib'; +my @cloud; +my $cloud_path = $ENV{CLOUD} || die "start with: CLOUD=etc/cloud perl -I/srv/Sack/lib $0\n"; @cloud = read_file $cloud_path; @cloud = map { chomp $_; $_ } @cloud; @@ -20,8 +22,8 @@ my $listen_port = 4444; -my $node_path = $0; -$node_path =~ s{server.pl}{client.pl}; +my $node_path = abs_path $0; +$node_path =~ s{Server}{Client}; my $lsn = IO::Socket::INET->new(Listen => 1, LocalPort => $listen_port, Reuse => 1) or die $!; my $sel = IO::Select->new($lsn); @@ -32,7 +34,7 @@ push @{ $info->{$port} }, [ @_ ]; } -sub fork_node { +sub fork_ssh { my ( $port, $host ) = @_; if ( my $pid = fork ) { @@ -54,18 +56,28 @@ my $node_port = 4000; foreach my $host ( @cloud ) { - system "echo $node_path | cpio --create --dereference | ssh -T -F $cloud_path.ssh $host cpio --extract --make-directories --unconditional --verbose"; - fork_node( $node_port++, $host ); + system "find /srv/Sack/ | cpio --create --dereference | ssh -T -F $cloud_path.ssh $host cpio --extract --make-directories --unconditional"; + fork_ssh( $node_port++, $host ); } my $session; +sub to_all { + my $data = shift; + foreach my $port ( keys %{ $session->{port} } ) { + warn ">>>> [$port]\n"; + Storable::store_fd( $data, $session->{port}->{$port} ); + } +} + +my @shard_paths; + while (1) { for my $sock ($sel->can_read(1)) { if ($sock == $lsn) { my $new = $lsn->accept; $sel->add($new); - $session->{$new} = $new->peerport; + $session->{peerport}->{ $new->peerport } = $new; warn "[socket] connect\n"; Storable::store_fd( { ping => 1 }, $new ); info 0 => 'ping', $new->peerport; @@ -78,12 +90,36 @@ $sock->close; } else { warn "<<<< ", dump($data), $/; - if ( $data->{repl} ) { - my $response = { repl => $$, info => $info }; + if ( my $repl = $data->{repl} ) { + my $response = { repl_pid => $$ }; + if ( $repl =~ m/ping/ ) { + to_all { ping => 1 }; + } elsif ( $repl =~ m/info/ ) { + $response->{info} = $info; + } elsif ( $repl =~ m{load\s*(\S+)?} ) { + my $name = $1 || 'shard'; + @shard_paths = glob "/tmp/sack/$name/*"; + warn "loading shards ", dump( @shard_paths ); + to_all { load => $name }; + } else { + $response->{error}->{unknown} = $data; + } Storable::store_fd( $response, $sock ); } elsif ( $data->{ping} ) { - $info->{_peer_port}->{$sock->peerport} = $data->{port}; - info $data->{port} => 'peer port', $sock->peerport; + my $port = $data->{port}; + info $port => 'ping', $port; + $session->{port}->{ $data->{port} } = $sock; + } elsif ( $data->{load} eq 'shard' ) { + if ( my $path = shift @shard_paths ) { + warn "retrieve $path ", -s $path; + my $shard = Storable::retrieve $path; + warn ">>>> [", $data->{port}, "] sending shard $path\n"; + Storable::store_fd( { path => $path, shard => $shard }, $sock ); + } else { + warn "no more shards for [", $data->{port}, "]\n"; + } + } else { + warn "UNKNOWN ",dump($data); } } }