--- trunk/bin/sack.pl 2009/09/22 15:14:00 19 +++ trunk/bin/sack.pl 2009/09/22 21:38:31 25 @@ -3,6 +3,8 @@ use warnings; use strict; +our $VERSION = '0.01'; + use Time::HiRes qw(time); use Data::Dump qw(dump); use File::Slurp; @@ -16,7 +18,7 @@ my $limit = 5000; my $offset = 0; my @views; -my $listen; +my $listen = 0; # off my @nodes; @@ -33,8 +35,10 @@ my $t = time; +sub send_nodes; + our $prefix; -BEGIN { +sub BEGIN { $prefix = $0; if ( $prefix =~ s{^./}{} ) { chomp( my $pwd = `pwd` ); @@ -42,17 +46,27 @@ } $prefix =~ s{^(.*)/srv/Sack/bin.+$}{$1}; warn "# prefix $prefix"; + + $SIG{INT} = sub { + my $signame = shift; + send_nodes 'exit'; + die "SIG$signame"; + }; } use lib "$prefix/srv/webpac2/lib/"; use WebPAC::Input::ISI; + +$WebPAC::Input::ISI::subfields = undef; # disable parsing of subfields + my $input = WebPAC::Input::ISI->new( path => "$prefix/$path", offset => $offset, limit => $limit, ); +our $num_records = $input->size; sub report { my $description = shift; @@ -74,7 +88,7 @@ sub send_nodes { my $content = $#_ > 0 ? pop @_ : ''; # no content with just one argument! - my $header = length($content); + my $header = defined $content ? length($content) : 0; $header .= ' ' . join(' ', @_) if @_; foreach my $node ( @nodes ) { @@ -89,7 +103,7 @@ next; } - print ">>>> $node $header\n"; + print ">>>> $listen $node $header\n"; print $sock "$header\n$content" || warn "can't send $header to $node: $!"; $connected->{$node} = $sock; @@ -106,7 +120,7 @@ return; } chomp( my $size = <$sock> ); - warn "<<<< $node $size bytes\n"; + warn "<<<< $listen $node $size bytes\n"; my $data; read $sock, $data, $size; return $data; @@ -115,7 +129,7 @@ sub send_sock { my ( $sock, $data ) = @_; my $size = length $data; - warn ">>>> ", $sock->peerhost, " $size bytes"; + warn ">>>> $listen ", $sock->peerhost, " $size bytes"; print $sock "$size\n$data" || warn "can't send $size bytes to ", $sock->peerhost; } @@ -134,8 +148,12 @@ } elsif ( $k1 =~ m{\+} ) { # warn "## agregate $k1 $k2"; $out->{$k1}->{$k2} += $n; - } elsif ( $ref eq 'ARRAY' ) { - push @{ $out->{$k1}->{$k2} }, $n; + } elsif ( $ref eq 'ARRAY' ) { + if ( ref $n eq 'ARRAY' ) { + push @{ $out->{$k1}->{$k2} }, $_ foreach @$n; + } else { + push @{ $out->{$k1}->{$k2} }, $n; + } } elsif ( $ref eq '' ) { $out->{$k1}->{$k2} = [ $out->{$k1}->{$k2}, $n ]; } else { @@ -150,7 +168,7 @@ sub run_code { my ( $view, $code ) = @_; - warn "\n#### CODE $view START ####\n$code\n#### CODE $view END ####\n"; + warn "\n#### CODE $view START ####\n$code\n#### CODE $view END ####\n" if $debug; send_nodes view => $view => $code; @@ -168,7 +186,8 @@ eval "$code"; if ( $@ ) { - warn "ERROR [$pos] $@\n"; + warn "ABORT [$pos] $@\n"; + last; } else { $affected++; } @@ -179,8 +198,14 @@ warn "WARN no \$out defined!" unless defined $out; if ( $connected ) { - warn "# get results from ", join(' ', keys %$connected ); - merge_out( thaw( get_node( $_ ) ) ) foreach keys %$connected; + foreach my $node ( keys %$connected ) { + warn "# $listen get_node $node\n"; + my $o = get_node $node; + my $s = length $o; + $o = thaw $o; + warn "# $listen merge $s bytes\n"; + merge_out $o; + } } } @@ -222,7 +247,7 @@ if ( $listen ) { my $sock = IO::Socket::INET->new( Listen => SOMAXCONN, -# LocalAddr => '0.0.0.0', + LocalAddr => '127.0.0.1', LocalPort => $listen, Proto => 'tcp', Reuse => 1, @@ -230,14 +255,14 @@ while (1) { - warn "NODE listen on $listen\n"; + warn "NODE $listen ready - path: $path offset: $offset limit: $limit #recs: $num_records\n"; my $client = $sock->accept(); warn "<<<< $listen connect from ", $client->peerhost, $/; my @header = split(/\s/, <$client>); - warn "# header ",dump @header; + warn "<<<< $listen header ",dump(@header),$/; my $size = shift @header; @@ -248,7 +273,8 @@ run_code $header[1] => $content; send_sock $client => freeze $out; } elsif ( $header[0] eq 'info' ) { - my $info = "$listen\t$offset\t$limit\t$path"; + my $info = "$listen\t$offset\t$limit\t$num_records\t$path"; + $info .= "\t" . eval $header[1] if $header[1]; warn "info $info\n"; send_sock $client => $info; } elsif ( $header[0] eq 'exit' ) { @@ -266,30 +292,43 @@ while ( 1 ) { print "sack> "; - my $cmd = ; + chomp( my $cmd = ); - if ( $cmd =~ m{^(vi?|\\e|o(?:ut)?)}i ) { + if ( $cmd =~ m{^(h|\?)} ) { + print << "__HELP__" +Sacks Lorry v$VERSION - path: $path offset: $offset limit: $limit + + View Run run views + VI \\e Output show output of last run + Info [\$VERSION] instrospect + Quit EXit shutdown + +__HELP__ + } elsif ( $cmd =~ m{^(vi|\\e|o)}i ) { system "vi out/*"; - } elsif ( $cmd =~ m{^i(nfo)?}i ) { + } elsif ( $cmd =~ m{^i(?:nfo)?\s?(.+)?$}i ) { print "# nodes: ", join(' ',@nodes), $/; + send_nodes 'info' => $2; + my @info = ( - "node\toffset\tlimit\tpath", - "----\t------\t-----\t----", - "here\t$offset\t$limit\t$path", + "node\toffset\tlimit\t#recs\tpath", + "----\t------\t-----\t-----\t----", + "0\t$offset\t$limit\t$num_records\t$path", ); - send_nodes 'info'; push @info, get_node $_ foreach @nodes; print "$_\n" foreach @info; - } elsif ( $cmd =~ m{^(q(uit)|e(xit))}i ) { + } elsif ( $cmd =~ m{^(q|e|x)}i ) { warn "# exit"; send_nodes 'exit'; exit; - } else { + } elsif ( $cmd =~ m{^(v|r)}i ) { run_views; + } elsif ( $cmd ) { + warn "UNKNOWN ", dump $cmd; } }