--- trunk/bin/sack.pl 2009/09/22 10:40:32 14 +++ trunk/bin/sack.pl 2009/09/24 19:21:55 50 @@ -3,19 +3,22 @@ use warnings; use strict; +our $VERSION = '0.04'; + use Time::HiRes qw(time); use Data::Dump qw(dump); use File::Slurp; use Getopt::Long; use IO::Socket::INET; -use Storable qw/freeze thaw/; +use Storable qw/freeze thaw store/; +my $debug = 0; my $path = '/data/isi/full.txt'; my $limit = 5000; my $offset = 0; my @views; -my $listen; +my $port = 0; # interactive my @nodes; @@ -24,36 +27,54 @@ 'offset=i' => \$offset, 'limit=i' => \$limit, 'view=s' => \@views, - 'listen|port=i' => \$listen, + 'listen|port=i' => \$port, 'connect=s' => \@nodes, + 'debug!' => \$debug, ) or die $!; my $t = time; +sub send_nodes; + our $prefix; -BEGIN { +sub BEGIN { $prefix = $0; - if ( $prefix =~ s{^./}{} ) { + if ( $prefix !~ m{^/} ) { chomp( my $pwd = `pwd` ); $prefix = "$pwd/$prefix"; } - $prefix =~ s{^(.*)/srv/Sack/bin.+$}{$1}; + $prefix =~ s{^(.*)/srv/Sack/[\./]+bin.+$}{$1}; warn "# prefix $prefix"; + + $SIG{INT} = sub { + my $signame = shift; + send_nodes 'exit'; + #clean if $clean; # FIXME + die "SIG$signame"; + }; } +use lib "$prefix/srv/Sack/lib/"; +use Sack::Digest; +our $digest = Sack::Digest->new( port => $port, clean => 1 ); +sub digest { $digest->to_int($_[0]) } use lib "$prefix/srv/webpac2/lib/"; use WebPAC::Input::ISI; + +$WebPAC::Input::ISI::subfields = undef; # disable parsing of subfields + my $input = WebPAC::Input::ISI->new( path => "$prefix/$path", offset => $offset, limit => $limit, ); +our $num_records = $input->size; sub report { - my $description = shift; + my $description = join(' ',@_); my $dt = time - $t; printf "%s in %1.4fs %.2f/s\n", $description, $dt, $input->size / $dt; $t = time; @@ -71,8 +92,8 @@ our $connected; sub send_nodes { - my $content = pop @_; - my $header = length($content); + my $content = $#_ > 0 ? pop @_ : ''; # no content with just one argument! + my $header = defined $content ? length($content) : 0; $header .= ' ' . join(' ', @_) if @_; foreach my $node ( @nodes ) { @@ -80,35 +101,86 @@ my $sock = IO::Socket::INET->new( PeerAddr => $node, Proto => 'tcp', - ) or die "can't connect to $node - $!"; + ); - print ">>>> $node $header\n"; + if ( ! $sock ) { + warn "can't connect to $node - $!"; # FIXME die? + next; + } + warn "[$port] >>>> $node $header\n"; print $sock "$header\n$content" || warn "can't send $header to $node: $!"; $connected->{$node} = $sock; } } +sub get_node { + my $node = shift; + + my $sock = $connected->{$node}; + if ( ! $sock ) { + warn "[$port] ERROR lost connection to $node"; + delete $connected->{$node}; + return; + } + chomp( my $size = <$sock> ); + warn "[$port] <<<< $node $size bytes\n"; + my $data; + read $sock, $data, $size; + return $data; +} + +sub send_sock { + my ( $sock, $data ) = @_; + my $size = length $data; + warn "[$port] >>>> ", $sock->peerhost, " $size bytes\n"; + print $sock "$size\n$data" || warn "can't send $size bytes to ", $sock->peerhost; +} + sub merge_out { - my $new = shift; + my ( $from_node, $new ) = @_; + + warn "### merge $from_node"; - warn "## merge $new\n"; + my $from_port = $from_node; + $from_port =~ s{.+:(\d+)$}{$1}; + + my $remote_digest = Sack::Digest->new( port => $from_port ); + my ( $local, $remote ) = ( 0, 0 ); foreach my $k1 ( keys %$new ) { foreach my $k2 ( keys %{ $new->{$k1} } ) { - my $n = $new->{$k1}->{$k2}; - my $ref = ref $out->{$k1}->{$k2}; + my $n = delete $new->{$k1}->{$k2}; + + if ( $k1 =~ m{#} ) { + die "ASSERT $k1 $k2" unless $k2 =~ m{^\d+$}; +#warn "XXX $k1 $k2"; + my $md5 = $remote_digest->{nr_md5}->[$k2] || warn "[$port] no2md5 $n not found in $from_port\n"; + if ( my $local_k2 = $digest->{md5_nr}->{$md5} ) { + $k2 = $local_k2; + $local++; + } else { + $k2 = $digest->to_int( $remote_digest->{md5}->{$md5} ); + $remote++; + } + } + my $ref = ref $out->{$k1}->{$k2}; +#warn "XXXX $k1 $k2 $ref"; if ( ! defined $out->{$k1}->{$k2} ) { $out->{$k1}->{$k2} = $n; } elsif ( $k1 =~ m{\+} ) { - warn "# agregate $k1 $k2"; +# warn "## agregate $k1 $k2"; $out->{$k1}->{$k2} += $n; - } elsif ( $ref eq 'ARRAY' ) { - push @{ $out->{$k1}->{$k2} }, $n; + } elsif ( $ref eq 'ARRAY' ) { + if ( ref $n eq 'ARRAY' ) { + push @{ $out->{$k1}->{$k2} }, $_ foreach @$n; + } else { + push @{ $out->{$k1}->{$k2} }, $n; + } } elsif ( $ref eq '' ) { $out->{$k1}->{$k2} = [ $out->{$k1}->{$k2}, $n ]; } else { @@ -117,13 +189,14 @@ } } - warn "## merge out ", dump $out; + warn "[$port] merge local $local remote $remote from $from_port\n"; + warn "## merge out ", dump $out if $debug; } sub run_code { my ( $view, $code ) = @_; - warn "\n#### CODE $view START ####\n$code\n#### CODE $view END ####\n"; + warn "\n#### CODE $view START ####\n$code\n#### CODE $view END ####\n" if $debug; send_nodes view => $view => $code; @@ -135,32 +208,36 @@ foreach my $pos ( $offset + 1 .. $offset + $input->size ) { my $rec = $cache->{$pos} ||= $input->fetch_rec( $pos ); if ( ! $rec ) { - warn "END at $pos"; + print STDERR "END @ $pos"; last; } eval "$code"; if ( $@ ) { - warn "ERROR [$pos] $@\n"; + warn "ABORT $pos $@\n"; + last; } else { $affected++; } + + $pos % 10000 == 0 ? print STDERR $pos : + $pos % 1000 == 0 ? print STDERR "." : 0 ; }; - report "$affected affected records $view"; + report "\n[$port] RECS $affected $view"; warn "WARN no \$out defined!" unless defined $out; - if ( $connected ) { - warn "# get results from ", join(' ', keys %$connected ); + $digest->sync; + if ( $connected ) { foreach my $node ( keys %$connected ) { - my $sock = $connected->{$node}; - my $size = <$sock>; - warn "<<<< $node $size bytes\n"; - my $part; - read $sock, $part, $size; - merge_out( thaw $part ); + warn "[$port] get_node $node\n"; + my $o = get_node $node; + my $s = length $o; + $o = thaw $o; + warn "[$port] merge $node $s bytes\n"; + merge_out $node => $o; } } } @@ -178,47 +255,56 @@ run_code $view => $code; if ( defined $out ) { - my $dump = dump $out; - my $len = length $dump; my $path = $view; $path =~ s{views?/}{out/} || die "no view in $view"; - $path =~ s{\.pl}{}; - - print "OUT $view $offset/$limit $len bytes $path" - , ( $len < 10000 ? " \$out = $dump" : ' SAVED ONLY' ) - , "\n" - ; + $path =~ s{\.pl}{.storable}; unlink "$path.last" if -e "$path.last"; rename $path, "$path.last"; - write_file $path, $dump; - report "SAVE $path"; + + store $out => $path; + report "[$port] SAVE $path $offset-$limit", -s $path, "bytes"; + + if ( -s $path < 4096 ) { + print '$out = ', dump $digest->undigest_out($out); + } } } } -if ( $listen ) { + +sub info_tabs { + "$port\t$offset\t$limit\t$num_records\t$path\t" + . join("\t", map { + my $b = $_; + $b =~ s{^.+\.$port\.([^/]+)$}{$1}; + "$b " . -s $_ + } glob "/dev/shm/sack.$port.*" ); +} + + +if ( $port ) { my $sock = IO::Socket::INET->new( Listen => SOMAXCONN, -# LocalAddr => '0.0.0.0', - LocalPort => $listen, + LocalAddr => '127.0.0.1', + LocalPort => $port, Proto => 'tcp', Reuse => 1, ) or die $!; while (1) { - warn "NODE listen on $listen\n"; + warn "[$port] READY path: $path offset: $offset limit: $limit #recs: $num_records\n"; my $client = $sock->accept(); - warn "<<<< connect from ", $client->peerhost, $/; + warn "[$port] <<<< connect from ", $client->peerhost, $/; my @header = split(/\s/, <$client>); - warn "# header ",dump @header; + warn "[$port] <<<< header ",dump(@header),$/; my $size = shift @header; @@ -227,32 +313,70 @@ if ( $header[0] eq 'view' ) { run_code $header[1] => $content; - - my $frozen = freeze $out; - my $size = length $frozen; - warn ">>>> $size bytes"; - print $client "$size\n$frozen"; - + send_sock $client => freeze $out; + } elsif ( $header[0] eq 'info' ) { + my $info = info_tabs; + warn "[$port] info $info\n"; + send_sock $client => $info; + } elsif ( $header[0] eq 'exit' ) { + warn "[$port] exit"; + exit; } else { - warn "WARN unknown"; + warn "[$port] UNKNOWN $header[0]"; } } } +sub info { + send_nodes 'info' => $2; + + my @info = ( + "port\toffset\tlimit\t#recs\tpath", + "----\t------\t-----\t-----\t----", + info_tabs, + ); + + push @info, get_node $_ foreach @nodes; + + print "[$port] INFO\n" + , join("\n", @info) + , "\n\n" ; + + return @info; +} + +info; run_views; while ( 1 ) { print "sack> "; - my $cmd = ; + chomp( my $cmd = ); - if ( $cmd =~ m{^(vi?|\\e|o(?:ut)?)}i ) { - system "vi out/*"; - } elsif ( $cmd =~ m{^i(nfo)?}i ) { - print "nodes: ", dump @nodes, $/; - } else { + if ( $cmd =~ m{^(h|\?)} ) { + print << "__HELP__" +Sacks Lorry v$VERSION - path: $path offset: $offset limit: $limit + + View Run run views + VI \\e Output show output of last run + Info [\$VERSION] instrospect + Quit EXit shutdown + +__HELP__ + } elsif ( $cmd =~ m{^(vi|\\e|o)}i ) { + #system "vi out/*"; + system "bin/storableedit.pl", (glob('out/*.storable'))[0]; + } elsif ( $cmd =~ m{^i(?:nfo)?\s?(.+)?$}i ) { + info; + } elsif ( $cmd =~ m{^(q|e|x)}i ) { + warn "# exit"; + send_nodes 'exit'; + exit; + } elsif ( $cmd =~ m{^(v|r)}i ) { run_views; + } elsif ( $cmd ) { + warn "UNKNOWN ", dump $cmd; } }