/[Sack]/trunk/bin/sack.pl
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/bin/sack.pl

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 10 by dpavlin, Mon Sep 21 20:32:51 2009 UTC revision 26 by dpavlin, Tue Sep 22 21:45:04 2009 UTC
# Line 3  Line 3 
3  use warnings;  use warnings;
4  use strict;  use strict;
5    
6    our $VERSION = '0.01';
7    
8  use Time::HiRes qw(time);  use Time::HiRes qw(time);
9  use Data::Dump qw(dump);  use Data::Dump qw(dump);
10  use File::Slurp;  use File::Slurp;
11  use Getopt::Long;  use Getopt::Long;
12    use IO::Socket::INET;
13    use Storable qw/freeze thaw/;
14    
15    
16    my $debug  = 0;
17  my $path   = '/data/isi/full.txt';  my $path   = '/data/isi/full.txt';
18  my $limit  = 10000;  my $limit  = 5000;
19  my $offset = 0;  my $offset = 0;
20  my @views;  my @views;
21    my $listen = 0; # off
22    my @nodes;
23    
24    
25  GetOptions(  GetOptions(
# Line 20  GetOptions( Line 27  GetOptions(
27          'offset=i' => \$offset,          'offset=i' => \$offset,
28          'limit=i'  => \$limit,          'limit=i'  => \$limit,
29          'view=s'   => \@views,          'view=s'   => \@views,
30            'listen|port=i' => \$listen,
31            'connect=s'   => \@nodes,
32            'debug!'   => \$debug,
33  ) or die $!;  ) or die $!;
34    
35  my $t = time;  my $t = time;
36    
37  use lib '/srv/webpac2/lib/';  
38    sub send_nodes;
39    
40    our $prefix;
41    sub BEGIN {
42            $prefix = $0;
43            if ( $prefix =~ s{^./}{} ) {
44                    chomp( my $pwd = `pwd` );
45                    $prefix = "$pwd/$prefix";
46            }
47            $prefix =~ s{^(.*)/srv/Sack/bin.+$}{$1};
48            warn "# prefix $prefix";
49    
50            $SIG{INT} = sub {
51                    my $signame = shift;
52                    send_nodes 'exit';
53                    die "SIG$signame";
54            };
55    }
56    
57    
58    use lib "$prefix/srv/webpac2/lib/";
59  use WebPAC::Input::ISI;  use WebPAC::Input::ISI;
60    
61    $WebPAC::Input::ISI::subfields = undef; # disable parsing of subfields
62    
63  my $input = WebPAC::Input::ISI->new(  my $input = WebPAC::Input::ISI->new(
64          path   => $path,          path   => "$prefix/$path",
65          offset => $offset,          offset => $offset,
66          limit  => $limit,          limit  => $limit,
67  );  );
68    
69    our $num_records = $input->size;
70    
71  sub report {  sub report {
72          my $description = shift;          my $description = shift;
# Line 49  our $out; Line 84  our $out;
84    
85  our $cache;  our $cache;
86    
87  sub run_views {  our $connected;
         @views = sort glob 'views/*.pl' unless @views;  
         warn "# views ", dump @views;  
88    
89          foreach my $view ( @views ) {  sub send_nodes {
90            my $content = $#_ > 0 ? pop @_ : '';    # no content with just one argument!
91            my $header = defined $content ? length($content) : 0;
92            $header .= ' ' . join(' ', @_) if @_;
93    
94            foreach my $node ( @nodes ) {
95    
96                    my $sock = IO::Socket::INET->new(
97                            PeerAddr => $node,
98                            Proto    => 'tcp',
99                    );
100    
101                    if ( ! $sock ) {
102                            warn "can't connect to $node - $!"; # FIXME die?
103                            next;
104                    }
105    
106                    print ">>>> $listen $node $header\n";
107                    print $sock "$header\n$content" || warn "can't send $header to $node: $!";
108    
109                    $connected->{$node} = $sock;
110            }
111    }
112    
113                  my ( $nr, $package ) = ( $1, $2 )  sub get_node {
114                          if $view =~ m{/(\d+)\.([^/]+(\.pl)?$)};          my $node = shift;
115    
116                  undef $out;          my $sock = $connected->{$node};
117            if ( ! $sock ) {
118                    warn "ERROR: lost connection to $node";
119                    delete $connected->{$node};
120                    return;
121            }
122            chomp( my $size = <$sock> );
123            warn "<<<< $listen $node $size bytes\n";
124            my $data;
125            read $sock, $data, $size;
126            return $data;
127    }
128    
129                  next if system("perl -c $view") != 0;  sub send_sock {
130            my ( $sock, $data ) = @_;
131            my $size   = length $data;
132            warn ">>>> $listen ", $sock->peerhost, " $size bytes\n";
133            print $sock "$size\n$data" || warn "can't send $size bytes to ", $sock->peerhost;
134    }
135    
136                  my $code = read_file $view;  sub merge_out {
137                  warn "## CODE\n$code\n## CODE\n";          my $new = shift;
138    
139                  my $affected = 0;          foreach my $k1 ( keys %$new ) {
                 $t = time;  
140    
141                  foreach my $pos ( $offset + 1 .. $offset + $input->size ) {                  foreach my $k2 ( keys %{ $new->{$k1} } ) {
                         my $rec = $cache->{$pos} ||= $input->fetch_rec( $pos );  
                         if ( ! $rec ) {  
                                 warn "END at $pos";  
                                 last;  
                         }  
142    
143                          eval "$code";                          my $n   =     $new->{$k1}->{$k2};
144                          if ( $@ ) {                          my $ref = ref $out->{$k1}->{$k2};
145                                  warn "ERROR [$pos] $@\n";  
146                            if ( ! defined $out->{$k1}->{$k2} ) {
147                                    $out->{$k1}->{$k2} = $n;
148                            } elsif ( $k1 =~ m{\+} ) {
149    #                               warn "## agregate $k1 $k2";
150                                    $out->{$k1}->{$k2} += $n;
151                            } elsif ( $ref  eq 'ARRAY' ) {
152                                    if ( ref $n eq 'ARRAY' ) {
153                                            push @{ $out->{$k1}->{$k2} }, $_ foreach @$n;
154                                    } else {
155                                            push @{ $out->{$k1}->{$k2} }, $n;
156                                    }
157                            } elsif ( $ref eq '' ) {
158                                    $out->{$k1}->{$k2} = [ $out->{$k1}->{$k2}, $n ];
159                          } else {                          } else {
160                                  $affected++;                                  die "can't merge $k2 [$ref] from ",dump($n), " into ", dump($out->{$k1}->{$k2});
161                          }                          }
162                  };                  }
163            }
164    
165            warn "## merge out ", dump $out if $debug;
166    }
167    
168    sub run_code {
169            my ( $view, $code ) = @_;
170    
171                  report "$affected affected records $view";          warn "\n#### CODE $view START ####\n$code\n#### CODE $view END ####\n" if $debug;
172    
173            send_nodes view => $view => $code;
174    
175            undef $out;
176    
177            my $affected = 0;
178            $t = time;
179    
180                  if ( defined $out ) {          foreach my $pos ( $offset + 1 .. $offset + $input->size ) {
181                    my $rec = $cache->{$pos} ||= $input->fetch_rec( $pos );
182                    if ( ! $rec ) {
183                            warn "END at $pos";
184                            last;
185                    }
186    
187                    eval "$code";
188                    if ( $@ ) {
189                            warn "ABORT [$pos] $@\n";
190                            last;
191                    } else {
192                            $affected++;
193                    }
194            };
195    
196            report "$affected affected records $view";
197    
198            warn "WARN no \$out defined!" unless defined $out;
199    
200            if ( $connected ) {
201                    foreach my $node ( keys %$connected ) {
202                            warn "# $listen get_node $node\n";
203                            my $o = get_node $node;
204                            my $s = length $o;
205                            $o = thaw $o;
206                            warn "# $listen merge $s bytes\n";
207                            merge_out $o;
208                    }
209            }
210    }
211    
212    sub run_views {
213            @views = sort glob 'views/*.pl' unless @views;
214            warn "# views ", dump @views;
215    
216            foreach my $view ( @views ) {
217    
218                    next if system("perl -c $view") != 0;
219    
220                    my $code = read_file $view;
221    
222                    run_code $view => $code;
223    
224                    if ( defined $out ) {
225                          my $dump = dump $out;                          my $dump = dump $out;
226                          my $len  = length $dump;                          my $len  = length $dump;
227                          my $path = "out/$nr.$package";  
228                          print "out $view $offset/$limit $len bytes $path"                          my $path = $view;
229                            $path =~ s{views?/}{out/} || die "no view in $view";
230                            $path =~ s{\.pl}{};
231    
232                            print "OUT $view $offset/$limit $len bytes $path"
233                                  , ( $len < 10000 ?  " \$out = $dump" : ' SAVED ONLY' )                                  , ( $len < 10000 ?  " \$out = $dump" : ' SAVED ONLY' )
234                                  , "\n"                                  , "\n"
235                                  ;                                  ;
# Line 97  sub run_views { Line 237  sub run_views {
237                          unlink "$path.last" if -e "$path.last";                          unlink "$path.last" if -e "$path.last";
238                          rename $path, "$path.last";                          rename $path, "$path.last";
239                          write_file $path, $dump;                          write_file $path, $dump;
240                          report "save $path";                          report "SAVE $path";
241                    }
242    
243            }
244    
245    }
246    
247    if ( $listen ) {
248            my $sock = IO::Socket::INET->new(
249                    Listen    => SOMAXCONN,
250                    LocalAddr => '127.0.0.1',
251                    LocalPort => $listen,
252                    Proto     => 'tcp',
253                    Reuse     => 1,
254            ) or die $!;
255    
256            while (1) {
257    
258                    warn "NODE $listen ready - path: $path offset: $offset limit: $limit #recs: $num_records\n";
259    
260                    my $client = $sock->accept();
261    
262                    warn "<<<< $listen connect from ", $client->peerhost, $/;
263    
264                    my @header = split(/\s/, <$client>);
265                    warn "<<<< $listen header ",dump(@header),$/;
266    
267                    my $size = shift @header;
268    
269                    my $content;
270                    read $client, $content, $size;
271    
272                    if ( $header[0] eq 'view' ) {
273                            run_code $header[1] => $content;
274                            send_sock $client => freeze $out;
275                    } elsif ( $header[0] eq 'info' ) {
276                            my $info = "$listen\t$offset\t$limit\t$num_records\t$path";
277                            $info .= "\t" . eval $header[1] if $header[1];
278                            warn "info $info\n";
279                            send_sock $client => $info;
280                    } elsif ( $header[0] eq 'exit' ) {
281                            warn "exit $listen";
282                            exit;
283                  } else {                  } else {
284                          warn "W: no \$out defined!";                          warn "WARN $listen unknown";
285                  }                  }
286    
287          }          }
# Line 110  run_views; Line 292  run_views;
292  while ( 1 ) {  while ( 1 ) {
293    
294          print "sack> ";          print "sack> ";
295          my $cmd = <STDIN>;          chomp( my $cmd = <STDIN> );
296    
297            if ( $cmd =~ m{^(h|\?)} ) {
298                    print << "__HELP__"
299    Sacks Lorry v$VERSION - path: $path offset: $offset limit: $limit
300    
301            View Run        run views
302            VI \\e Output   show output of last run
303            Info [\$VERSION]        instrospect
304            Quit EXit       shutdown
305    
306          if ( $cmd =~ m{(vi|\\e|out)}i ) {  __HELP__
307            } elsif ( $cmd =~ m{^(vi|\\e|o)}i ) {
308                  system "vi out/*";                  system "vi out/*";
309          } else {          } elsif ( $cmd =~ m{^i(?:nfo)?\s?(.+)?$}i ) {
310                    print "# nodes: ", join(' ',@nodes), $/;
311    
312                    send_nodes 'info' => $2;
313    
314                    my @info = (
315                            "node\toffset\tlimit\t#recs\tpath",
316                            "----\t------\t-----\t-----\t----",
317                            "0\t$offset\t$limit\t$num_records\t$path",
318                    );
319    
320                    push @info, get_node $_ foreach @nodes;
321    
322                    print "$_\n" foreach @info;
323    
324            } elsif ( $cmd =~ m{^(q|e|x)}i ) {
325                    warn "# exit";
326                    send_nodes 'exit';
327                    exit;
328            } elsif ( $cmd =~ m{^(v|r)}i ) {
329                  run_views;                  run_views;
330            } elsif ( $cmd ) {
331                    warn "UNKNOWN ", dump $cmd;
332          }          }
333    
334  }  }

Legend:
Removed from v.10  
changed lines
  Added in v.26

  ViewVC Help
Powered by ViewVC 1.1.26