/[Sack]/trunk/bin/sack.pl
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/bin/sack.pl

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 33 by dpavlin, Wed Sep 23 20:28:21 2009 UTC revision 50 by dpavlin, Thu Sep 24 19:21:55 2009 UTC
# Line 3  Line 3 
3  use warnings;  use warnings;
4  use strict;  use strict;
5    
6  our $VERSION = '0.02';  our $VERSION = '0.04';
7    
8  use Time::HiRes qw(time);  use Time::HiRes qw(time);
9  use Data::Dump qw(dump);  use Data::Dump qw(dump);
# Line 57  sub BEGIN { Line 57  sub BEGIN {
57    
58  use lib "$prefix/srv/Sack/lib/";  use lib "$prefix/srv/Sack/lib/";
59  use Sack::Digest;  use Sack::Digest;
60  Sack::Digest->open( $port );  our $digest = Sack::Digest->new( port => $port, clean => 1 );
61  sub digest { Sack::Digest->digest($_[0]) }  sub digest { $digest->to_int($_[0]) }
62    
63  use lib "$prefix/srv/webpac2/lib/";  use lib "$prefix/srv/webpac2/lib/";
64  use WebPAC::Input::ISI;  use WebPAC::Input::ISI;
# Line 139  sub send_sock { Line 139  sub send_sock {
139  }  }
140    
141  sub merge_out {  sub merge_out {
142          my $new = shift;          my ( $from_node, $new ) = @_;
143    
144            warn "### merge $from_node";
145    
146            my $from_port = $from_node;
147            $from_port =~ s{.+:(\d+)$}{$1};
148    
149            my $remote_digest = Sack::Digest->new( port => $from_port );
150            my ( $local, $remote ) = ( 0, 0 );
151    
152          foreach my $k1 ( keys %$new ) {          foreach my $k1 ( keys %$new ) {
153    
154                  foreach my $k2 ( keys %{ $new->{$k1} } ) {                  foreach my $k2 ( keys %{ $new->{$k1} } ) {
155    
156                          my $n   = delete $new->{$k1}->{$k2};                          my $n   = delete $new->{$k1}->{$k2};
                         my $ref = ref    $out->{$k1}->{$k2};  
157    
158                            if ( $k1 =~ m{#} ) {
159                                    die "ASSERT $k1 $k2" unless $k2 =~ m{^\d+$};
160    #warn "XXX $k1 $k2";
161                                    my $md5 = $remote_digest->{nr_md5}->[$k2] || warn "[$port] no2md5 $n not found in $from_port\n";
162                                    if ( my $local_k2 = $digest->{md5_nr}->{$md5} ) {
163                                            $k2 = $local_k2;
164                                            $local++;
165                                    } else {
166                                            $k2 = $digest->to_int( $remote_digest->{md5}->{$md5} );
167                                            $remote++;
168                                    }
169                            }
170    
171                            my $ref = ref    $out->{$k1}->{$k2};
172    #warn "XXXX $k1 $k2 $ref";
173                          if ( ! defined $out->{$k1}->{$k2} ) {                          if ( ! defined $out->{$k1}->{$k2} ) {
174                                  $out->{$k1}->{$k2} = $n;                                  $out->{$k1}->{$k2} = $n;
175                          } elsif ( $k1 =~ m{\+} ) {                          } elsif ( $k1 =~ m{\+} ) {
# Line 167  sub merge_out { Line 189  sub merge_out {
189                  }                  }
190          }          }
191    
192            warn "[$port] merge local $local remote $remote from $from_port\n";
193          warn "## merge out ", dump $out if $debug;          warn "## merge out ", dump $out if $debug;
194  }  }
195    
# Line 185  sub run_code { Line 208  sub run_code {
208          foreach my $pos ( $offset + 1 .. $offset + $input->size ) {          foreach my $pos ( $offset + 1 .. $offset + $input->size ) {
209                  my $rec = $cache->{$pos} ||= $input->fetch_rec( $pos );                  my $rec = $cache->{$pos} ||= $input->fetch_rec( $pos );
210                  if ( ! $rec ) {                  if ( ! $rec ) {
211                          warn "END at $pos";                          print STDERR "END @ $pos";
212                          last;                          last;
213                  }                  }
214    
215                  eval "$code";                  eval "$code";
216                  if ( $@ ) {                  if ( $@ ) {
217                          warn "ABORT [$pos] $@\n";                          warn "ABORT $pos $@\n";
218                          last;                          last;
219                  } else {                  } else {
220                          $affected++;                          $affected++;
221                  }                  }
222    
223                  $pos % 10000 == 0 ? print STDERR $pos - $offset :                  $pos % 10000 == 0 ? print STDERR $pos :
224                  $pos % 1000  == 0 ? print STDERR "." : 0 ;                  $pos % 1000  == 0 ? print STDERR "."  : 0 ;
225          };          };
226    
227          report "\n[$port] RECS $affected $view";          report "\n[$port] RECS $affected $view";
228    
229          warn "WARN no \$out defined!" unless defined $out;          warn "WARN no \$out defined!" unless defined $out;
230    
231            $digest->sync;
232    
233          if ( $connected ) {          if ( $connected ) {
234                  foreach my $node ( keys %$connected ) {                  foreach my $node ( keys %$connected ) {
235                          warn "[$port] get_node $node\n";                          warn "[$port] get_node $node\n";
# Line 212  sub run_code { Line 237  sub run_code {
237                          my $s = length $o;                          my $s = length $o;
238                          $o = thaw $o;                          $o = thaw $o;
239                          warn "[$port] merge $node $s bytes\n";                          warn "[$port] merge $node $s bytes\n";
240                          merge_out $o;                          merge_out $node => $o;
241                  }                  }
242          }          }
243  }  }
# Line 240  sub run_views { Line 265  sub run_views {
265    
266                          store $out => $path;                          store $out => $path;
267                          report "[$port] SAVE $path $offset-$limit", -s $path, "bytes";                          report "[$port] SAVE $path $offset-$limit", -s $path, "bytes";
268    
269                            if ( -s $path < 4096 ) {
270                                    print '$out = ', dump $digest->undigest_out($out);
271                            }
272                  }                  }
273    
274          }          }
275    
276  }  }
277    
278    
279    sub info_tabs {
280            "$port\t$offset\t$limit\t$num_records\t$path\t"
281            . join("\t", map {
282                    my $b = $_;
283                    $b =~ s{^.+\.$port\.([^/]+)$}{$1};
284                    "$b " . -s $_
285            } glob "/dev/shm/sack.$port.*" );
286    }
287    
288    
289  if ( $port ) {  if ( $port ) {
290          my $sock = IO::Socket::INET->new(          my $sock = IO::Socket::INET->new(
291                  Listen    => SOMAXCONN,                  Listen    => SOMAXCONN,
# Line 275  if ( $port ) { Line 315  if ( $port ) {
315                          run_code $header[1] => $content;                          run_code $header[1] => $content;
316                          send_sock $client => freeze $out;                          send_sock $client => freeze $out;
317                  } elsif ( $header[0] eq 'info' ) {                  } elsif ( $header[0] eq 'info' ) {
318                          my $info = "$port\t$offset\t$limit\t$num_records\t$path";                          my $info = info_tabs;
                         $info .= "\t" . eval $header[1] if $header[1];  
319                          warn "[$port] info $info\n";                          warn "[$port] info $info\n";
320                          send_sock $client => $info;                          send_sock $client => $info;
321                  } elsif ( $header[0] eq 'exit' ) {                  } elsif ( $header[0] eq 'exit' ) {
# Line 295  sub info { Line 334  sub info {
334          my @info = (          my @info = (
335                  "port\toffset\tlimit\t#recs\tpath",                  "port\toffset\tlimit\t#recs\tpath",
336                  "----\t------\t-----\t-----\t----",                  "----\t------\t-----\t-----\t----",
337                  "$port\t$offset\t$limit\t$num_records\t$path",                  info_tabs,
338          );          );
339    
340          push @info, get_node $_ foreach @nodes;          push @info, get_node $_ foreach @nodes;

Legend:
Removed from v.33  
changed lines
  Added in v.50

  ViewVC Help
Powered by ViewVC 1.1.26