/[Sack]/trunk/bin/sack.pl
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/bin/sack.pl

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 35 by dpavlin, Wed Sep 23 21:55:08 2009 UTC revision 84 by dpavlin, Tue Sep 29 16:54:05 2009 UTC
# Line 3  Line 3 
3  use warnings;  use warnings;
4  use strict;  use strict;
5    
6  our $VERSION = '0.02';  our $VERSION = '0.06';
7    
8  use Time::HiRes qw(time);  use Time::HiRes qw(time);
9  use Data::Dump qw(dump);  use Data::Dump qw(dump);
# Line 27  GetOptions( Line 27  GetOptions(
27          'offset=i' => \$offset,          'offset=i' => \$offset,
28          'limit=i'  => \$limit,          'limit=i'  => \$limit,
29          'view=s'   => \@views,          'view=s'   => \@views,
30          'listen|port=i' => \$port,          'port|listen=i' => \$port,
31          'connect=s'   => \@nodes,          'node|connect=i'   => \@nodes,
32          'debug!'   => \$debug,          'debug!'   => \$debug,
33  ) or die $!;  ) or die $!;
34    
# Line 44  sub BEGIN { Line 44  sub BEGIN {
44                  chomp( my $pwd = `pwd` );                  chomp( my $pwd = `pwd` );
45                  $prefix = "$pwd/$prefix";                  $prefix = "$pwd/$prefix";
46          }          }
47          $prefix =~ s{^(.*)/srv/Sack/[\./]+bin.+$}{$1};          $prefix =~ s{^(.*)/srv/Sack/.+$}{$1};
48          warn "# prefix $prefix";          warn "# prefix $prefix";
49    
50          $SIG{INT} = sub {          $SIG{INT} = sub {
# Line 53  sub BEGIN { Line 53  sub BEGIN {
53                  #clean if $clean;       # FIXME                  #clean if $clean;       # FIXME
54                  die "SIG$signame";                  die "SIG$signame";
55          };          };
56    
57            sub port2color {
58                    my $port = shift;
59                    return "\e[1m0\e[0m" if $port == 0;
60    
61                    my $c = ( $port % 6 ) + 31;
62                    return "\e[${c}m$port\e[0m";
63            }
64    
65            $SIG{__WARN__} = sub {
66                    return unless @_;
67                    my $msg = join('', @_);
68                    if ( $msg !~ m{[\n\r]$} ) {
69                            my @loc = caller(1);
70                            $msg .= " in $loc[1] +$loc[2]\n" if @loc;
71                    }
72                    $msg =~ s{\[(0|\d\d\d\d)\]}{ '[' . port2color($1) . ']' }eg;
73                    print STDERR $msg;
74                    return 1;
75            };
76    
77  }  }
78    
79  use lib "$prefix/srv/Sack/lib/";  use lib "$prefix/srv/Sack/lib/";
80  use Sack::Digest;  use Sack::Digest;
81  Sack::Digest->clean;  our $digest = Sack::Digest->new( port => $port, clean => 1 );
82  Sack::Digest->open( $port );  sub digest { $digest->to_int($_[0]) }
 sub digest { Sack::Digest->to_int($_[0]) }  
