/[Sack]/trunk/bin/sack.pl
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/bin/sack.pl

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 13 by dpavlin, Tue Sep 22 10:32:59 2009 UTC revision 19 by dpavlin, Tue Sep 22 15:14:00 2009 UTC
# Line 11  use IO::Socket::INET; Line 11  use IO::Socket::INET;
11  use Storable qw/freeze thaw/;  use Storable qw/freeze thaw/;
12    
13    
14    my $debug  = 0;
15  my $path   = '/data/isi/full.txt';  my $path   = '/data/isi/full.txt';
16  my $limit  = 5000;  my $limit  = 5000;
17  my $offset = 0;  my $offset = 0;
# Line 26  GetOptions( Line 27  GetOptions(
27          'view=s'   => \@views,          'view=s'   => \@views,
28          'listen|port=i' => \$listen,          'listen|port=i' => \$listen,
29          'connect=s'   => \@nodes,          'connect=s'   => \@nodes,
30            'debug!'   => \$debug,
31  ) or die $!;  ) or die $!;
32    
33  my $t = time;  my $t = time;
# Line 38  BEGIN { Line 40  BEGIN {
40                  chomp( my $pwd = `pwd` );                  chomp( my $pwd = `pwd` );
41                  $prefix = "$pwd/$prefix";                  $prefix = "$pwd/$prefix";
42          }          }
43          $prefix =~ s{^(.+)/srv/Sack/bin.+$}{$1};          $prefix =~ s{^(.*)/srv/Sack/bin.+$}{$1};
44          warn "# prefix $prefix";          warn "# prefix $prefix";
45  }  }
46    
# Line 71  our $cache; Line 73  our $cache;
73  our $connected;  our $connected;
74    
75  sub send_nodes {  sub send_nodes {
76          my $content = pop @_;          my $content = $#_ > 0 ? pop @_ : '';    # no content with just one argument!
77          my $header = length($content);          my $header = length($content);
78          $header .= ' ' . join(' ', @_) if @_;          $header .= ' ' . join(' ', @_) if @_;
79    
# Line 80  sub send_nodes { Line 82  sub send_nodes {
82                  my $sock = IO::Socket::INET->new(                  my $sock = IO::Socket::INET->new(
83                          PeerAddr => $node,                          PeerAddr => $node,
84                          Proto    => 'tcp',                          Proto    => 'tcp',
85                  ) or die "can't connect to $node - $!";                  );
86    
87                  print ">>>> $node $header\n";                  if ( ! $sock ) {
88                            warn "can't connect to $node - $!"; # FIXME die?
89                            next;
90                    }
91    
92                    print ">>>> $node $header\n";
93                  print $sock "$header\n$content" || warn "can't send $header to $node: $!";                  print $sock "$header\n$content" || warn "can't send $header to $node: $!";
94    
95                  $connected->{$node} = $sock;                  $connected->{$node} = $sock;
96          }          }
97  }  }
98    
99    sub get_node {
100            my $node = shift;
101    
102            my $sock = $connected->{$node};
103            if ( ! $sock ) {
104                    warn "ERROR: lost connection to $node";
105                    delete $connected->{$node};
106                    return;
107            }
108            chomp( my $size = <$sock> );
109            warn "<<<< $node $size bytes\n";
110            my $data;
111            read $sock, $data, $size;
112            return $data;
113    }
114    
115    sub send_sock {
116            my ( $sock, $data ) = @_;
117            my $size   = length $data;
118            warn ">>>> ", $sock->peerhost, " $size bytes";
119            print $sock "$size\n$data" || warn "can't send $size bytes to ", $sock->peerhost;
120    }
121    
122  sub merge_out {  sub merge_out {
123          my $new = shift;          my $new = shift;
124    
         warn "## merge $new\n";  
   
125          foreach my $k1 ( keys %$new ) {          foreach my $k1 ( keys %$new ) {
126    
127                  foreach my $k2 ( keys %{ $new->{$k1} } ) {                  foreach my $k2 ( keys %{ $new->{$k1} } ) {
# Line 105  sub merge_out { Line 132  sub merge_out {
132                          if ( ! defined $out->{$k1}->{$k2} ) {                          if ( ! defined $out->{$k1}->{$k2} ) {
133                                  $out->{$k1}->{$k2} = $n;                                  $out->{$k1}->{$k2} = $n;
134                          } elsif ( $k1 =~ m{\+} ) {                          } elsif ( $k1 =~ m{\+} ) {
135                                  warn "# agregate $k1 $k2";  #                               warn "## agregate $k1 $k2";
136                                  $out->{$k1}->{$k2} += $n;                                  $out->{$k1}->{$k2} += $n;
137                          } elsif ( $ref eq 'ARRAY' ) {                          } elsif ( $ref eq 'ARRAY' ) {
138                                  push @{ $out->{$k1}->{$k2} }, $n;                                  push @{ $out->{$k1}->{$k2} }, $n;
# Line 117  sub merge_out { Line 144  sub merge_out {
144                  }                  }
145          }          }
146    
147          warn "## merge out ", dump $out;          warn "## merge out ", dump $out if $debug;
148  }  }
149    
150  sub run_code {  sub run_code {
# Line 153  sub run_code { Line 180  sub run_code {
180    
181          if ( $connected ) {          if ( $connected ) {
182                  warn "# get results from ", join(' ', keys %$connected );                  warn "# get results from ", join(' ', keys %$connected );
183                    merge_out( thaw( get_node( $_ ) ) ) foreach keys %$connected;
                 foreach my $node ( keys %$connected ) {  
                         my $sock = $connected->{$node};  
                         my $size = <$sock>;  
                         warn "<<<< $node $size bytes\n";  
                         my $part;  
                         read $sock, $part, $size;  
                         merge_out( thaw $part );  
                 }  
184          }          }
185  }  }
186    
# Line 215  if ( $listen ) { Line 234  if ( $listen ) {
234    
235                  my $client = $sock->accept();                  my $client = $sock->accept();
236    
237                  warn "<<<< connect from ", $client->peerhost, $/;                  warn "<<<< $listen connect from ", $client->peerhost, $/;
238    
239                  my @header = split(/\s/, <$client>);                  my @header = split(/\s/, <$client>);
240                  warn "# header ",dump @header;                  warn "# header ",dump @header;
# Line 227  if ( $listen ) { Line 246  if ( $listen ) {
246    
247                  if ( $header[0] eq 'view' ) {                  if ( $header[0] eq 'view' ) {
248                          run_code $header[1] => $content;                          run_code $header[1] => $content;
249                            send_sock $client => freeze $out;
250                          my $frozen = freeze $out;                  } elsif ( $header[0] eq 'info' ) {
251                          my $size   = length $frozen;                          my $info = "$listen\t$offset\t$limit\t$path";
252                          warn ">>>> $size bytes";                          warn "info $info\n";
253                          print $client "$size\n$frozen";                          send_sock $client => $info;
254                    } elsif ( $header[0] eq 'exit' ) {
255                            warn "exit $listen";
256                            exit;
257                  } else {                  } else {
258                          warn "WARN unknown";                          warn "WARN $listen unknown";
259                  }                  }
260    
261          }          }
# Line 250  while ( 1 ) { Line 271  while ( 1 ) {
271          if ( $cmd =~ m{^(vi?|\\e|o(?:ut)?)}i ) {          if ( $cmd =~ m{^(vi?|\\e|o(?:ut)?)}i ) {
272                  system "vi out/*";                  system "vi out/*";
273          } elsif ( $cmd =~ m{^i(nfo)?}i ) {          } elsif ( $cmd =~ m{^i(nfo)?}i ) {
274                  print "nodes: ", dump @nodes, $/;                  print "# nodes: ", join(' ',@nodes), $/;
275    
276                    my @info = (
277                            "node\toffset\tlimit\tpath",
278                            "----\t------\t-----\t----",
279                            "here\t$offset\t$limit\t$path",
280                    );
281    
282                    send_nodes 'info';
283                    push @info, get_node $_ foreach @nodes;
284    
285                    print "$_\n" foreach @info;
286    
287            } elsif ( $cmd =~ m{^(q(uit)|e(xit))}i ) {
288                    warn "# exit";
289                    send_nodes 'exit';
290                    exit;
291          } else {          } else {
292                  run_views;                  run_views;
293          }          }

Legend:
Removed from v.13  
changed lines
  Added in v.19

  ViewVC Help
Powered by ViewVC 1.1.26