--- trunk/experiments/protocol-v3/server.pl 2009/11/03 17:19:12 170 +++ trunk/lib/Sack/Server.pm 2009/12/24 13:06:25 254 @@ -1,37 +1,52 @@ #!/usr/bin/perl +package Sack::Server; + use warnings; use strict; +my $debug = 0; + use IO::Socket::INET; use IO::Select; use Data::Dump qw(dump); use Storable qw(); use File::Slurp; +use Cwd qw(abs_path); -my @cloud = qw(localhost tab.lan llin.lan); - -my $cloud_path = $ENV{CLOUD} || '/srv/Sack/etc/lib'; +use lib '/srv/Sack/lib'; +use Sack::Merge; +use Sack::Server::HTTP; +use Sack::Server::HTML; +use Sack::Server::Gnuplot; + +my @cloud; +my $cloud_path = $ENV{CLOUD} || "etc/cloud"; +die "start with: CLOUD=etc/cloud perl -I/srv/Sack/lib $0\n" unless -e $cloud_path; @cloud = read_file $cloud_path; @cloud = map { chomp $_; $_ } @cloud; warn "# cloud ",dump( @cloud ); my $listen_port = 4444; +my $http_port = 4480; -my $node_path = '/tmp/client.pl'; +my $node_path = abs_path $0; +$node_path =~ s{Server}{Client}; -my $lsn = IO::Socket::INET->new(Listen => 1, LocalPort => $listen_port, Reuse => 1) or die $!; +my $lsn = IO::Socket::INET->new(Listen => 1, LocalPort => $listen_port, Reuse => 1) or die "$listen_port $!"; +my $www = IO::Socket::INET->new(Listen => 1, LocalPort => $http_port, Reuse => 1) or die "$http_port $!"; my $sel = IO::Select->new($lsn); +$sel->add( $www ); my $info; sub info { my $port = shift; - push @{ $info->{$port} }, [ @_ ]; + push @{ $info->{node}->{$port} }, [ @_ ]; } -sub fork_node { +sub fork_ssh { my ( $port, $host ) = @_; if ( my $pid = fork ) { @@ -53,21 +68,203 @@ my $node_port = 4000; foreach my $host ( @cloud ) { - system "echo $node_path | cpio --create --dereference | ssh -T -F $cloud_path.ssh $host cpio --extract --make-directories --unconditional"; - fork_node( $node_port++, $host ); + system "find /srv/Sack/ | cpio --create --dereference | ssh -T -F $cloud_path.ssh $host cpio --extract --make-directories --unconditional"; + fork_ssh( $node_port++, $host ); } my $session; +sub to_all { + my $data = shift; + foreach my $port ( keys %{ $session->{port} } ) { + warn ">>>> [$port]\n" if $debug; + Storable::store_fd( $data, $session->{port}->{$port} ); + } +} + +our @shard_load_queue; +sub load_shard { + my $shard = shift @_ || return; + + warn "# load_shard $shard\n"; + + my @shards = glob "$shard/*"; + + if ( ! @shards ) { + warn "no shards for $shard\n"; + return; + } + + foreach my $s ( @shards ) { + if ( my $node = $info->{shard}->{$s} ) { + next if $node =~ m{^\d+$}; # not node number + } + $info->{shard}->{$s} = 'wait'; + push @shard_load_queue, $s; + warn "queued $s for loading\n"; + } + to_all { load => $shard }; +} + +sub run_view { + my ( $path ) = @_; + if ( ! -r $path ) { + warn "ERROR view $path: $!"; + return; + } + my $code = read_file $path; + Sack::Merge->clean; + delete( $info->{view} ); + delete( $info->{merge} ); + delete( $info->{shard} ); + to_all { code => $code, view => $path }; +}; + +our @responses; + while (1) { for my $sock ($sel->can_read(1)) { if ($sock == $lsn) { my $new = $lsn->accept; $sel->add($new); - $session->{$new} = $new->peerport; + $session->{peerport}->{ $new->peerport } = $new; warn "[socket] connect\n"; Storable::store_fd( { ping => 1 }, $new ); - info 0 => 'ping', $new->peerport; + } elsif ( $sock == $www ) { + my $client = $www->accept; + Sack::Server::HTTP::request( $client, sub { + my ( $send, $method, $param ) = @_; + + if ( $method =~ m{views} ) { + run_view $method; + print $send "HTTP/1.0 302 $method\r\nLocation: /\r\n\r\n"; + return 1; + } elsif ( $method =~ m{^/tmp/sack} ) { + load_shard $method; + print $send "HTTP/1.0 302 $method\r\nLocation: /\r\n\r\n"; + return 1; + } elsif ( $method =~ m{^/out/(.+)} ) { + print $send "HTTP/1.0 200 OK\r\nContent-Type: text/html\r\n\r\n"; + Sack::Server::HTML::send_out( $send, Sack::Merge->out, $1, $param ); + return 1; + } elsif ( $method =~ m{^/gnuplot} ) { + eval { + my $path = Sack::Server::Gnuplot::date( Sack::Merge->out, $param ); + if ( -e $path ) { + print $send "HTTP/1.0 200 OK\r\nContent-Type: image/png\r\n\r\n"; + open(my $fh, '<', $path) || die $path; + my $b; + while ( read($fh, $b, 4096) ) { + print $send $b; + } + return 1; + } else { + print $send "HTTP/1.0 404 no graph\r\n\r\n"; + return 1; + } + }; + warn "ERROR: $@" if $@; + } + + print $send "HTTP/1.0 200 OK\r\nContent-Type: text/html\r\n\r\n"; + + print $send qq| + + + +|; + + print $send qq|

