/[Sack]/trunk/bin/sack.pl
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/bin/sack.pl

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 9 by dpavlin, Mon Sep 21 20:05:41 2009 UTC revision 24 by dpavlin, Tue Sep 22 21:27:13 2009 UTC
# Line 3  Line 3 
3  use warnings;  use warnings;
4  use strict;  use strict;
5    
6    our $VERSION = '0.01';
7    
8  use Time::HiRes qw(time);  use Time::HiRes qw(time);
9  use Data::Dump qw(dump);  use Data::Dump qw(dump);
10  use File::Slurp;  use File::Slurp;
11  use Getopt::Long;  use Getopt::Long;
12    use IO::Socket::INET;
13    use Storable qw/freeze thaw/;
14    
15    
16    my $debug  = 0;
17  my $path   = '/data/isi/full.txt';  my $path   = '/data/isi/full.txt';
18  my $limit  = 10000;  my $limit  = 5000;
19  my $offset = 0;  my $offset = 0;
20  my @views;  my @views;
21    my $listen = 0; # off
22    my @nodes;
23    
24    
25  GetOptions(  GetOptions(
# Line 20  GetOptions( Line 27  GetOptions(
27          'offset=i' => \$offset,          'offset=i' => \$offset,
28          'limit=i'  => \$limit,          'limit=i'  => \$limit,
29          'view=s'   => \@views,          'view=s'   => \@views,
30            'listen|port=i' => \$listen,
31            'connect=s'   => \@nodes,
32            'debug!'   => \$debug,
33  ) or die $!;  ) or die $!;
34    
35  my $t = time;  my $t = time;
36    
37  use lib '/srv/webpac2/lib/';  
38    sub send_nodes;
39    
40    our $prefix;
41    sub BEGIN {
42            $prefix = $0;
43            if ( $prefix =~ s{^./}{} ) {
44                    chomp( my $pwd = `pwd` );
45                    $prefix = "$pwd/$prefix";
46            }
47            $prefix =~ s{^(.*)/srv/Sack/bin.+$}{$1};
48            warn "# prefix $prefix";
49    
50            $SIG{INT} = sub {
51                    my $signame = shift;
52                    send_nodes 'exit';
53                    die "SIG$signame";
54            };
55    }
56    
57    
58    use lib "$prefix/srv/webpac2/lib/";
59  use WebPAC::Input::ISI;  use WebPAC::Input::ISI;
60    
61    $WebPAC::Input::ISI::subfields = undef; # disable parsing of subfields
62    
63  my $input = WebPAC::Input::ISI->new(  my $input = WebPAC::Input::ISI->new(
64          path   => $path,          path   => "$prefix/$path",
65          offset => $offset,          offset => $offset,
66          limit  => $limit,          limit  => $limit,
67  );  );
# Line 37  sub report { Line 71  sub report {
71          my $description = shift;          my $description = shift;
72          my $dt = time - $t;          my $dt = time - $t;
73          printf "%s in %1.4fs %.2f/s\n", $description, $dt, $input->size / $dt;          printf "%s in %1.4fs %.2f/s\n", $description, $dt, $input->size / $dt;
74            $t = time;
75  }  }
76    
77    
# Line 48  our $out; Line 83  our $out;
83    
84  our $cache;  our $cache;
85    
86  sub run_views {  our $connected;
         @views = sort glob 'views/*.pl' unless @views;  
         warn "# views ", dump @views;  
87    
88          foreach my $view ( @views ) {  sub send_nodes {
89            my $content = $#_ > 0 ? pop @_ : '';    # no content with just one argument!
90            my $header = defined $content ? length($content) : 0;
91            $header .= ' ' . join(' ', @_) if @_;
92    
93            foreach my $node ( @nodes ) {
94    
95                    my $sock = IO::Socket::INET->new(
96                            PeerAddr => $node,
97                            Proto    => 'tcp',
98                    );
99    
100                    if ( ! $sock ) {
101                            warn "can't connect to $node - $!"; # FIXME die?
102                            next;
103                    }
104    
105                  my ( $nr, $package ) = ( $1, $2 )                  print ">>>> $listen $node $header\n";
106                          if $view =~ m{/(\d+)\.([^/]+(\.pl)?$)};                  print $sock "$header\n$content" || warn "can't send $header to $node: $!";
107    
108                  undef $out;                  $connected->{$node} = $sock;
109            }
110    }
111    
112                  next if system("perl -c $view") != 0;  sub get_node {
113            my $node = shift;
114    
115                  my $code = read_file $view;          my $sock = $connected->{$node};
116                  warn "## CODE\n$code\n## CODE\n";          if ( ! $sock ) {
117                    warn "ERROR: lost connection to $node";
118                    delete $connected->{$node};
119                    return;
120            }
121            chomp( my $size = <$sock> );
122            warn "<<<< $listen $node $size bytes\n";
123            my $data;
124            read $sock, $data, $size;
125            return $data;
126    }
127    
128                  my $affected = 0;  sub send_sock {
129                  $t = time;          my ( $sock, $data ) = @_;
130            my $size   = length $data;
131            warn ">>>> $listen ", $sock->peerhost, " $size bytes";
132            print $sock "$size\n$data" || warn "can't send $size bytes to ", $sock->peerhost;
133    }
134    
135                  foreach my $pos ( $offset + 1 .. $offset + $input->size ) {  sub merge_out {
136                          my $rec = $cache->{$pos} ||= $input->fetch_rec( $pos );          my $new = shift;
137                          if ( ! $rec ) {  
138                                  warn "END at $pos";          foreach my $k1 ( keys %$new ) {
139                                  last;  
140                          }                  foreach my $k2 ( keys %{ $new->{$k1} } ) {
141    
142                          eval "$code";                          my $n   =     $new->{$k1}->{$k2};
143                          if ( $@ ) {                          my $ref = ref $out->{$k1}->{$k2};
144                                  warn "ERROR [$pos] $@\n";  
145                            if ( ! defined $out->{$k1}->{$k2} ) {
146                                    $out->{$k1}->{$k2} = $n;
147                            } elsif ( $k1 =~ m{\+} ) {
148    #                               warn "## agregate $k1 $k2";
149                                    $out->{$k1}->{$k2} += $n;
150                            } elsif ( $ref  eq 'ARRAY' ) {
151                                    if ( ref $n eq 'ARRAY' ) {
152                                            push @{ $out->{$k1}->{$k2} }, $_ foreach @$n;
153                                    } else {
154                                            push @{ $out->{$k1}->{$k2} }, $n;
155                                    }
156                            } elsif ( $ref eq '' ) {
157                                    $out->{$k1}->{$k2} = [ $out->{$k1}->{$k2}, $n ];
158                          } else {                          } else {
159                                  $affected++;                                  die "can't merge $k2 [$ref] from ",dump($n), " into ", dump($out->{$k1}->{$k2});
160                          }                          }
161                  };                  }
162            }
163    
164            warn "## merge out ", dump $out if $debug;
165    }
166    
167    sub run_code {
168            my ( $view, $code ) = @_;
169    
170            warn "\n#### CODE $view START ####\n$code\n#### CODE $view END ####\n" if $debug;
171    
172            send_nodes view => $view => $code;
173    
174            undef $out;
175    
176                  report "$affected affected records $view";          my $affected = 0;
177            $t = time;
178    
179                  if ( defined $out ) {          foreach my $pos ( $offset + 1 .. $offset + $input->size ) {
180                    my $rec = $cache->{$pos} ||= $input->fetch_rec( $pos );
181                    if ( ! $rec ) {
182                            warn "END at $pos";
183                            last;
184                    }
185    
186                    eval "$code";
187                    if ( $@ ) {
188                            warn "ABORT [$pos] $@\n";
189                            last;
190                    } else {
191                            $affected++;
192                    }
193            };
194    
195            report "$affected affected records $view";
196    
197            warn "WARN no \$out defined!" unless defined $out;
198    
199            if ( $connected ) {
200                    foreach my $node ( keys %$connected ) {
201                            warn "# $listen get_node $node\n";
202                            my $o = get_node $node;
203                            my $s = length $o;
204                            $o = thaw $o;
205                            warn "# $listen merge $s bytes\n";
206                            merge_out $o;
207                    }
208            }
209    }
210    
211    sub run_views {
212            @views = sort glob 'views/*.pl' unless @views;
213            warn "# views ", dump @views;
214    
215            foreach my $view ( @views ) {
216    
217                    next if system("perl -c $view") != 0;
218    
219                    my $code = read_file $view;
220    
221                    run_code $view => $code;
222    
223                    if ( defined $out ) {
224                          my $dump = dump $out;                          my $dump = dump $out;
225                          my $len  = length $dump;                          my $len  = length $dump;
226                          my $path = "out/$nr.$package";  
227                          print "out $view $offset/$limit $len bytes $path"                          my $path = $view;
228                            $path =~ s{views?/}{out/} || die "no view in $view";
229                            $path =~ s{\.pl}{};
230    
231                            print "OUT $view $offset/$limit $len bytes $path"
232                                  , ( $len < 10000 ?  " \$out = $dump" : ' SAVED ONLY' )                                  , ( $len < 10000 ?  " \$out = $dump" : ' SAVED ONLY' )
233                                  , "\n"                                  , "\n"
234                                  ;                                  ;
# Line 96  sub run_views { Line 236  sub run_views {
236                          unlink "$path.last" if -e "$path.last";                          unlink "$path.last" if -e "$path.last";
237                          rename $path, "$path.last";                          rename $path, "$path.last";
238                          write_file $path, $dump;                          write_file $path, $dump;
239                          report "save $path";                          report "SAVE $path";
240                    }
241    
242            }
243    
244    }
245    
246    if ( $listen ) {
247            my $sock = IO::Socket::INET->new(
248                    Listen    => SOMAXCONN,
249                    LocalAddr => '127.0.0.1',
250                    LocalPort => $listen,
251                    Proto     => 'tcp',
252                    Reuse     => 1,
253            ) or die $!;
254    
255            while (1) {
256    
257                    warn "NODE $listen ready - wating for connection\n";
258    
259                    my $client = $sock->accept();
260    
261                    warn "<<<< $listen connect from ", $client->peerhost, $/;
262    
263                    my @header = split(/\s/, <$client>);
264                    warn "<<<< $listen header ",dump(@header),$/;
265    
266                    my $size = shift @header;
267    
268                    my $content;
269                    read $client, $content, $size;
270    
271                    if ( $header[0] eq 'view' ) {
272                            run_code $header[1] => $content;
273                            send_sock $client => freeze $out;
274                    } elsif ( $header[0] eq 'info' ) {
275                            my $info = "$listen\t$offset\t$limit\t$path";
276                            $info .= "\t" . eval $header[1] if $header[1];
277                            warn "info $info\n";
278                            send_sock $client => $info;
279                    } elsif ( $header[0] eq 'exit' ) {
280                            warn "exit $listen";
281                            exit;
282                  } else {                  } else {
283                          warn "W: no \$out defined!";                          warn "WARN $listen unknown";
284                  }                  }
285    
286          }          }
# Line 109  run_views; Line 291  run_views;
291  while ( 1 ) {  while ( 1 ) {
292    
293          print "sack> ";          print "sack> ";
294          my $cmd = <STDIN>;          chomp( my $cmd = <STDIN> );
295    
296            if ( $cmd =~ m{^(h|\?)} ) {
297                    print << "__HELP__"
298    Sacks Lorry v$VERSION - path: $path offset: $offset limit: $limit
299    
300            View Run        run views
301            VI \\e Output   show output of last run
302            Info [\$VERSION]        instrospect
303            Quit EXit       shutdown
304    
305          if ( $cmd =~ m{(vi|\\e|out)}i ) {  __HELP__
306            } elsif ( $cmd =~ m{^(vi|\\e|o)}i ) {
307                  system "vi out/*";                  system "vi out/*";
308          } else {          } elsif ( $cmd =~ m{^i(?:nfo)?\s?(.+)?$}i ) {
309                    print "# nodes: ", join(' ',@nodes), $/;
310    
311                    send_nodes 'info' => $2;
312    
313                    my @info = (
314                            "node\toffset\tlimit\tpath",
315                            "----\t------\t-----\t----",
316                            "0\t$offset\t$limit\t$path",
317                    );
318    
319                    push @info, get_node $_ foreach @nodes;
320    
321                    print "$_\n" foreach @info;
322    
323            } elsif ( $cmd =~ m{^(q|e|x)}i ) {
324                    warn "# exit";
325                    send_nodes 'exit';
326                    exit;
327            } elsif ( $cmd =~ m{^(v|r)}i ) {
328                  run_views;                  run_views;
329            } elsif ( $cmd ) {
330                    warn "UNKNOWN ", dump $cmd;
331          }          }
332    
333  }  }

Legend:
Removed from v.9  
changed lines
  Added in v.24

  ViewVC Help
Powered by ViewVC 1.1.26