/[Sack]/trunk/bin/sack.pl
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/bin/sack.pl

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 76 by dpavlin, Fri Sep 25 21:12:46 2009 UTC revision 87 by dpavlin, Wed Sep 30 19:30:45 2009 UTC
# Line 3  Line 3 
3  use warnings;  use warnings;
4  use strict;  use strict;
5    
6  our $VERSION = '0.05';  our $VERSION = '0.06';
7    
8  use Time::HiRes qw(time);  use Time::HiRes qw(time);
9  use Data::Dump qw(dump);  use Data::Dump qw(dump);
# Line 53  sub BEGIN { Line 53  sub BEGIN {
53                  #clean if $clean;       # FIXME                  #clean if $clean;       # FIXME
54                  die "SIG$signame";                  die "SIG$signame";
55          };          };
56    
57            sub port2color {
58                    my $port = shift;
59                    return "\e[1m0\e[0m" if $port == 0;
60    
61                    my $c = ( $port % 6 ) + 31;
62                    return "\e[${c}m$port\e[0m";
63            }
64    
65            $SIG{__WARN__} = sub {
66                    return unless @_;
67                    my $msg = join('', @_);
68                    if ( $msg !~ m{[\n\r]$} ) {
69                            my @loc = caller(1);
70                            $msg .= " in $loc[1] +$loc[2]\n" if @loc;
71                    }
72                    $msg =~ s{\[(0|\d\d\d\d)\]}{ '[' . port2color($1) . ']' }eg;
73                    print STDERR $msg;
74                    return 1;
75            };
76    
77  }  }
78    
79    our $digest_module = 'Sack::Digest::' . ( $ENV{SACK_DIGEST} || 'BerkeleyDB' );
80  use lib "$prefix/srv/Sack/lib/";  use lib "$prefix/srv/Sack/lib/";
81  use Sack::Digest;  my $digest_path = $digest_module;
82  our $digest = Sack::Digest->new( port => $port, clean => 1 );  $digest_path =~ s{::}{/}g;
83    require "lib/$digest_path.pm";
84    
85    our $digest = $digest_module->new( port => $port, clean => 1 );
86  sub digest { $digest->to_int($_[0]) }  sub digest { $digest->to_int($_[0]) }
87    
88    warn "[$port] using $digest_module\n";
89    
90  use lib "$prefix/srv/webpac2/lib/";  use lib "$prefix/srv/webpac2/lib/";
91  use WebPAC::Input::ISI;  use WebPAC::Input::ISI;
92    
# Line 78  sub report { Line 105  sub report {
105          my $description = join(' ',@_);          my $description = join(' ',@_);
106          my $dt = time - $t;          my $dt = time - $t;
107          my $report = [ $description, $dt, $input->size / $dt ];          my $report = [ $description, $dt, $input->size / $dt ];
108          printf "[$port] %s in %1.4fs %.2f/s\n", @$report;          warn sprintf "[$port] %s in %1.4fs %.2f/s\n", @$report;
109          push @reports, $report;          push @reports, $report;
110          $t = time;          $t = time;
111  }  }
# Line 164  sub pull_node_file { Line 191  sub pull_node_file {
191    
192          my $size = <$sock>;          my $size = <$sock>;
193          chomp($size);          chomp($size);
194          warn "[$port] pull_node_file $node $file $size bytes\n";          warn "[$port] pull_node_file [$node] $file $size bytes\n";
195    
196          my $block = 4096;          my $block = 4096;
197          my $buff  = ' ' x $block;          my $buff  = ' ' x $block;
# Line 176  sub pull_node_file { Line 203  sub pull_node_file {
203          close($fh);          close($fh);
204  }  }
205    
206    my $merge_digest_mapping;
207    
208  sub merge_out {  sub merge_out {
209          my ( $from_node, $new ) = @_;          my ( $from_node, $new ) = @_;
210    
# Line 188  sub merge_out { Line 217  sub merge_out {
217          my ( $local, $remote ) = ( 0, 0 );          my ( $local, $remote ) = ( 0, 0 );
218    
219          my $tick = 0;          my $tick = 0;
220          print STDERR "[$port] merge [$from_node]";          warn "[$port] merge [$from_node]";
221    
222          my $missing;          my $missing;
223    
# Line 201  sub merge_out { Line 230  sub merge_out {
230                          if ( $k1 =~ m{#} ) {                          if ( $k1 =~ m{#} ) {
231                                  die "ASSERT $k1 $k2" unless $k2 =~ m{^\d+$};                                  die "ASSERT $k1 $k2" unless $k2 =~ m{^\d+$};
232  #warn "XXX $k1 $k2";  #warn "XXX $k1 $k2";
                                 my $md5 = $remote_digest->{nr_md5}->[$k2];  
233    
234                                  if ( ! $md5 ) {                                  if ( defined $merge_digest_mapping->{$from_node}->[ $k2 ] ) {
235                                          $missing->{nr_md5}->{$from_node}++; # FIXME die?                                          $k2 = $merge_digest_mapping->{$from_node}->[ $k2 ];
                                         next;  
                                 }  
   
                                 if ( my $local_k2 = $digest->{md5_nr}->{$md5} ) {  
                                         $k2 = $local_k2;  
                                         $local++;  
                                 } elsif ( my $full = $remote_digest->{md5}->{$md5} ) {  
                                         $k2 = $digest->to_int( $remote_digest->{md5}->{$md5} );  
                                         $remote++;  
236                                  } else {                                  } else {
237                                          $missing->{md5}->{$from_node}++;  
238                                            my $md5 = $remote_digest->{nr_md5}->[$k2];
239    
240                                            if ( ! $md5 ) {
241                                                    $missing->{nr_md5}->{$from_node}++; # FIXME die?
242                                                    next;
243                                            }
244    
245                                            my $local_k2;
246    
247                                            if ( $local_k2 = $digest->{md5_nr}->{$md5} ) {
248                                                    $local++;
249                                            } elsif ( my $full = $remote_digest->{md5}->{$md5} ) {
250                                                    $local_k2 = $digest->to_int( $remote_digest->{md5}->{$md5} );
251                                                    $remote++;
252                                            } else {
253                                                    $missing->{md5}->{$from_node}++;
254                                            }
255    
256                                            $k2 = $merge_digest_mapping->{$from_node}->[ $k2 ] = $local_k2;
257    
258                                  }                                  }
259                          }                          }
260    
# Line 247  sub merge_out { Line 286  sub merge_out {
286          }          }
287    
288          $t_merge = time - $t_merge;          $t_merge = time - $t_merge;
289          printf STDERR "%d in %.4fs %.2f/s local %.1f%% %d/%d\n", $tick, $t_merge, $tick / $t_merge, $local * 100 / $tick, $local, $remote;          my $digests = $local + $remote;
290          push @reports, [ "$tick merged $from_node", $t_merge, $tick / $t_merge ];          warn sprintf "\n[$port] merge %d in %.4fs %.2f/s digests local: %.1f%% %d/%d\n", $tick, $t_merge, $digests / $t_merge, $local * 100 / ( $digests || 1 ), $local, $remote;
291            push @reports, [ "$tick merged [$from_node]", $t_merge, $digests / $t_merge ];
292    
293          warn "[$port] missing ", dump $missing if $missing;          warn "[$port] missing ", dump $missing if $missing;
294    
295          warn "## merge out ", dump $out if $debug;          warn "## merge out ", dump $out if $debug;
296  }  }
297    
298    our $rec;
299    
300  sub run_code {  sub run_code {
301          my ( $view, $code ) = @_;          my ( $view, $code ) = @_;
302    
# Line 267  sub run_code { Line 309  sub run_code {
309          my $affected = 0;          my $affected = 0;
310          $t = time;          $t = time;
311    
312            my $coderef = eval "sub { $code }";
313            if ( $@ ) {
314                    warn "ABORT code: $@";
315                    return;
316            }
317    
318          foreach my $pos ( $offset + 1 .. $offset + $input->size ) {          foreach my $pos ( $offset + 1 .. $offset + $input->size ) {
319                  my $rec = $cache->{$pos} ||= $input->fetch_rec( $pos );                  $rec = $cache->{$pos} ||= $input->fetch_rec( $pos );
320                  if ( ! $rec ) {                  if ( ! $rec ) {
321                          print STDERR "END @ $pos";                          print STDERR "END @ $pos";
322                          last;                          last;
323                  }                  }
324    
325                  eval "$code";                  eval { $coderef->() };
326                  if ( $@ ) {                  if ( $@ ) {
327                          warn "ABORT $pos $@\n";                          warn "ABORT $pos $@\n";
328                          last;                          last;
# Line 340  sub run_views { Line 388  sub run_views {
388    
389    
390  sub info_tabs {  sub info_tabs {
391          "$port\t$offset\t$limit\t$num_records\t$path\t"          my $port_col = port2color($port);
392            "$port_col\t$offset\t$limit\t$num_records\t$path\t"
393          . join("\t", map {          . join("\t", map {
394                  my $b = $_;                  my $b = $_;
395                  $b =~ s{^.+\.$port\.([^/]+)$}{$1};                  $b =~ s{^.+\.$port\.([^/]+)$}{$1};
# Line 354  if ( $port ) { Line 403  if ( $port ) {
403          my $pid_path = "/tmp/sack.$port.pid";          my $pid_path = "/tmp/sack.$port.pid";
404          if ( -e $pid_path ) {          if ( -e $pid_path ) {
405                  my $pid = read_file $pid_path;                  my $pid = read_file $pid_path;
406                  kill 9, $pid && warn "[$port] kill old $pid";                  kill 9, $pid && warn "[$port] kill old $pid\n";
407          }          }
408          write_file $pid_path, $$;          write_file $pid_path, $$;
409    
# Line 365  if ( $port ) { Line 414  if ( $port ) {
414                  LocalPort => $port,                  LocalPort => $port,
415                  Proto     => 'tcp',                  Proto     => 'tcp',
416                  Reuse     => 1,                  Reuse     => 1,
417          ) or die $!;          ) or die "[$port] die $!";
418    
419          while (1) {          while (1) {
420    
# Line 440  sub info { Line 489  sub info {
489    
490  info;  info;
491  while ( keys %$connected != @nodes ) {  while ( keys %$connected != @nodes ) {
492          warn "[$port] wait for ", join(' ', grep { ! defined $connected->{$_} } @nodes );          warn "[$port] wait for [", join('] [', grep { ! defined $connected->{$_} } @nodes ), "]\n";
493          sleep 1;          sleep 1;
494            info;
495  }  }
496  run_views;  run_views;
497    
# Line 456  Sacks Lorry v$VERSION - path: $path offs Line 506  Sacks Lorry v$VERSION - path: $path offs
506    
507          View Run        run views          View Run        run views
508          VI \\e Output   show output of last run          VI \\e Output   show output of last run
509          Info [\$VERSION]        instrospect          Info [Report]   info with optional report
510          Quit EXit       shutdown          Quit EXit       shutdown
511    
512  __HELP__  __HELP__

Legend:
Removed from v.76  
changed lines
  Added in v.87

  ViewVC Help
Powered by ViewVC 1.1.26