Views

| + ; + + my $out = Sack::Merge->out; + if ( my $li = join("\n", map { qq|
  • $_| } keys %$out ) ) { + print $send qq|

    Results

    \n|; + } + + print $send qq|

    Nodes

    + |, join("\n", map { + my $class = join(' ', map { $_->[0] } @{ $info->{node}->{$_} }); + qq|$_|; + } sort keys %{ $info->{node} } ); + + print $send qq|

    Shards

    |; + + if ( $param->{info} ) { + print $send qq|hide info|; + print $send '
    ', dump($info), '
    ' + } else { + print $send qq|show info|; + } + return 1; + } ); } else { my $data = eval { Storable::fd_retrieve( $sock ) }; if ( $@ ) { @@ -76,13 +273,68 @@ $sel->remove($sock); $sock->close; } else { - warn "<<<< ", dump($data), $/; - if ( $data->{repl} ) { - my $response = { repl => $$, info => $info }; + warn "<<<< ", dump($data), $/ if $debug; + + if ( my $path = $data->{shard} ) { + $info->{shard}->{ $path } = $data->{port}; + # FIXME will need push for multiple copies of shards + } + + if ( my $repl = $data->{repl} ) { + my $response = { repl_pid => $$ }; + if ( $repl =~ m/ping/ ) { + to_all { ping => 1 }; + } elsif ( $repl =~ m/info/ ) { + $response->{info} = $info; + } elsif ( $repl =~ m{load\s*(\S+)?} ) { + load_shard $1; + } elsif ( $repl =~ m{view\s*(\S+)?} ) { + run_view $1; + } elsif ( $repl =~ m{out} ) { + my $out = Sack::Merge->out; + warn "out ",dump( $out ); + $response->{out} = $out; + } elsif ( $repl =~ m{clean} ) { + delete $info->{shard}; + to_all { clean => 1 }; + } elsif ( $repl eq 'exit' ) { + to_all { exit => 1 }; + sleep 1; + exit; + } elsif ( $repl eq '.' ) { + $response->{'.'} = [ @responses ]; + @responses = (); + } elsif ( $repl =~ m{(\w+)\s*(.+)?} ) { + to_all { $1 => $2 }; + } else { + $response->{error}->{repl} = $repl; + } Storable::store_fd( $response, $sock ); } elsif ( $data->{ping} ) { - $info->{_peer_port}->{$sock->peerport} = $data->{port}; - info $data->{port} => 'peer port', $sock->peerport; + my $port = $data->{port}; + info $port => 'ping', $port; + $session->{port}->{ $data->{port} } = $sock; + } elsif ( defined $data->{load} && $data->{load} eq 'shard' ) { + if ( my $path = shift @shard_load_queue ) { + $info->{shard}->{$path} = 'read'; + my $shard = Storable::retrieve $path; + $info->{shard}->{$path} = 'send'; + warn ">>>> [", $data->{port}, "] sending shard $path\n"; + Storable::store_fd( { path => $path, shard => $shard }, $sock ); + } else { + warn "no more shards for [", $data->{port}, "]\n"; + } + } elsif ( exists $data->{out} ) { + my $added = Sack::Merge->add( $data->{out} ) if defined $data->{out}; + $info->{merge}->{ $data->{view} }->{ $data->{port} } = $added; + $info->{view }->{ $data->{view} }->{ $data->{port} } = $data->{on_shard}; + # refresh shard allocation + $info->{shard}->{ $_ } = $data->{port} foreach keys %{ $data->{on_shard} }; + } elsif ( exists $data->{port} ) { + push @responses, $data; + warn "# ",dump($data),$/; + } else { + warn "UNKNOWN ",dump($data); } } }