/[Sack]/trunk/bin/sack.pl
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/bin/sack.pl

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1 by dpavlin, Sun Sep 20 18:42:07 2009 UTC revision 20 by dpavlin, Tue Sep 22 16:02:49 2009 UTC
# Line 6  use strict; Line 6  use strict;
6  use Time::HiRes qw(time);  use Time::HiRes qw(time);
7  use Data::Dump qw(dump);  use Data::Dump qw(dump);
8  use File::Slurp;  use File::Slurp;
9    use Getopt::Long;
10    use IO::Socket::INET;
11    use Storable qw/freeze thaw/;
12    
13    
14    my $debug  = 0;
15    my $path   = '/data/isi/full.txt';
16    my $limit  = 5000;
17    my $offset = 0;
18    my @views;
19    my $listen;
20    my @nodes;
21    
22    
23    GetOptions(
24            'path=s'   => \$path,
25            'offset=i' => \$offset,
26            'limit=i'  => \$limit,
27            'view=s'   => \@views,
28            'listen|port=i' => \$listen,
29            'connect=s'   => \@nodes,
30            'debug!'   => \$debug,
31    ) or die $!;
32    
33  my $t = time;  my $t = time;
34    
35    
36  use lib '/srv/webpac2/lib/';  our $prefix;
37    BEGIN {
38            $prefix = $0;
39            if ( $prefix =~ s{^./}{} ) {
40                    chomp( my $pwd = `pwd` );
41                    $prefix = "$pwd/$prefix";
42            }
43            $prefix =~ s{^(.*)/srv/Sack/bin.+$}{$1};
44            warn "# prefix $prefix";
45    }
46    
47    
48    use lib "$prefix/srv/webpac2/lib/";
49  use WebPAC::Input::ISI;  use WebPAC::Input::ISI;
50  my $input = WebPAC::Input::ISI->new(  my $input = WebPAC::Input::ISI->new(
51          path  => '/data/isi/full.txt',          path   => "$prefix/$path",
52          limit => shift @ARGV || 1000,          offset => $offset,
53            limit  => $limit,
54  );  );
55    
56    
# Line 22  sub report { Line 58  sub report {
58          my $description = shift;          my $description = shift;
59          my $dt = time - $t;          my $dt = time - $t;
60          printf "%s in %1.4fs %.2f/s\n", $description, $dt, $input->size / $dt;          printf "%s in %1.4fs %.2f/s\n", $description, $dt, $input->size / $dt;
61            $t = time;
62  }  }
63    
64    
# Line 29  report $input->size . ' records loaded'; Line 66  report $input->size . ' records loaded';
66    
67  mkdir 'out' unless -e 'out';  mkdir 'out' unless -e 'out';
68    
69  sub run_views {  our $out;
         my @views = glob 'views/*.pl';  
         warn "# views ", dump @views;  
70    
71          foreach my $view ( @views ) {  our $cache;
72    
73                  my ( $nr, $package ) = ( $1, $2 )  our $connected;
                         if $view =~ m{/(\d+)\.([^/]+(\.pl)?$)};  
74    
75                  my $out;  sub send_nodes {
76            my $content = $#_ > 0 ? pop @_ : '';    # no content with just one argument!
77            my $header = length($content);
78            $header .= ' ' . join(' ', @_) if @_;
79    
80                  next if system("perl -c $view") != 0;          foreach my $node ( @nodes ) {
81    
82                  my $code = read_file $view;                  my $sock = IO::Socket::INET->new(
83                  warn $code;                          PeerAddr => $node,
84                            Proto    => 'tcp',
85                    );
86    
87                    if ( ! $sock ) {
88                            warn "can't connect to $node - $!"; # FIXME die?
89                            next;
90                    }
91    
92                  $t = time;                  print ">>>> $node $header\n";
93                    print $sock "$header\n$content" || warn "can't send $header to $node: $!";
94    
95                  foreach my $pos ( 1 .. $input->size ) {                  $connected->{$node} = $sock;
96                          my $rec = $input->fetch_rec($pos);                }
97    }
98    
99                          eval $code;  sub get_node {
100                          die "ERROR [$pos] $@" if $@;          my $node = shift;
                 };  
101    
102                  report $view;          my $sock = $connected->{$node};
103            if ( ! $sock ) {
104                    warn "ERROR: lost connection to $node";
105                    delete $connected->{$node};
106                    return;
107            }
108            chomp( my $size = <$sock> );
109            warn "<<<< $node $size bytes\n";
110            my $data;
111            read $sock, $data, $size;
112            return $data;
113    }
114    
115                  if ( defined $out ) {  sub send_sock {
116            my ( $sock, $data ) = @_;
117            my $size   = length $data;
118            warn ">>>> ", $sock->peerhost, " $size bytes";
119            print $sock "$size\n$data" || warn "can't send $size bytes to ", $sock->peerhost;
120    }
121    
122    sub merge_out {
123            my $new = shift;
124    
125            foreach my $k1 ( keys %$new ) {
126    
127                    foreach my $k2 ( keys %{ $new->{$k1} } ) {
128    
129                            my $n   =     $new->{$k1}->{$k2};
130                            my $ref = ref $out->{$k1}->{$k2};
131    
132                            if ( ! defined $out->{$k1}->{$k2} ) {
133                                    $out->{$k1}->{$k2} = $n;
134                            } elsif ( $k1 =~ m{\+} ) {
135    #                               warn "## agregate $k1 $k2";
136                                    $out->{$k1}->{$k2} += $n;
137                            } elsif ( $ref eq 'ARRAY' ) {
138                                    push @{ $out->{$k1}->{$k2} }, $n;
139                            } elsif ( $ref eq '' ) {
140                                    $out->{$k1}->{$k2} = [ $out->{$k1}->{$k2}, $n ];
141                            } else {
142                                    die "can't merge $k2 [$ref] from ",dump($n), " into ", dump($out->{$k1}->{$k2});
143                            }
144                    }
145            }
146    
147            warn "## merge out ", dump $out if $debug;
148    }
149    
150    sub run_code {
151            my ( $view, $code ) = @_;
152    
153            warn "\n#### CODE $view START ####\n$code\n#### CODE $view END ####\n";
154    
155            send_nodes view => $view => $code;
156    
157            undef $out;
158    
159            my $affected = 0;
160            $t = time;
161    
162            foreach my $pos ( $offset + 1 .. $offset + $input->size ) {
163                    my $rec = $cache->{$pos} ||= $input->fetch_rec( $pos );
164                    if ( ! $rec ) {
165                            warn "END at $pos";
166                            last;
167                    }
168    
169                    eval "$code";
170                    if ( $@ ) {
171                            warn "ABORT [$pos] $@\n";
172                            last;
173                    } else {
174                            $affected++;
175                    }
176            };
177    
178            report "$affected affected records $view";
179    
180            warn "WARN no \$out defined!" unless defined $out;
181    
182            if ( $connected ) {
183                    warn "# get results from ", join(' ', keys %$connected );
184                    merge_out( thaw( get_node( $_ ) ) ) foreach keys %$connected;
185            }
186    }
187    
188    sub run_views {
189            @views = sort glob 'views/*.pl' unless @views;
190            warn "# views ", dump @views;
191    
192            foreach my $view ( @views ) {
193    
194                    next if system("perl -c $view") != 0;
195    
196                    my $code = read_file $view;
197    
198                    run_code $view => $code;
199    
200                    if ( defined $out ) {
201                          my $dump = dump $out;                          my $dump = dump $out;
202                          my $len  = length $dump;                          my $len  = length $dump;
203                          my $path = "out/$nr.$package";  
204                          print "# $view [$len] $path"                          my $path = $view;
205                                  , $len < 10000 ?  " \$out = $dump" : ''                          $path =~ s{views?/}{out/} || die "no view in $view";
206                            $path =~ s{\.pl}{};
207    
208                            print "OUT $view $offset/$limit $len bytes $path"
209                                    , ( $len < 10000 ?  " \$out = $dump" : ' SAVED ONLY' )
210                                  , "\n"                                  , "\n"
211                                  ;                                  ;
212    
213                            unlink "$path.last" if -e "$path.last";
214                            rename $path, "$path.last";
215                          write_file $path, $dump;                          write_file $path, $dump;
216                          report "save $path";                          report "SAVE $path";
217                  }                  }
218    
219          }          }
220    
221  }  }
222    
223    if ( $listen ) {
224            my $sock = IO::Socket::INET->new(
225                    Listen    => SOMAXCONN,
226    #               LocalAddr => '0.0.0.0',
227                    LocalPort => $listen,
228                    Proto     => 'tcp',
229                    Reuse     => 1,
230            ) or die $!;
231    
232            while (1) {
233    
234                    warn "NODE listen on $listen\n";
235    
236                    my $client = $sock->accept();
237    
238                    warn "<<<< $listen connect from ", $client->peerhost, $/;
239    
240                    my @header = split(/\s/, <$client>);
241                    warn "# header ",dump @header;
242    
243                    my $size = shift @header;
244    
245                    my $content;
246                    read $client, $content, $size;
247    
248                    if ( $header[0] eq 'view' ) {
249                            run_code $header[1] => $content;
250                            send_sock $client => freeze $out;
251                    } elsif ( $header[0] eq 'info' ) {
252                            my $info = "$listen\t$offset\t$limit\t$path";
253                            warn "info $info\n";
254                            send_sock $client => $info;
255                    } elsif ( $header[0] eq 'exit' ) {
256                            warn "exit $listen";
257                            exit;
258                    } else {
259                            warn "WARN $listen unknown";
260                    }
261    
262            }
263    }
264    
265    run_views;
266    
267  while ( 1 ) {  while ( 1 ) {
         run_views;  
268    
269          print "sack> ";          print "sack> ";
270          my $cmd = <STDIN>;          my $cmd = <STDIN>;
271    
272            if ( $cmd =~ m{^(vi?|\\e|o(?:ut)?)}i ) {
273                    system "vi out/*";
274            } elsif ( $cmd =~ m{^i(nfo)?}i ) {
275                    print "# nodes: ", join(' ',@nodes), $/;
276    
277                    my @info = (
278                            "node\toffset\tlimit\tpath",
279                            "----\t------\t-----\t----",
280                            "here\t$offset\t$limit\t$path",
281                    );
282    
283                    send_nodes 'info';
284                    push @info, get_node $_ foreach @nodes;
285    
286                    print "$_\n" foreach @info;
287    
288            } elsif ( $cmd =~ m{^(q(uit)|e(xit))}i ) {
289                    warn "# exit";
290                    send_nodes 'exit';
291                    exit;
292            } else {
293                    run_views;
294            }
295    
296  }  }
297    

Legend:
Removed from v.1  
changed lines
  Added in v.20

  ViewVC Help
Powered by ViewVC 1.1.26