--- trunk/lib/Sack/Server.pm 2009/11/08 13:40:58 188 +++ trunk/lib/Sack/Server.pm 2010/02/01 22:03:02 264 @@ -5,6 +5,8 @@ use warnings; use strict; +my $debug = 0; + use IO::Socket::INET; use IO::Select; @@ -13,25 +15,35 @@ use File::Slurp; use Cwd qw(abs_path); +use lib '/srv/Sack/lib'; +use Sack::Merge; +use Sack::Server::HTTP; +use Sack::Server::HTML; +use Sack::Server::Gnuplot; + my @cloud; -my $cloud_path = $ENV{CLOUD} || die "start with: CLOUD=etc/cloud perl -I/srv/Sack/lib $0\n"; +my $cloud_path = $ENV{CLOUD} || "etc/cloud"; +die "start with: CLOUD=etc/cloud perl -I/srv/Sack/lib $0\n" unless -e $cloud_path; @cloud = read_file $cloud_path; @cloud = map { chomp $_; $_ } @cloud; warn "# cloud ",dump( @cloud ); my $listen_port = 4444; +my $http_port = 4480; my $node_path = abs_path $0; $node_path =~ s{Server}{Client}; -my $lsn = IO::Socket::INET->new(Listen => 1, LocalPort => $listen_port, Reuse => 1) or die $!; +my $lsn = IO::Socket::INET->new(Listen => 1, LocalPort => $listen_port, Reuse => 1) or die "$listen_port $!"; +my $www = IO::Socket::INET->new(Listen => 1, LocalPort => $http_port, Reuse => 1) or die "$http_port $!"; my $sel = IO::Select->new($lsn); +$sel->add( $www ); my $info; sub info { my $port = shift; - push @{ $info->{$port} }, [ @_ ]; + push @{ $info->{node}->{$port} }, [ @_ ]; } sub fork_ssh { @@ -62,14 +74,60 @@ my $session; +sub all_ports { keys %{ $session->{port} } } + sub to_all { my $data = shift; - foreach my $port ( keys %{ $session->{port} } ) { - warn ">>>> [$port]\n"; + foreach my $port ( all_ports ) { + warn ">>>> [$port]\n" if $debug; Storable::store_fd( $data, $session->{port}->{$port} ); } } +our @shard_load_queue; +sub load_shard { + my $shard = shift @_ || return; + + warn "# load_shard $shard\n"; + + $shard .= '/*' if -d $shard; + + my @shards = glob $shard; + + if ( ! @shards ) { + warn "no shards for $shard\n"; + return; + } + + foreach my $s ( @shards ) { + if ( my $node = $info->{shard}->{$s} ) { + next if $node =~ m{^\d+$}; # not node number + } + $info->{shard}->{$s} = 'wait'; + push @shard_load_queue, $s; + warn "queued $s for loading\n"; + } + to_all { load => $shard }; + $info->{pending}->{$_} = 'load' foreach all_ports; +} + +sub run_view { + my ( $path ) = @_; + if ( ! -r $path ) { + warn "ERROR view $path: $!"; + return; + } + my $code = read_file $path; + Sack::Merge->clean; + delete( $info->{view} ); + delete( $info->{merge} ); + delete( $info->{shard} ); + $info->{pending}->{$_} = 'view' foreach all_ports; + to_all { code => $code, view => $path }; +}; + +our @responses; + while (1) { for my $sock ($sel->can_read(1)) { if ($sock == $lsn) { @@ -78,7 +136,163 @@ $session->{peerport}->{ $new->peerport } = $new; warn "[socket] connect\n"; Storable::store_fd( { ping => 1 }, $new ); - info 0 => 'ping', $new->peerport; + } elsif ( $sock == $www ) { + my $client = $www->accept; + Sack::Server::HTTP::request( $client, sub { + my ( $send, $method, $param ) = @_; + + if ( $method =~ m{views} ) { + run_view $method; + print $send "HTTP/1.0 302 $method\r\nLocation: /\r\n\r\n"; + return 1; + } elsif ( $method =~ m{^/tmp/sack} ) { + load_shard $method; + print $send "HTTP/1.0 302 $method\r\nLocation: /\r\n\r\n"; + return 1; + } elsif ( $method =~ m{^/out/(.+)} ) { + print $send "HTTP/1.0 200 OK\r\nContent-Type: text/html\r\n\r\n"; + Sack::Server::HTML::send_out( $send, Sack::Merge->out, $1, $param ); + return 1; + } elsif ( $method =~ m{^/gnuplot} ) { + eval { + my $path = Sack::Server::Gnuplot::date( Sack::Merge->out, $param ); + if ( -e $path ) { + print $send "HTTP/1.0 200 OK\r\nContent-Type: image/png\r\n\r\n"; + open(my $fh, '<', $path) || die $path; + my $b; + while ( read($fh, $b, 4096) ) { + print $send $b; + } + return 1; + } else { + print $send "HTTP/1.0 404 no graph\r\n\r\n"; + return 1; + } + }; + warn "ERROR: $@" if $@; + } + + my $refresh = $param->{refresh}; + $refresh = 1 if keys %{ $info->{pending} }; + + print $send "HTTP/1.0 200 OK\r\nContent-Type: text/html" + , ( $refresh ? "\r\nRefresh: $refresh" : '' ) + , "\r\n\r\n" + , ( $refresh ? qq|$refresh| : '' ) + ; + + print $send qq| + + + +|; + + my $out = Sack::Merge->out; + if ( my $li = join("\n", map { qq|
  • $_| } keys %$out ) ) { + print $send qq|

    Results

    \n|; + } + + print $send qq|

    Views

    | + ; + + print $send qq|

    Nodes

    |; + foreach my $node ( keys %{ $info->{node} } ) { + my $class = join(' ' + , map { $_->[0] } @{ $info->{node}->{$node} } + ); + $class .= ' ' . ( $info->{pending}->{$node} || 'ready' ); + print $send qq|$node\n|; + } + + print $send qq|

    Shards

    |; + + if ( $param->{info} ) { + print $send qq|hide info|; + print $send '
    ', dump($info), '
    ' + } else { + print $send qq|show info|; + } + return 1; + } ); } else { my $data = eval { Storable::fd_retrieve( $sock ) }; if ( $@ ) { @@ -87,19 +301,70 @@ $sel->remove($sock); $sock->close; } else { - warn "<<<< ", dump($data), $/; - if ( $data->{repl} ) { - my $response = { repl => $$ }; - if ( $data->{repl} =~ m/ping/ ) { + warn "<<<< ", dump($data), $/ if $debug; + + if ( my $path = $data->{shard} ) { + $info->{shard}->{ $path } = $data->{port}; + # FIXME will need push for multiple copies of shards + } + + if ( my $repl = $data->{repl} ) { + my $response = { repl_pid => $$ }; + if ( $repl =~ m/ping/ ) { to_all { ping => 1 }; - } elsif ( $data->{repl} =~ m/info/ ) { + } elsif ( $repl =~ m/info/ ) { $response->{info} = $info; + } elsif ( $repl =~ m{load\s*(\S+)?} ) { + load_shard $1; + } elsif ( $repl =~ m{view\s*(\S+)?} ) { + run_view $1; + } elsif ( $repl =~ m{out} ) { + my $out = Sack::Merge->out; + warn "out ",dump( $out ); + $response->{out} = $out; + } elsif ( $repl =~ m{clean} ) { + delete $info->{shard}; + to_all { clean => 1 }; + } elsif ( $repl eq 'exit' ) { + to_all { exit => 1 }; + sleep 1; + exit; + } elsif ( $repl eq '.' ) { + $response->{'.'} = [ @responses ]; + @responses = (); + } elsif ( $repl =~ m{(\w+)\s*(.+)?} ) { + to_all { $1 => $2 }; + } else { + $response->{error}->{repl} = $repl; } Storable::store_fd( $response, $sock ); } elsif ( $data->{ping} ) { my $port = $data->{port}; info $port => 'ping', $port; $session->{port}->{ $data->{port} } = $sock; + } elsif ( defined $data->{load} && $data->{load} eq 'shard' ) { + if ( my $path = shift @shard_load_queue ) { + $info->{shard}->{$path} = 'read'; + my $shard = Storable::retrieve $path; + $info->{shard}->{$path} = 'send'; + warn ">>>> [", $data->{port}, "] sending shard $path\n"; + Storable::store_fd( { path => $path, shard => $shard }, $sock ); + } else { + warn "no more shards for [", $data->{port}, "]\n"; + delete $info->{pending}->{ $data->{port} }; + } + } elsif ( exists $data->{out} ) { + my $added = Sack::Merge->add( $data->{out} ) if defined $data->{out}; + $info->{merge}->{ $data->{view} }->{ $data->{port} } = $added; + $info->{view }->{ $data->{view} }->{ $data->{port} } = $data->{on_shard}; + # refresh shard allocation + $info->{shard}->{ $_ } = $data->{port} foreach keys %{ $data->{on_shard} }; + delete $info->{pending}->{ $data->{port} }; + } elsif ( exists $data->{port} ) { + push @responses, $data; + warn "# ",dump($data),$/; + } else { + warn "UNKNOWN ",dump($data); } } }