/[Sack]/trunk/bin/sack.pl
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/bin/sack.pl

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 3 by dpavlin, Sun Sep 20 18:55:34 2009 UTC revision 26 by dpavlin, Tue Sep 22 21:45:04 2009 UTC
# Line 3  Line 3 
3  use warnings;  use warnings;
4  use strict;  use strict;
5    
6    our $VERSION = '0.01';
7    
8  use Time::HiRes qw(time);  use Time::HiRes qw(time);
9  use Data::Dump qw(dump);  use Data::Dump qw(dump);
10  use File::Slurp;  use File::Slurp;
11    use Getopt::Long;
12    use IO::Socket::INET;
13    use Storable qw/freeze thaw/;
14    
15    
16    my $debug  = 0;
17    my $path   = '/data/isi/full.txt';
18    my $limit  = 5000;
19    my $offset = 0;
20    my @views;
21    my $listen = 0; # off
22    my @nodes;
23    
24    
25    GetOptions(
26            'path=s'   => \$path,
27            'offset=i' => \$offset,
28            'limit=i'  => \$limit,
29            'view=s'   => \@views,
30            'listen|port=i' => \$listen,
31            'connect=s'   => \@nodes,
32            'debug!'   => \$debug,
33    ) or die $!;
34    
35  my $t = time;  my $t = time;
36    
37    
38  use lib '/srv/webpac2/lib/';  sub send_nodes;
39    
40    our $prefix;
41    sub BEGIN {
42            $prefix = $0;
43            if ( $prefix =~ s{^./}{} ) {
44                    chomp( my $pwd = `pwd` );
45                    $prefix = "$pwd/$prefix";
46            }
47            $prefix =~ s{^(.*)/srv/Sack/bin.+$}{$1};
48            warn "# prefix $prefix";
49    
50            $SIG{INT} = sub {
51                    my $signame = shift;
52                    send_nodes 'exit';
53                    die "SIG$signame";
54            };
55    }
56    
57    
58    use lib "$prefix/srv/webpac2/lib/";
59  use WebPAC::Input::ISI;  use WebPAC::Input::ISI;
60    
61    $WebPAC::Input::ISI::subfields = undef; # disable parsing of subfields
62    
63  my $input = WebPAC::Input::ISI->new(  my $input = WebPAC::Input::ISI->new(
64          path  => '/data/isi/full.txt',          path   => "$prefix/$path",
65          limit => shift @ARGV || 1000,          offset => $offset,
66            limit  => $limit,
67  );  );
68    
69    our $num_records = $input->size;
70    
71  sub report {  sub report {
72          my $description = shift;          my $description = shift;
73          my $dt = time - $t;          my $dt = time - $t;
74          printf "%s in %1.4fs %.2f/s\n", $description, $dt, $input->size / $dt;          printf "%s in %1.4fs %.2f/s\n", $description, $dt, $input->size / $dt;
75            $t = time;
76  }  }
77    
78    
# Line 29  report $input->size . ' records loaded'; Line 80  report $input->size . ' records loaded';
80    
81  mkdir 'out' unless -e 'out';  mkdir 'out' unless -e 'out';
82    
83  sub run_views {  our $out;
         my @views = glob 'views/*.pl';  
         warn "# views ", dump @views;  
84    
85          foreach my $view ( @views ) {  our $cache;
86    
87                  my ( $nr, $package ) = ( $1, $2 )  our $connected;
                         if $view =~ m{/(\d+)\.([^/]+(\.pl)?$)};  
88    
89                  my $out;  sub send_nodes {
90            my $content = $#_ > 0 ? pop @_ : '';    # no content with just one argument!
91            my $header = defined $content ? length($content) : 0;
92            $header .= ' ' . join(' ', @_) if @_;
93    
94                  next if system("perl -c $view") != 0;          foreach my $node ( @nodes ) {
95    
96                  my $code = read_file $view;                  my $sock = IO::Socket::INET->new(
97                  warn $code;                          PeerAddr => $node,
98                            Proto    => 'tcp',
99                    );
100    
101                    if ( ! $sock ) {
102                            warn "can't connect to $node - $!"; # FIXME die?
103                            next;
104                    }
105    
106                    print ">>>> $listen $node $header\n";
107                    print $sock "$header\n$content" || warn "can't send $header to $node: $!";
108    
109                    $connected->{$node} = $sock;
110            }
111    }
112    
113    sub get_node {
114            my $node = shift;
115    
116            my $sock = $connected->{$node};
117            if ( ! $sock ) {
118                    warn "ERROR: lost connection to $node";
119                    delete $connected->{$node};
120                    return;
121            }
122            chomp( my $size = <$sock> );
123            warn "<<<< $listen $node $size bytes\n";
124            my $data;
125            read $sock, $data, $size;
126            return $data;
127    }
128    
129    sub send_sock {
130            my ( $sock, $data ) = @_;
131            my $size   = length $data;
132            warn ">>>> $listen ", $sock->peerhost, " $size bytes\n";
133            print $sock "$size\n$data" || warn "can't send $size bytes to ", $sock->peerhost;
134    }
135    
136    sub merge_out {
137            my $new = shift;
138    
139            foreach my $k1 ( keys %$new ) {
140    
141                    foreach my $k2 ( keys %{ $new->{$k1} } ) {
142    
143                            my $n   =     $new->{$k1}->{$k2};
144                            my $ref = ref $out->{$k1}->{$k2};
145    
146                            if ( ! defined $out->{$k1}->{$k2} ) {
147                                    $out->{$k1}->{$k2} = $n;
148                            } elsif ( $k1 =~ m{\+} ) {
149    #                               warn "## agregate $k1 $k2";
150                                    $out->{$k1}->{$k2} += $n;
151                            } elsif ( $ref  eq 'ARRAY' ) {
152                                    if ( ref $n eq 'ARRAY' ) {
153                                            push @{ $out->{$k1}->{$k2} }, $_ foreach @$n;
154                                    } else {
155                                            push @{ $out->{$k1}->{$k2} }, $n;
156                                    }
157                            } elsif ( $ref eq '' ) {
158                                    $out->{$k1}->{$k2} = [ $out->{$k1}->{$k2}, $n ];
159                            } else {
160                                    die "can't merge $k2 [$ref] from ",dump($n), " into ", dump($out->{$k1}->{$k2});
161                            }
162                    }
163            }
164    
165            warn "## merge out ", dump $out if $debug;
166    }
167    
168                  $t = time;  sub run_code {
169            my ( $view, $code ) = @_;
170    
171                  foreach my $pos ( 1 .. $input->size ) {          warn "\n#### CODE $view START ####\n$code\n#### CODE $view END ####\n" if $debug;
                         my $rec = $input->fetch_rec($pos);        
172    
173                          eval $code;          send_nodes view => $view => $code;
                         die "ERROR [$pos] $@" if $@;  
                 };  
174    
175                  report $view;          undef $out;
176    
177                  if ( defined $out ) {          my $affected = 0;
178            $t = time;
179    
180            foreach my $pos ( $offset + 1 .. $offset + $input->size ) {
181                    my $rec = $cache->{$pos} ||= $input->fetch_rec( $pos );
182                    if ( ! $rec ) {
183                            warn "END at $pos";
184                            last;
185                    }
186    
187                    eval "$code";
188                    if ( $@ ) {
189                            warn "ABORT [$pos] $@\n";
190                            last;
191                    } else {
192                            $affected++;
193                    }
194            };
195    
196            report "$affected affected records $view";
197    
198            warn "WARN no \$out defined!" unless defined $out;
199    
200            if ( $connected ) {
201                    foreach my $node ( keys %$connected ) {
202                            warn "# $listen get_node $node\n";
203                            my $o = get_node $node;
204                            my $s = length $o;
205                            $o = thaw $o;
206                            warn "# $listen merge $s bytes\n";
207                            merge_out $o;
208                    }
209            }
210    }
211    
212    sub run_views {
213            @views = sort glob 'views/*.pl' unless @views;
214            warn "# views ", dump @views;
215    
216            foreach my $view ( @views ) {
217    
218                    next if system("perl -c $view") != 0;
219    
220                    my $code = read_file $view;
221    
222                    run_code $view => $code;
223    
224                    if ( defined $out ) {
225                          my $dump = dump $out;                          my $dump = dump $out;
226                          my $len  = length $dump;                          my $len  = length $dump;
227                          my $path = "out/$nr.$package";  
228                          print "# $view $len bytes $path"                          my $path = $view;
229                                  , $len < 10000 ?  " \$out = $dump" : ' SAVED ONLY'                          $path =~ s{views?/}{out/} || die "no view in $view";
230                            $path =~ s{\.pl}{};
231    
232                            print "OUT $view $offset/$limit $len bytes $path"
233                                    , ( $len < 10000 ?  " \$out = $dump" : ' SAVED ONLY' )
234                                  , "\n"                                  , "\n"
235                                  ;                                  ;
236    
237                            unlink "$path.last" if -e "$path.last";
238                            rename $path, "$path.last";
239                          write_file $path, $dump;                          write_file $path, $dump;
240                          report "save $path";                          report "SAVE $path";
241                    }
242    
243            }
244    
245    }
246    
247    if ( $listen ) {
248            my $sock = IO::Socket::INET->new(
249                    Listen    => SOMAXCONN,
250                    LocalAddr => '127.0.0.1',
251                    LocalPort => $listen,
252                    Proto     => 'tcp',
253                    Reuse     => 1,
254            ) or die $!;
255    
256            while (1) {
257    
258                    warn "NODE $listen ready - path: $path offset: $offset limit: $limit #recs: $num_records\n";
259    
260                    my $client = $sock->accept();
261    
262                    warn "<<<< $listen connect from ", $client->peerhost, $/;
263    
264                    my @header = split(/\s/, <$client>);
265                    warn "<<<< $listen header ",dump(@header),$/;
266    
267                    my $size = shift @header;
268    
269                    my $content;
270                    read $client, $content, $size;
271    
272                    if ( $header[0] eq 'view' ) {
273                            run_code $header[1] => $content;
274                            send_sock $client => freeze $out;
275                    } elsif ( $header[0] eq 'info' ) {
276                            my $info = "$listen\t$offset\t$limit\t$num_records\t$path";
277                            $info .= "\t" . eval $header[1] if $header[1];
278                            warn "info $info\n";
279                            send_sock $client => $info;
280                    } elsif ( $header[0] eq 'exit' ) {
281                            warn "exit $listen";
282                            exit;
283                    } else {
284                            warn "WARN $listen unknown";
285                  }                  }
286    
287          }          }
# Line 76  run_views; Line 292  run_views;
292  while ( 1 ) {  while ( 1 ) {
293    
294          print "sack> ";          print "sack> ";
295          my $cmd = <STDIN>;          chomp( my $cmd = <STDIN> );
296    
297            if ( $cmd =~ m{^(h|\?)} ) {
298                    print << "__HELP__"
299    Sacks Lorry v$VERSION - path: $path offset: $offset limit: $limit
300    
301            View Run        run views
302            VI \\e Output   show output of last run
303            Info [\$VERSION]        instrospect
304            Quit EXit       shutdown
305    
306          if ( $cmd =~ m{(vi|\\e|out)}i ) {  __HELP__
307            } elsif ( $cmd =~ m{^(vi|\\e|o)}i ) {
308                  system "vi out/*";                  system "vi out/*";
309          } else {          } elsif ( $cmd =~ m{^i(?:nfo)?\s?(.+)?$}i ) {
310                    print "# nodes: ", join(' ',@nodes), $/;
311    
312                    send_nodes 'info' => $2;
313    
314                    my @info = (
315                            "node\toffset\tlimit\t#recs\tpath",
316                            "----\t------\t-----\t-----\t----",
317                            "0\t$offset\t$limit\t$num_records\t$path",
318                    );
319    
320                    push @info, get_node $_ foreach @nodes;
321    
322                    print "$_\n" foreach @info;
323    
324            } elsif ( $cmd =~ m{^(q|e|x)}i ) {
325                    warn "# exit";
326                    send_nodes 'exit';
327                    exit;
328            } elsif ( $cmd =~ m{^(v|r)}i ) {
329                  run_views;                  run_views;
330            } elsif ( $cmd ) {
331                    warn "UNKNOWN ", dump $cmd;
332          }          }
333    
334  }  }

Legend:
Removed from v.3  
changed lines
  Added in v.26

  ViewVC Help
Powered by ViewVC 1.1.26