83    
84  use lib "$prefix/srv/webpac2/lib/";  use lib "$prefix/srv/webpac2/lib/";
85  use WebPAC::Input::ISI;  use WebPAC::Input::ISI;
# Line 73  my $input = WebPAC::Input::ISI->new( Line 93  my $input = WebPAC::Input::ISI->new(
93  );  );
94    
95  our $num_records = $input->size;  our $num_records = $input->size;
96    our @reports;
97    
98  sub report {  sub report {
99          my $description = join(' ',@_);          my $description = join(' ',@_);
100          my $dt = time - $t;          my $dt = time - $t;
101          printf "%s in %1.4fs %.2f/s\n", $description, $dt, $input->size / $dt;          my $report = [ $description, $dt, $input->size / $dt ];
102            warn sprintf "[$port] %s in %1.4fs %.2f/s\n", @$report;
103            push @reports, $report;
104          $t = time;          $t = time;
105  }  }
106    
107    sub show_report {
108            "\n" . join( "\n", map { sprintf "%8.4fs %10.2f/s  %s", $_->[1], $_->[2], $_->[0] } @reports ) . "\n";
109    }
110    
111  report $input->size . ' records loaded';  report $input->size , 'records loaded';
112    
113  mkdir 'out' unless -e 'out';  mkdir 'out' unless -e 'out';
114    
# Line 92  our $cache; Line 118  our $cache;
118    
119  our $connected;  our $connected;
120    
121    sub node_sock {
122            my $node = shift;
123            my $sock = IO::Socket::INET->new(
124                            PeerAddr => '127.0.0.1',
125                            PeerPort => $node,
126                            Proto    => 'tcp',
127            );
128            
129            return $sock if $sock && $sock->connected;
130    
131            warn "[$port] can't connect to $node - $!\n"; # FIXME die?
132            return;
133    }
134    
135  sub send_nodes {  sub send_nodes {
136          my $content = $#_ > 0 ? pop @_ : '';    # no content with just one argument!          my $content = $#_ > 0 ? pop @_ : '';    # no content with just one argument!
137          my $header = defined $content ? length($content) : 0;          my $header = defined $content ? length($content) : 0;
138          $header .= ' ' . join(' ', @_) if @_;          $header .= ' ' . join(' ', @_) if @_;
139    
140          foreach my $node ( @nodes ) {          warn "# send_nodes ", dump(@_), " to ", dump @nodes;
141    
142                  my $sock = IO::Socket::INET->new(          foreach my $node ( @nodes ) {
                         PeerAddr => $node,  
                         Proto    => 'tcp',  
                 );  
143    
144                  if ( ! $sock ) {                  my $sock = node_sock($node) || next;
                         warn "can't connect to $node - $!"; # FIXME die?  
                         next;  
                 }  
145    
146                  warn "[$port] >>>> $node $header\n";                  warn "[$port] >>>> [$node] $header\n";
147                  print $sock "$header\n$content" || warn "can't send $header to $node: $!";                  print $sock "$header\n$content" || warn "can't send $header to $node: $!";
148    
149                  $connected->{$node} = $sock;                  $connected->{$node} = $sock;
# Line 120  sub get_node { Line 154  sub get_node {
154          my $node = shift;          my $node = shift;
155    
156          my $sock = $connected->{$node};          my $sock = $connected->{$node};
157          if ( ! $sock ) {          if ( ! $sock || ! $sock->connected ) {
158                  warn "[$port] ERROR lost connection to $node";                  warn "[$port] no connection to $node";
159                  delete $connected->{$node};                  delete $connected->{$node};
160                  return;                  return;
161          }          }
162          chomp( my $size = <$sock> );          chomp( my $size = <$sock> );
163          warn "[$port] <<<< $node $size bytes\n";          warn "[$port] <<<< [$node] $size bytes\n" if $debug || $size > 1024;
164          my $data;          my $data;
165          read $sock, $data, $size;          read $sock, $data, $size;
166          return $data;          return $data;
# Line 135  sub get_node { Line 169  sub get_node {
169  sub send_sock {  sub send_sock {
170          my ( $sock, $data ) = @_;          my ( $sock, $data ) = @_;
171          my $size   = length $data;          my $size   = length $data;
172          warn "[$port] >>>> ", $sock->peerhost, " $size bytes\n";          warn "[$port] >>>> $size bytes\n" if $debug || $size > 1024;
173          print $sock "$size\n$data" || warn "can't send $size bytes to ", $sock->peerhost;          print $sock "$size\n$data" || warn "can't send $size bytes to ", $sock->peerhost;
174  }  }
175    
176    sub pull_node_file {
177            my ( $node, $file ) = @_;
178    
179            my $path = "/dev/shm/sack.$node.$file";
180            return if -e $path; # FIXME
181    
182            my $sock = node_sock $node || die "not connected to $node";
183    
184            print $sock "0 file $file\n";
185    
186            my $size = <$sock>;
187            chomp($size);
188            warn "[$port] pull_node_file [$node] $file $size bytes\n";
189    
190            my $block = 4096;
191            my $buff  = ' ' x $block;
192    
193            open(my $fh, '>', $path) || die "can't open $path";
194            while ( read $sock, $buff, $block ) {
195                    print $fh $buff;
196            }
197            close($fh);
198    }
199    
200    my $merge_digest_mapping;
201    
202  sub merge_out {  sub merge_out {
203          my $new = shift;          my ( $from_node, $new ) = @_;
204    
205            my $t_merge = time();
206    
207            pull_node_file $from_node => 'nr_md5';
208            pull_node_file $from_node => 'md5';
209    
210            my $remote_digest = Sack::Digest->new( port => $from_node );
211            my ( $local, $remote ) = ( 0, 0 );
212    
213            my $tick = 0;
214            warn "[$port] merge [$from_node]";
215    
216            my $missing;
217    
218          foreach my $k1 ( keys %$new ) {          foreach my $k1 ( keys %$new ) {
219    
220                  foreach my $k2 ( keys %{ $new->{$k1} } ) {                  foreach my $k2 ( keys %{ $new->{$k1} } ) {
221    
222                          my $n   = delete $new->{$k1}->{$k2};                          my $n   = delete $new->{$k1}->{$k2};
                         my $ref = ref    $out->{$k1}->{$k2};  
223    
224                            if ( $k1 =~ m{#} ) {
225                                    die "ASSERT $k1 $k2" unless $k2 =~ m{^\d+$};
226    #warn "XXX $k1 $k2";
227    
228                                    if ( defined $merge_digest_mapping->{$from_node}->[ $k2 ] ) {
229                                            $k2 = $merge_digest_mapping->{$from_node}->[ $k2 ];
230                                    } else {
231    
232                                            my $md5 = $remote_digest->{nr_md5}->[$k2];
233    
234                                            if ( ! $md5 ) {
235                                                    $missing->{nr_md5}->{$from_node}++; # FIXME die?
236                                                    next;
237                                            }
238    
239                                            my $local_k2;
240    
241                                            if ( $local_k2 = $digest->{md5_nr}->{$md5} ) {
242                                                    $local++;
243                                            } elsif ( my $full = $remote_digest->{md5}->{$md5} ) {
244                                                    $local_k2 = $digest->to_int( $remote_digest->{md5}->{$md5} );
245                                                    $remote++;
246                                            } else {
247                                                    $missing->{md5}->{$from_node}++;
248                                            }
249    
250                                            $k2 = $merge_digest_mapping->{$from_node}->[ $k2 ] = $local_k2;
251    
252                                    }
253                            }
254    
255                            my $ref = ref    $out->{$k1}->{$k2};
256    #warn "XXXX $k1 $k2 $ref";
257                          if ( ! defined $out->{$k1}->{$k2} ) {                          if ( ! defined $out->{$k1}->{$k2} ) {
258                                  $out->{$k1}->{$k2} = $n;                                  $out->{$k1}->{$k2} = $n;
259                          } elsif ( $k1 =~ m{\+} ) {                          } elsif ( $k1 =~ m{\+} ) {
# Line 165  sub merge_out { Line 270  sub merge_out {
270                          } else {                          } else {
271                                  die "can't merge $k2 [$ref] from ",dump($n), " into ", dump($out->{$k1}->{$k2});                                  die "can't merge $k2 [$ref] from ",dump($n), " into ", dump($out->{$k1}->{$k2});
272                          }                          }
273    
274                            if ( $tick++ % 1000 == 0 ) {
275                                    print STDERR ".";
276                            } elsif ( $tick % 10000 == 0 ) {
277                                    print STDERR $tick;
278                            }
279                  }                  }
280          }          }
281    
282            $t_merge = time - $t_merge;
283            my $digests = $local + $remote;
284            warn sprintf "\n[$port] merge %d in %.4fs %.2f/s digests local: %.1f%% %d/%d\n", $tick, $t_merge, $digests / $t_merge, $local * 100 / ( $digests || 1 ), $local, $remote;
285            push @reports, [ "$tick merged [$from_node]", $t_merge, $digests / $t_merge ];
286    
287            warn "[$port] missing ", dump $missing if $missing;
288    
289          warn "## merge out ", dump $out if $debug;          warn "## merge out ", dump $out if $debug;
290  }  }
291    
292    our $rec;
293    
294  sub run_code {  sub run_code {
295          my ( $view, $code ) = @_;          my ( $view, $code ) = @_;
296    
# Line 183  sub run_code { Line 303  sub run_code {
303          my $affected = 0;          my $affected = 0;
304          $t = time;          $t = time;
305    
306            my $coderef = eval "sub { $code }";
307            if ( $@ ) {
308                    warn "ABORT code: $@";
309                    return;
310            }
311    
312          foreach my $pos ( $offset + 1 .. $offset + $input->size ) {          foreach my $pos ( $offset + 1 .. $offset + $input->size ) {
313                  my $rec = $cache->{$pos} ||= $input->fetch_rec( $pos );                  $rec = $cache->{$pos} ||= $input->fetch_rec( $pos );
314                  if ( ! $rec ) {                  if ( ! $rec ) {
315                          warn "END at $pos";                          print STDERR "END @ $pos";
316                          last;                          last;
317                  }                  }
318    
319                  eval "$code";                  eval { $coderef->() };
320                  if ( $@ ) {                  if ( $@ ) {
321                          warn "ABORT [$pos] $@\n";                          warn "ABORT $pos $@\n";
322                          last;                          last;
323                  } else {                  } else {
324                          $affected++;                          $affected++;
325                  }                  }
326    
327                  $pos % 10000 == 0 ? print STDERR $pos - $offset :                  $pos % 10000 == 0 ? print STDERR $pos :
328                  $pos % 1000  == 0 ? print STDERR "." : 0 ;                  $pos % 1000  == 0 ? print STDERR "."  : 0 ;
329          };          };
330    
331          report "\n[$port] RECS $affected $view";          report "$affected affected $view";
332    
333          warn "WARN no \$out defined!" unless defined $out;          warn "WARN no \$out defined!" unless defined $out;
334    
335            $digest->sync;
336    
337          if ( $connected ) {          if ( $connected ) {
338                  foreach my $node ( keys %$connected ) {                  foreach my $node ( keys %$connected ) {
339                          warn "[$port] get_node $node\n";                          warn "[$port] get_node [$node]\n";
340                          my $o = get_node $node;                          my $o = get_node $node;
341                            next unless $o;
342                          my $s = length $o;                          my $s = length $o;
343                          $o = thaw $o;                          $o = thaw $o;
344                          warn "[$port] merge $node $s bytes\n";                          warn "[$port] got $s bytes from [$node]\n";
345                          merge_out $o;                          merge_out $node => $o;
346                  }                  }
347          }          }
348  }  }
# Line 240  sub run_views { Line 369  sub run_views {
369                          rename $path, "$path.last";                          rename $path, "$path.last";
370    
371                          store $out => $path;                          store $out => $path;
372                          report "[$port] SAVE $path $offset-$limit", -s $path, "bytes";                          report "save $path", -s $path, "bytes";
373    
374                          if ( -s $path < 4096 ) {                          if ( -s $path < 4096 ) {
375                                  print '$out = ',dump( $out ),$/;                                  print '$out = ', dump $digest->undigest_out($out);
376                          }                          }
377                  }                  }
378    
# Line 253  sub run_views { Line 382  sub run_views {
382    
383    
384  sub info_tabs {  sub info_tabs {
385          "$port\t$offset\t$limit\t$num_records\t$path\t"          my $port_col = port2color($port);
386          . join(',', map {          "$port_col\t$offset\t$limit\t$num_records\t$path\t"
387            . join("\t", map {
388                  my $b = $_;                  my $b = $_;
389                  $b =~ s{^.+/([^/]+)$}{$1};                  $b =~ s{^.+\.$port\.([^/]+)$}{$1};
390                  "$b " . -s $_                  "$b " . -s $_
391          } glob '/dev/shm/sack.*' );          } glob "/dev/shm/sack.$port.*" );
392  }  }
393    
394    
395  if ( $port ) {  if ( $port ) {
396    
397            my $pid_path = "/tmp/sack.$port.pid";
398            if ( -e $pid_path ) {
399                    my $pid = read_file $pid_path;
400                    kill 9, $pid && warn "[$port] kill old $pid\n";
401            }
402            write_file $pid_path, $$;
403    
404    
405          my $sock = IO::Socket::INET->new(          my $sock = IO::Socket::INET->new(
406                  Listen    => SOMAXCONN,                  Listen    => SOMAXCONN,
407                  LocalAddr => '127.0.0.1',                  LocalAddr => '127.0.0.1',
408                  LocalPort => $port,                  LocalPort => $port,
409                  Proto     => 'tcp',                  Proto     => 'tcp',
410                  Reuse     => 1,                  Reuse     => 1,
411          ) or die $!;          ) or die "[$port] die $!";
412    
413          while (1) {          while (1) {
414    
415                  warn "[$port] READY path: $path offset: $offset limit: $limit #recs: $num_records\n";                  warn "[$port] accept path: $path offset: $offset limit: $limit #recs: $num_records\n";
416    
417                  my $client = $sock->accept();                  my $client = $sock->accept();
418    
# Line 293  if ( $port ) { Line 432  if ( $port ) {
432                  } elsif ( $header[0] eq 'info' ) {                  } elsif ( $header[0] eq 'info' ) {
433                          my $info = info_tabs;                          my $info = info_tabs;
434                          warn "[$port] info $info\n";                          warn "[$port] info $info\n";
435                            $info .= "\n" . show_report if $content =~ m{r}i;
436                          send_sock $client => $info;                          send_sock $client => $info;
437                  } elsif ( $header[0] eq 'exit' ) {                  } elsif ( $header[0] eq 'exit' ) {
438                          warn "[$port] exit";                          warn "[$port] exit";
439                          exit;                          exit;
440                    } elsif ( $header[0] eq 'file' ) {
441                            $digest->close;
442                            my $path = "/dev/shm/sack.$port.$header[1]";
443                            my $size = -s $path;
444                            warn "[$port] >>>> file $path $size bytes\n";
445                            print $client "$size\n";
446                            my $block = 4096;
447                            my $buff  = ' ' x $block;
448                            open(my $fh, '<', $path) || die "can't open $path";
449                            while ( read $fh, $buff, $block ) {
450                                    print $client $buff;
451                            }
452                            $digest->open;
453                  } else {                  } else {
454                          warn "[$port] UNKNOWN $header[0]";                          warn "[$port] UNKNOWN $header[0]";
455                  }                  }
456    
457                    $client->close;
458    
459          }          }
460  }  }
461    
462  sub info {  sub info {
463          send_nodes 'info' => $2;          my $detail = shift || '';
464    
465            send_nodes 'info' => $detail;
466    
467          my @info = (          my @info = (
468                  "port\toffset\tlimit\t#recs\tpath\tfiles",                  "port\toffset\tlimit\t#recs\tpath",
469                  "----\t------\t-----\t-----\t----\t-----",                  "----\t------\t-----\t-----\t----",
470                  info_tabs,                  info_tabs,
471          );          );
472    
473            push @info, show_report if $detail =~ m{r}i;
474    
475          push @info, get_node $_ foreach @nodes;          push @info, get_node $_ foreach @nodes;
476    
477          print "[$port] INFO\n"          print "[$port] INFO", $detail ? " $detail" : '', " \n"
478                  , join("\n", @info)                  , join("\n", @info)
479                  , "\n\n" ;                  , "\n" ;
480    
481          return @info;          return @info;
482  }  }
483    
484  info;  info;
485    while ( keys %$connected != @nodes ) {
486            warn "[$port] wait for [", join('] [', grep { ! defined $connected->{$_} } @nodes ), "]\n";
487            sleep 1;
488            info;
489    }
490  run_views;  run_views;
491    
492  while ( 1 ) {  while ( 1 ) {
# Line 336  Sacks Lorry v$VERSION - path: $path offs Line 500  Sacks Lorry v$VERSION - path: $path offs
500    
501          View Run        run views          View Run        run views
502          VI \\e Output   show output of last run          VI \\e Output   show output of last run
503          Info [\$VERSION]        instrospect          Info [Report]   info with optional report
504          Quit EXit       shutdown          Quit EXit       shutdown
505    
506  __HELP__  __HELP__
507          } elsif ( $cmd =~ m{^(vi|\\e|o)}i ) {          } elsif ( $cmd =~ m{^(vi|\\e|o)}i ) {
508                  #system "vi out/*";                  #system "vi out/*";
509                    $digest->sync;
510                  system "bin/storableedit.pl", (glob('out/*.storable'))[0];                  system "bin/storableedit.pl", (glob('out/*.storable'))[0];
511          } elsif ( $cmd =~ m{^i(?:nfo)?\s?(.+)?$}i ) {          } elsif ( $cmd =~ m{^i(?:nfo)?\s?(\S+)?$}i ) {
512                  info;                  info $1;
513          } elsif ( $cmd =~ m{^(q|e|x)}i ) {          } elsif ( $cmd =~ m{^(q|e|x)}i ) {
514                  warn "# exit";                  warn "# exit";
515                  send_nodes 'exit';                  send_nodes 'exit';
516                  exit;                  exit;
517          } elsif ( $cmd =~ m{^(v|r)}i ) {          } elsif ( $cmd =~ m{^(v|r)}i ) {
518                  run_views;                  run_views;
519            } elsif ( $cmd =~ m{^n(ode)?\s*(\d+)}i ) {
520                    push @nodes, $2;
521                    info;
522          } elsif ( $cmd ) {          } elsif ( $cmd ) {
523                  warn "UNKNOWN ", dump $cmd;                  warn "UNKNOWN ", dump $cmd;
524          }          }

Legend:
Removed from v.35  
changed lines
  Added in v.84

  ViewVC Help
Powered by ViewVC 1.1.26