/[Sack]/trunk/bin/sack.pl
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/bin/sack.pl

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 3 by dpavlin, Sun Sep 20 18:55:34 2009 UTC revision 28 by dpavlin, Tue Sep 22 22:28:40 2009 UTC
# Line 3  Line 3 
3  use warnings;  use warnings;
4  use strict;  use strict;
5    
6    our $VERSION = '0.01';
7    
8  use Time::HiRes qw(time);  use Time::HiRes qw(time);
9  use Data::Dump qw(dump);  use Data::Dump qw(dump);
10  use File::Slurp;  use File::Slurp;
11    use Getopt::Long;
12    use IO::Socket::INET;
13    use Storable qw/freeze thaw/;
14    use Digest::MD5 qw/md5/;
15    
16    
17    my $debug  = 0;
18    my $path   = '/data/isi/full.txt';
19    my $limit  = 5000;
20    my $offset = 0;
21    my @views;
22    my $listen = 0; # off
23    my @nodes;
24    
25    
26    GetOptions(
27            'path=s'   => \$path,
28            'offset=i' => \$offset,
29            'limit=i'  => \$limit,
30            'view=s'   => \@views,
31            'listen|port=i' => \$listen,
32            'connect=s'   => \@nodes,
33            'debug!'   => \$debug,
34    ) or die $!;
35    
36  my $t = time;  my $t = time;
37    
38    
39  use lib '/srv/webpac2/lib/';  sub send_nodes;
40    
41    our $prefix;
42    sub BEGIN {
43            $prefix = $0;
44            if ( $prefix =~ s{^./}{} ) {
45                    chomp( my $pwd = `pwd` );
46                    $prefix = "$pwd/$prefix";
47            }
48            $prefix =~ s{^(.*)/srv/Sack/bin.+$}{$1};
49            warn "# prefix $prefix";
50    
51            $SIG{INT} = sub {
52                    my $signame = shift;
53                    send_nodes 'exit';
54                    die "SIG$signame";
55            };
56    }
57    
58    
59    use lib "$prefix/srv/webpac2/lib/";
60  use WebPAC::Input::ISI;  use WebPAC::Input::ISI;
61    
62    $WebPAC::Input::ISI::subfields = undef; # disable parsing of subfields
63    
64  my $input = WebPAC::Input::ISI->new(  my $input = WebPAC::Input::ISI->new(
65          path  => '/data/isi/full.txt',          path   => "$prefix/$path",
66          limit => shift @ARGV || 1000,          offset => $offset,
67            limit  => $limit,
68  );  );
69    
70    our $num_records = $input->size;
71    
72  sub report {  sub report {
73          my $description = shift;          my $description = shift;
74          my $dt = time - $t;          my $dt = time - $t;
75          printf "%s in %1.4fs %.2f/s\n", $description, $dt, $input->size / $dt;          printf "%s in %1.4fs %.2f/s\n", $description, $dt, $input->size / $dt;
76            $t = time;
77  }  }
78    
79    
# Line 29  report $input->size . ' records loaded'; Line 81  report $input->size . ' records loaded';
81    
82  mkdir 'out' unless -e 'out';  mkdir 'out' unless -e 'out';
83    
84  sub run_views {  our $out;
         my @views = glob 'views/*.pl';  
         warn "# views ", dump @views;  
85    
86          foreach my $view ( @views ) {  our $cache;
87    
88                  my ( $nr, $package ) = ( $1, $2 )  our $connected;
                         if $view =~ m{/(\d+)\.([^/]+(\.pl)?$)};  
89    
90                  my $out;  sub send_nodes {
91            my $content = $#_ > 0 ? pop @_ : '';    # no content with just one argument!
92            my $header = defined $content ? length($content) : 0;
93            $header .= ' ' . join(' ', @_) if @_;
94    
95                  next if system("perl -c $view") != 0;          foreach my $node ( @nodes ) {
96    
97                  my $code = read_file $view;                  my $sock = IO::Socket::INET->new(
98                  warn $code;                          PeerAddr => $node,
99                            Proto    => 'tcp',
100                    );
101    
102                    if ( ! $sock ) {
103                            warn "can't connect to $node - $!"; # FIXME die?
104                            next;
105                    }
106    
107                    print ">>>> $listen $node $header\n";
108                    print $sock "$header\n$content" || warn "can't send $header to $node: $!";
109    
110                    $connected->{$node} = $sock;
111            }
112    }
113    
114    sub get_node {
115            my $node = shift;
116    
117            my $sock = $connected->{$node};
118            if ( ! $sock ) {
119                    warn "ERROR: lost connection to $node";
120                    delete $connected->{$node};
121                    return;
122            }
123            chomp( my $size = <$sock> );
124            warn "<<<< $listen $node $size bytes\n";
125            my $data;
126            read $sock, $data, $size;
127            return $data;
128    }
129    
130    sub send_sock {
131            my ( $sock, $data ) = @_;
132            my $size   = length $data;
133            warn ">>>> $listen ", $sock->peerhost, " $size bytes\n";
134            print $sock "$size\n$data" || warn "can't send $size bytes to ", $sock->peerhost;
135    }
136    
137    sub merge_out {
138            my $new = shift;
139    
140            foreach my $k1 ( keys %$new ) {
141    
142                    foreach my $k2 ( keys %{ $new->{$k1} } ) {
143    
144                            my $n   =     $new->{$k1}->{$k2};
145                            my $ref = ref $out->{$k1}->{$k2};
146    
147                            if ( ! defined $out->{$k1}->{$k2} ) {
148                                    $out->{$k1}->{$k2} = $n;
149                            } elsif ( $k1 =~ m{\+} ) {
150    #                               warn "## agregate $k1 $k2";
151                                    $out->{$k1}->{$k2} += $n;
152                            } elsif ( $ref  eq 'ARRAY' ) {
153                                    if ( ref $n eq 'ARRAY' ) {
154                                            push @{ $out->{$k1}->{$k2} }, $_ foreach @$n;
155                                    } else {
156                                            push @{ $out->{$k1}->{$k2} }, $n;
157                                    }
158                            } elsif ( $ref eq '' ) {
159                                    $out->{$k1}->{$k2} = [ $out->{$k1}->{$k2}, $n ];
160                            } else {
161                                    die "can't merge $k2 [$ref] from ",dump($n), " into ", dump($out->{$k1}->{$k2});
162                            }
163                    }
164            }
165    
166            warn "## merge out ", dump $out if $debug;
167    }
168    
169                  $t = time;  sub run_code {
170            my ( $view, $code ) = @_;
171    
172                  foreach my $pos ( 1 .. $input->size ) {          warn "\n#### CODE $view START ####\n$code\n#### CODE $view END ####\n" if $debug;
                         my $rec = $input->fetch_rec($pos);        
173    
174                          eval $code;          send_nodes view => $view => $code;
                         die "ERROR [$pos] $@" if $@;  
                 };  
175    
176                  report $view;          undef $out;
177    
178                  if ( defined $out ) {          my $affected = 0;
179            $t = time;
180    
181            foreach my $pos ( $offset + 1 .. $offset + $input->size ) {
182                    my $rec = $cache->{$pos} ||= $input->fetch_rec( $pos );
183                    if ( ! $rec ) {
184                            warn "END at $pos";
185                            last;
186                    }
187    
188                    eval "$code";
189                    if ( $@ ) {
190                            warn "ABORT [$pos] $@\n";
191                            last;
192                    } else {
193                            $affected++;
194                    }
195            };
196    
197            report "$affected affected records $view";
198    
199            warn "WARN no \$out defined!" unless defined $out;
200    
201            if ( $connected ) {
202                    foreach my $node ( keys %$connected ) {
203                            warn "# $listen get_node $node\n";
204                            my $o = get_node $node;
205                            my $s = length $o;
206                            $o = thaw $o;
207                            warn "# $listen merge $s bytes\n";
208                            merge_out $o;
209                    }
210            }
211    }
212    
213    sub run_views {
214            @views = sort glob 'views/*.pl' unless @views;
215            warn "# views ", dump @views;
216    
217            foreach my $view ( @views ) {
218    
219                    next if system("perl -c $view") != 0;
220    
221                    my $code = read_file $view;
222    
223                    run_code $view => $code;
224    
225                    if ( defined $out ) {
226                          my $dump = dump $out;                          my $dump = dump $out;
227                          my $len  = length $dump;                          my $len  = length $dump;
228                          my $path = "out/$nr.$package";  
229                          print "# $view $len bytes $path"                          my $path = $view;
230                                  , $len < 10000 ?  " \$out = $dump" : ' SAVED ONLY'                          $path =~ s{views?/}{out/} || die "no view in $view";
231                            $path =~ s{\.pl}{};
232    
233                            print "OUT $view $offset/$limit $len bytes $path"
234                                    , ( $len < 10000 ?  " \$out = $dump" : ' SAVED ONLY' )
235                                  , "\n"                                  , "\n"
236                                  ;                                  ;
237    
238                            unlink "$path.last" if -e "$path.last";
239                            rename $path, "$path.last";
240                          write_file $path, $dump;                          write_file $path, $dump;
241                          report "save $path";                          report "SAVE $path";
242                    }
243    
244            }
245    
246    }
247    
248    if ( $listen ) {
249            my $sock = IO::Socket::INET->new(
250                    Listen    => SOMAXCONN,
251                    LocalAddr => '127.0.0.1',
252                    LocalPort => $listen,
253                    Proto     => 'tcp',
254                    Reuse     => 1,
255            ) or die $!;
256    
257            while (1) {
258    
259                    warn "NODE $listen ready - path: $path offset: $offset limit: $limit #recs: $num_records\n";
260    
261                    my $client = $sock->accept();
262    
263                    warn "<<<< $listen connect from ", $client->peerhost, $/;
264    
265                    my @header = split(/\s/, <$client>);
266                    warn "<<<< $listen header ",dump(@header),$/;
267    
268                    my $size = shift @header;
269    
270                    my $content;
271                    read $client, $content, $size;
272    
273                    if ( $header[0] eq 'view' ) {
274                            run_code $header[1] => $content;
275                            send_sock $client => freeze $out;
276                    } elsif ( $header[0] eq 'info' ) {
277                            my $info = "$listen\t$offset\t$limit\t$num_records\t$path";
278                            $info .= "\t" . eval $header[1] if $header[1];
279                            warn "info $info\n";
280                            send_sock $client => $info;
281                    } elsif ( $header[0] eq 'exit' ) {
282                            warn "exit $listen";
283                            exit;
284                    } else {
285                            warn "WARN $listen unknown";
286                  }                  }
287    
288          }          }
# Line 76  run_views; Line 293  run_views;
293  while ( 1 ) {  while ( 1 ) {
294    
295          print "sack> ";          print "sack> ";
296          my $cmd = <STDIN>;          chomp( my $cmd = <STDIN> );
297    
298            if ( $cmd =~ m{^(h|\?)} ) {
299                    print << "__HELP__"
300    Sacks Lorry v$VERSION - path: $path offset: $offset limit: $limit
301    
302            View Run        run views
303            VI \\e Output   show output of last run
304            Info [\$VERSION]        instrospect
305            Quit EXit       shutdown
306    
307          if ( $cmd =~ m{(vi|\\e|out)}i ) {  __HELP__
308            } elsif ( $cmd =~ m{^(vi|\\e|o)}i ) {
309                  system "vi out/*";                  system "vi out/*";
310          } else {          } elsif ( $cmd =~ m{^i(?:nfo)?\s?(.+)?$}i ) {
311                    print "# nodes: ", join(' ',@nodes), $/;
312    
313                    send_nodes 'info' => $2;
314    
315                    my @info = (
316                            "node\toffset\tlimit\t#recs\tpath",
317                            "----\t------\t-----\t-----\t----",
318                            "0\t$offset\t$limit\t$num_records\t$path",
319                    );
320    
321                    push @info, get_node $_ foreach @nodes;
322    
323                    print "$_\n" foreach @info;
324    
325            } elsif ( $cmd =~ m{^(q|e|x)}i ) {
326                    warn "# exit";
327                    send_nodes 'exit';
328                    exit;
329            } elsif ( $cmd =~ m{^(v|r)}i ) {
330                  run_views;                  run_views;
331            } elsif ( $cmd ) {
332                    warn "UNKNOWN ", dump $cmd;
333          }          }
334    
335  }  }

Legend:
Removed from v.3  
changed lines
  Added in v.28

  ViewVC Help
Powered by ViewVC 1.1.26