/[Sack]/trunk/bin/sack.pl
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/bin/sack.pl

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1 by dpavlin, Sun Sep 20 18:42:07 2009 UTC revision 19 by dpavlin, Tue Sep 22 15:14:00 2009 UTC
# Line 6  use strict; Line 6  use strict;
6  use Time::HiRes qw(time);  use Time::HiRes qw(time);
7  use Data::Dump qw(dump);  use Data::Dump qw(dump);
8  use File::Slurp;  use File::Slurp;
9    use Getopt::Long;
10    use IO::Socket::INET;
11    use Storable qw/freeze thaw/;
12    
13    
14    my $debug  = 0;
15    my $path   = '/data/isi/full.txt';
16    my $limit  = 5000;
17    my $offset = 0;
18    my @views;
19    my $listen;
20    my @nodes;
21    
22    
23    GetOptions(
24            'path=s'   => \$path,
25            'offset=i' => \$offset,
26            'limit=i'  => \$limit,
27            'view=s'   => \@views,
28            'listen|port=i' => \$listen,
29            'connect=s'   => \@nodes,
30            'debug!'   => \$debug,
31    ) or die $!;
32    
33  my $t = time;  my $t = time;
34    
35    
36  use lib '/srv/webpac2/lib/';  our $prefix;
37    BEGIN {
38            $prefix = $0;
39            if ( $prefix =~ s{^./}{} ) {
40                    chomp( my $pwd = `pwd` );
41                    $prefix = "$pwd/$prefix";
42            }
43            $prefix =~ s{^(.*)/srv/Sack/bin.+$}{$1};
44            warn "# prefix $prefix";
45    }
46    
47    
48    use lib "$prefix/srv/webpac2/lib/";
49  use WebPAC::Input::ISI;  use WebPAC::Input::ISI;
50  my $input = WebPAC::Input::ISI->new(  my $input = WebPAC::Input::ISI->new(
51          path  => '/data/isi/full.txt',          path   => "$prefix/$path",
52          limit => shift @ARGV || 1000,          offset => $offset,
53            limit  => $limit,
54  );  );
55    
56    
# Line 22  sub report { Line 58  sub report {
58          my $description = shift;          my $description = shift;
59          my $dt = time - $t;          my $dt = time - $t;
60          printf "%s in %1.4fs %.2f/s\n", $description, $dt, $input->size / $dt;          printf "%s in %1.4fs %.2f/s\n", $description, $dt, $input->size / $dt;
61            $t = time;
62  }  }
63    
64    
# Line 29  report $input->size . ' records loaded'; Line 66  report $input->size . ' records loaded';
66    
67  mkdir 'out' unless -e 'out';  mkdir 'out' unless -e 'out';
68    
69  sub run_views {  our $out;
         my @views = glob 'views/*.pl';  
         warn "# views ", dump @views;  
70    
71          foreach my $view ( @views ) {  our $cache;
72    
73                  my ( $nr, $package ) = ( $1, $2 )  our $connected;
                         if $view =~ m{/(\d+)\.([^/]+(\.pl)?$)};  
74    
75                  my $out;  sub send_nodes {
76            my $content = $#_ > 0 ? pop @_ : '';    # no content with just one argument!
77            my $header = length($content);
78            $header .= ' ' . join(' ', @_) if @_;
79    
80                  next if system("perl -c $view") != 0;          foreach my $node ( @nodes ) {
81    
82                  my $code = read_file $view;                  my $sock = IO::Socket::INET->new(
83                  warn $code;                          PeerAddr => $node,
84                            Proto    => 'tcp',
85                    );
86    
87                    if ( ! $sock ) {
88                            warn "can't connect to $node - $!"; # FIXME die?
89                            next;
90                    }
91    
92                  $t = time;                  print ">>>> $node $header\n";
93                    print $sock "$header\n$content" || warn "can't send $header to $node: $!";
94    
95                  foreach my $pos ( 1 .. $input->size ) {                  $connected->{$node} = $sock;
96                          my $rec = $input->fetch_rec($pos);                }
97    }
98    
99                          eval $code;  sub get_node {
100                          die "ERROR [$pos] $@" if $@;          my $node = shift;
                 };  
101    
102                  report $view;          my $sock = $connected->{$node};
103            if ( ! $sock ) {
104                    warn "ERROR: lost connection to $node";
105                    delete $connected->{$node};
106                    return;
107            }
108            chomp( my $size = <$sock> );
109            warn "<<<< $node $size bytes\n";
110            my $data;
111            read $sock, $data, $size;
112            return $data;
113    }
114    
115                  if ( defined $out ) {  sub send_sock {
116            my ( $sock, $data ) = @_;
117            my $size   = length $data;
118            warn ">>>> ", $sock->peerhost, " $size bytes";
119            print $sock "$size\n$data" || warn "can't send $size bytes to ", $sock->peerhost;
120    }
121    
122    sub merge_out {
123            my $new = shift;
124    
125            foreach my $k1 ( keys %$new ) {
126    
127                    foreach my $k2 ( keys %{ $new->{$k1} } ) {
128    
129                            my $n   =     $new->{$k1}->{$k2};
130                            my $ref = ref $out->{$k1}->{$k2};
131    
132                            if ( ! defined $out->{$k1}->{$k2} ) {
133                                    $out->{$k1}->{$k2} = $n;
134                            } elsif ( $k1 =~ m{\+} ) {
135    #                               warn "## agregate $k1 $k2";
136                                    $out->{$k1}->{$k2} += $n;
137                            } elsif ( $ref eq 'ARRAY' ) {
138                                    push @{ $out->{$k1}->{$k2} }, $n;
139                            } elsif ( $ref eq '' ) {
140                                    $out->{$k1}->{$k2} = [ $out->{$k1}->{$k2}, $n ];
141                            } else {
142                                    die "can't merge $k2 [$ref] from ",dump($n), " into ", dump($out->{$k1}->{$k2});
143                            }
144                    }
145            }
146    
147            warn "## merge out ", dump $out if $debug;
148    }
149    
150    sub run_code {
151            my ( $view, $code ) = @_;
152    
153            warn "\n#### CODE $view START ####\n$code\n#### CODE $view END ####\n";
154    
155            send_nodes view => $view => $code;
156    
157            undef $out;
158    
159            my $affected = 0;
160            $t = time;
161    
162            foreach my $pos ( $offset + 1 .. $offset + $input->size ) {
163                    my $rec = $cache->{$pos} ||= $input->fetch_rec( $pos );
164                    if ( ! $rec ) {
165                            warn "END at $pos";
166                            last;
167                    }
168    
169                    eval "$code";
170                    if ( $@ ) {
171                            warn "ERROR [$pos] $@\n";
172                    } else {
173                            $affected++;
174                    }
175            };
176    
177            report "$affected affected records $view";
178    
179            warn "WARN no \$out defined!" unless defined $out;
180    
181            if ( $connected ) {
182                    warn "# get results from ", join(' ', keys %$connected );
183                    merge_out( thaw( get_node( $_ ) ) ) foreach keys %$connected;
184            }
185    }
186    
187    sub run_views {
188            @views = sort glob 'views/*.pl' unless @views;
189            warn "# views ", dump @views;
190    
191            foreach my $view ( @views ) {
192    
193                    next if system("perl -c $view") != 0;
194    
195                    my $code = read_file $view;
196    
197                    run_code $view => $code;
198    
199                    if ( defined $out ) {
200                          my $dump = dump $out;                          my $dump = dump $out;
201                          my $len  = length $dump;                          my $len  = length $dump;
202                          my $path = "out/$nr.$package";  
203                          print "# $view [$len] $path"                          my $path = $view;
204                                  , $len < 10000 ?  " \$out = $dump" : ''                          $path =~ s{views?/}{out/} || die "no view in $view";
205                            $path =~ s{\.pl}{};
206    
207                            print "OUT $view $offset/$limit $len bytes $path"
208                                    , ( $len < 10000 ?  " \$out = $dump" : ' SAVED ONLY' )
209                                  , "\n"                                  , "\n"
210                                  ;                                  ;
211    
212                            unlink "$path.last" if -e "$path.last";
213                            rename $path, "$path.last";
214                          write_file $path, $dump;                          write_file $path, $dump;
215                          report "save $path";                          report "SAVE $path";
216                  }                  }
217    
218          }          }
219    
220  }  }
221    
222    if ( $listen ) {
223            my $sock = IO::Socket::INET->new(
224                    Listen    => SOMAXCONN,
225    #               LocalAddr => '0.0.0.0',
226                    LocalPort => $listen,
227                    Proto     => 'tcp',
228                    Reuse     => 1,
229            ) or die $!;
230    
231            while (1) {
232    
233                    warn "NODE listen on $listen\n";
234    
235                    my $client = $sock->accept();
236    
237                    warn "<<<< $listen connect from ", $client->peerhost, $/;
238    
239                    my @header = split(/\s/, <$client>);
240                    warn "# header ",dump @header;
241    
242                    my $size = shift @header;
243    
244                    my $content;
245                    read $client, $content, $size;
246    
247                    if ( $header[0] eq 'view' ) {
248                            run_code $header[1] => $content;
249                            send_sock $client => freeze $out;
250                    } elsif ( $header[0] eq 'info' ) {
251                            my $info = "$listen\t$offset\t$limit\t$path";
252                            warn "info $info\n";
253                            send_sock $client => $info;
254                    } elsif ( $header[0] eq 'exit' ) {
255                            warn "exit $listen";
256                            exit;
257                    } else {
258                            warn "WARN $listen unknown";
259                    }
260    
261            }
262    }
263    
264    run_views;
265    
266  while ( 1 ) {  while ( 1 ) {
         run_views;  
267    
268          print "sack> ";          print "sack> ";
269          my $cmd = <STDIN>;          my $cmd = <STDIN>;
270    
271            if ( $cmd =~ m{^(vi?|\\e|o(?:ut)?)}i ) {
272                    system "vi out/*";
273            } elsif ( $cmd =~ m{^i(nfo)?}i ) {
274                    print "# nodes: ", join(' ',@nodes), $/;
275    
276                    my @info = (
277                            "node\toffset\tlimit\tpath",
278                            "----\t------\t-----\t----",
279                            "here\t$offset\t$limit\t$path",
280                    );
281    
282                    send_nodes 'info';
283                    push @info, get_node $_ foreach @nodes;
284    
285                    print "$_\n" foreach @info;
286    
287            } elsif ( $cmd =~ m{^(q(uit)|e(xit))}i ) {
288                    warn "# exit";
289                    send_nodes 'exit';
290                    exit;
291            } else {
292                    run_views;
293            }
294    
295  }  }
296    

Legend:
Removed from v.1  
changed lines
  Added in v.19

  ViewVC Help
Powered by ViewVC 1.1.26