--- trunk/bin/sack.pl 2009/09/22 16:02:49 20 +++ trunk/bin/sack.pl 2009/09/25 12:24:42 59 @@ -3,12 +3,14 @@ use warnings; use strict; +our $VERSION = '0.04'; + use Time::HiRes qw(time); use Data::Dump qw(dump); use File::Slurp; use Getopt::Long; use IO::Socket::INET; -use Storable qw/freeze thaw/; +use Storable qw/freeze thaw store/; my $debug = 0; @@ -16,7 +18,7 @@ my $limit = 5000; my $offset = 0; my @views; -my $listen; +my $port = 0; # interactive my @nodes; @@ -25,7 +27,7 @@ 'offset=i' => \$offset, 'limit=i' => \$limit, 'view=s' => \@views, - 'listen|port=i' => \$listen, + 'listen|port=i' => \$port, 'connect=s' => \@nodes, 'debug!' => \$debug, ) or die $!; @@ -33,29 +35,46 @@ my $t = time; +sub send_nodes; + our $prefix; -BEGIN { +sub BEGIN { $prefix = $0; - if ( $prefix =~ s{^./}{} ) { + if ( $prefix !~ m{^/} ) { chomp( my $pwd = `pwd` ); $prefix = "$pwd/$prefix"; } - $prefix =~ s{^(.*)/srv/Sack/bin.+$}{$1}; + $prefix =~ s{^(.*)/srv/Sack/.+$}{$1}; warn "# prefix $prefix"; + + $SIG{INT} = sub { + my $signame = shift; + send_nodes 'exit'; + #clean if $clean; # FIXME + die "SIG$signame"; + }; } +use lib "$prefix/srv/Sack/lib/"; +use Sack::Digest; +our $digest = Sack::Digest->new( port => $port, clean => 1 ); +sub digest { $digest->to_int($_[0]) } use lib "$prefix/srv/webpac2/lib/"; use WebPAC::Input::ISI; + +$WebPAC::Input::ISI::subfields = undef; # disable parsing of subfields + my $input = WebPAC::Input::ISI->new( path => "$prefix/$path", offset => $offset, limit => $limit, ); +our $num_records = $input->size; sub report { - my $description = shift; + my $description = join(' ',@_); my $dt = time - $t; printf "%s in %1.4fs %.2f/s\n", $description, $dt, $input->size / $dt; $t = time; @@ -74,13 +93,14 @@ sub send_nodes { my $content = $#_ > 0 ? pop @_ : ''; # no content with just one argument! - my $header = length($content); + my $header = defined $content ? length($content) : 0; $header .= ' ' . join(' ', @_) if @_; foreach my $node ( @nodes ) { my $sock = IO::Socket::INET->new( - PeerAddr => $node, + PeerAddr => '127.0.0.1', + PeerPort => $node, Proto => 'tcp', ); @@ -89,7 +109,7 @@ next; } - print ">>>> $node $header\n"; + warn "[$port] >>>> $node $header\n"; print $sock "$header\n$content" || warn "can't send $header to $node: $!"; $connected->{$node} = $sock; @@ -101,12 +121,12 @@ my $sock = $connected->{$node}; if ( ! $sock ) { - warn "ERROR: lost connection to $node"; + warn "[$port] ERROR lost connection to $node"; delete $connected->{$node}; return; } chomp( my $size = <$sock> ); - warn "<<<< $node $size bytes\n"; + warn "[$port] <<<< $node $size bytes\n" if $debug || $size > 1024; my $data; read $sock, $data, $size; return $data; @@ -115,42 +135,75 @@ sub send_sock { my ( $sock, $data ) = @_; my $size = length $data; - warn ">>>> ", $sock->peerhost, " $size bytes"; + warn "[$port] >>>> $size bytes\n" if $debug || $size > 1024; print $sock "$size\n$data" || warn "can't send $size bytes to ", $sock->peerhost; } sub merge_out { - my $new = shift; + my ( $from_node, $new ) = @_; + + my $remote_digest = Sack::Digest->new( port => $from_node ); + my ( $local, $remote ) = ( 0, 0 ); + + my $tick = 0; + print STDERR "[$port] merge $from_node"; foreach my $k1 ( keys %$new ) { foreach my $k2 ( keys %{ $new->{$k1} } ) { - my $n = $new->{$k1}->{$k2}; - my $ref = ref $out->{$k1}->{$k2}; + my $n = delete $new->{$k1}->{$k2}; + if ( $k1 =~ m{#} ) { + die "ASSERT $k1 $k2" unless $k2 =~ m{^\d+$}; +#warn "XXX $k1 $k2"; + my $md5 = $remote_digest->{nr_md5}->[$k2] || warn "[$port] no2md5 $n not found in $from_node\n"; + if ( my $local_k2 = $digest->{md5_nr}->{$md5} ) { + $k2 = $local_k2; + $local++; + } else { + $k2 = $digest->to_int( $remote_digest->{md5}->{$md5} ); + $remote++; + } + } + + my $ref = ref $out->{$k1}->{$k2}; +#warn "XXXX $k1 $k2 $ref"; if ( ! defined $out->{$k1}->{$k2} ) { $out->{$k1}->{$k2} = $n; } elsif ( $k1 =~ m{\+} ) { # warn "## agregate $k1 $k2"; $out->{$k1}->{$k2} += $n; - } elsif ( $ref eq 'ARRAY' ) { - push @{ $out->{$k1}->{$k2} }, $n; + } elsif ( $ref eq 'ARRAY' ) { + if ( ref $n eq 'ARRAY' ) { + push @{ $out->{$k1}->{$k2} }, $_ foreach @$n; + } else { + push @{ $out->{$k1}->{$k2} }, $n; + } } elsif ( $ref eq '' ) { $out->{$k1}->{$k2} = [ $out->{$k1}->{$k2}, $n ]; } else { die "can't merge $k2 [$ref] from ",dump($n), " into ", dump($out->{$k1}->{$k2}); } + + if ( $tick++ % 1000 == 0 ) { + print STDERR "."; + } elsif ( $tick % 10000 == 0 ) { + print STDERR $tick; + } } } + print STDERR "$tick\n"; + + warn "[$port] merge local $local remote $remote from $from_node\n"; warn "## merge out ", dump $out if $debug; } sub run_code { my ( $view, $code ) = @_; - warn "\n#### CODE $view START ####\n$code\n#### CODE $view END ####\n"; + warn "\n#### CODE $view START ####\n$code\n#### CODE $view END ####\n" if $debug; send_nodes view => $view => $code; @@ -162,26 +215,37 @@ foreach my $pos ( $offset + 1 .. $offset + $input->size ) { my $rec = $cache->{$pos} ||= $input->fetch_rec( $pos ); if ( ! $rec ) { - warn "END at $pos"; + print STDERR "END @ $pos"; last; } eval "$code"; if ( $@ ) { - warn "ABORT [$pos] $@\n"; + warn "ABORT $pos $@\n"; last; } else { $affected++; } + + $pos % 10000 == 0 ? print STDERR $pos : + $pos % 1000 == 0 ? print STDERR "." : 0 ; }; - report "$affected affected records $view"; + report "\n[$port] RECS $affected $view"; warn "WARN no \$out defined!" unless defined $out; + $digest->sync; + if ( $connected ) { - warn "# get results from ", join(' ', keys %$connected ); - merge_out( thaw( get_node( $_ ) ) ) foreach keys %$connected; + foreach my $node ( keys %$connected ) { + warn "[$port] get_node $node\n"; + my $o = get_node $node; + my $s = length $o; + $o = thaw $o; + warn "[$port] merge $node $s bytes\n"; + merge_out $node => $o; + } } } @@ -198,47 +262,56 @@ run_code $view => $code; if ( defined $out ) { - my $dump = dump $out; - my $len = length $dump; my $path = $view; $path =~ s{views?/}{out/} || die "no view in $view"; - $path =~ s{\.pl}{}; - - print "OUT $view $offset/$limit $len bytes $path" - , ( $len < 10000 ? " \$out = $dump" : ' SAVED ONLY' ) - , "\n" - ; + $path =~ s{\.pl}{.storable}; unlink "$path.last" if -e "$path.last"; rename $path, "$path.last"; - write_file $path, $dump; - report "SAVE $path"; + + store $out => $path; + report "[$port] SAVE $path $offset-$limit", -s $path, "bytes"; + + if ( -s $path < 4096 ) { + print '$out = ', dump $digest->undigest_out($out); + } } } } -if ( $listen ) { + +sub info_tabs { + "$port\t$offset\t$limit\t$num_records\t$path\t" + . join("\t", map { + my $b = $_; + $b =~ s{^.+\.$port\.([^/]+)$}{$1}; + "$b " . -s $_ + } glob "/dev/shm/sack.$port.*" ); +} + + +if ( $port ) { my $sock = IO::Socket::INET->new( Listen => SOMAXCONN, -# LocalAddr => '0.0.0.0', - LocalPort => $listen, + LocalAddr => '127.0.0.1', + LocalPort => $port, Proto => 'tcp', Reuse => 1, ) or die $!; while (1) { - warn "NODE listen on $listen\n"; + warn "[$port] READY path: $path offset: $offset limit: $limit #recs: $num_records\n"; my $client = $sock->accept(); - warn "<<<< $listen connect from ", $client->peerhost, $/; + warn "[$port] <<<< connect from ", $client->peerhost, $/; my @header = split(/\s/, <$client>); - warn "# header ",dump @header; + warn "[$port] <<<< header ",dump(@header),$/; my $size = shift @header; @@ -249,48 +322,72 @@ run_code $header[1] => $content; send_sock $client => freeze $out; } elsif ( $header[0] eq 'info' ) { - my $info = "$listen\t$offset\t$limit\t$path"; - warn "info $info\n"; + my $info = info_tabs; + warn "[$port] info $info\n"; send_sock $client => $info; } elsif ( $header[0] eq 'exit' ) { - warn "exit $listen"; + warn "[$port] exit"; exit; } else { - warn "WARN $listen unknown"; + warn "[$port] UNKNOWN $header[0]"; } } } -run_views; +sub info { + send_nodes 'info' => $2; -while ( 1 ) { + my @info = ( + "port\toffset\tlimit\t#recs\tpath", + "----\t------\t-----\t-----\t----", + info_tabs, + ); + + push @info, get_node $_ foreach @nodes; + + print "[$port] INFO\n" + , join("\n", @info) + , "\n\n" ; - print "sack> "; - my $cmd = ; + return @info; +} - if ( $cmd =~ m{^(vi?|\\e|o(?:ut)?)}i ) { - system "vi out/*"; - } elsif ( $cmd =~ m{^i(nfo)?}i ) { - print "# nodes: ", join(' ',@nodes), $/; - - my @info = ( - "node\toffset\tlimit\tpath", - "----\t------\t-----\t----", - "here\t$offset\t$limit\t$path", - ); +info; +run_views; - send_nodes 'info'; - push @info, get_node $_ foreach @nodes; +while ( 1 ) { - print "$_\n" foreach @info; + print "sack> "; + chomp( my $cmd = ); - } elsif ( $cmd =~ m{^(q(uit)|e(xit))}i ) { + if ( $cmd =~ m{^(h|\?)} ) { + print << "__HELP__" +Sacks Lorry v$VERSION - path: $path offset: $offset limit: $limit + + View Run run views + VI \\e Output show output of last run + Info [\$VERSION] instrospect + Quit EXit shutdown + +__HELP__ + } elsif ( $cmd =~ m{^(vi|\\e|o)}i ) { + #system "vi out/*"; + $digest->sync; + system "bin/storableedit.pl", (glob('out/*.storable'))[0]; + } elsif ( $cmd =~ m{^i(?:nfo)?\s?(.+)?$}i ) { + info; + } elsif ( $cmd =~ m{^(q|e|x)}i ) { warn "# exit"; send_nodes 'exit'; exit; - } else { + } elsif ( $cmd =~ m{^(v|r)}i ) { run_views; + } elsif ( $cmd =~ m{^n(ode)?\s*(\d+)}i ) { + push @nodes, $1; + info; + } elsif ( $cmd ) { + warn "UNKNOWN ", dump $cmd; } }