/[Sack]/trunk/lib/Sack/Lorry.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/lib/Sack/Lorry.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 92 by dpavlin, Sat Oct 3 21:09:51 2009 UTC revision 139 by dpavlin, Wed Oct 7 18:58:17 2009 UTC
# Line 3  package Sack::Lorry; Line 3  package Sack::Lorry;
3  use warnings;  use warnings;
4  use strict;  use strict;
5    
6    our $VERSION = '0.08';
7    
8  use IO::Socket::INET;  use IO::Socket::INET;
9  use Data::Dump qw(dump);  use Data::Dump qw(dump);
10  use Storable;  use Storable;
11    use File::Slurp;
12    use Net::Ping;
13    use Time::HiRes qw(time sleep);
14    
15    use lib 'lib';
16    use base qw(Sack::Pid);
17    use Sack;
18    
19  our $pids;  our $pids;
 our $ports;  
20    
21  $SIG{CHLD} = 'IGNORE';  $SIG{CHLD} = 'IGNORE';
22    
 my $port = 4000;  
   
23  sub new {  sub new {
24          my $class = shift;          my $class = shift;
25          my $self  = bless {@_}, $class;          my $self  = bless {@_}, $class;
26            $self->{sock} = {};
27            warn __PACKAGE__, " $VERSION\n";
28          return $self;          return $self;
29  }  }
30    
31  sub start_node {  sub connected {
32          my ( $self, $host ) = @_;          sort keys %{ $_[0]->{sock} }
33    }
34    
35          if ( my $pid = fork ) {  sub connect_to {
36                  # parent          my ( $self, $port, $retries ) = @_;
                 $pids->{ $host } = $pid;  
                 $ports->{ $port } = $host;  
37    
38                  my $sock;          $retries ||= 30;
39            warn "# connect_to [$port] $retries times";
40    
41                  print STDERR "waiting for $port";          my $sock = $self->{sock}->{$port};
42    
43                  while ( ! $sock ) {          while ( ! $sock && $retries-- ) {
44    
45                    $sock = IO::Socket::INET->new(
46                            PeerAddr => '127.0.0.1',
47                            PeerPort => $port,
48                            Proto    => 'tcp',
49                    );
50    
51                    if ( ! $sock ) {
52                            print STDERR ".";
53                            sleep 0.5;
54                    } elsif ( $sock->connected ) {
55                            $self->{sock}->{$port} = $sock;
56                            warn "# connected to $port\n";
57    
58                            $self->send_to( $port, { info => 1 } );
59                            warn "info ", dump( $self->get_from( $port ) ), $/;
60                            
61                    } else {
62                            close $sock;
63                    }
64    
65                          $sock = IO::Socket::INET->new(          }
                                 PeerAddr => '127.0.0.1',  
                                 PeerPort => $port,  
                                 Proto    => 'tcp',  
                         );  
66    
67                          if ( ! $sock ) {          if ( ! $retries ) {
68                                  print STDERR ".";                  warn "SKIP $port: $!";
69                                  sleep 1;                  return;
70                          }          } else {
71                    return $port;
72            }
73    }
74    
                 }  
75    
76                  $self->{sock}->{$port} = $sock;  sub start_node_port {
77            my ( $self, $host, $port ) = @_;
78    
79            chomp $host;
80    
81            my $p = Net::Ping->new;
82    
83            if ( ! $p->ping( $host ) ) {
84                    warn "can't ping [$host]\n";
85                    return;
86            }
87    
88            if ( $self->connect_to( $port, 1 ) ) {
89                    warn "re-using existing $port";
90            }
91    
92            my $ssh_config = '-F etc/lib.ssh';
93    
94            my $pid_path = "/tmp/sack.$port.pid";
95            kill 9, read_file $pid_path if -e $pid_path;
96    
97            if ( my $pid = fork ) {
98                    # parent
99    
100                    $self->connect_to( $port ) || return;
101    
102                  warn "\nconnected to $port\n";                  $pids->{ $port } = $pid;
103                    $self->{port_on_host}->{$port} = $host;
104    
105                  return $port++;                  warn "start_node_port $host [$port] pid $pid\n";
106    
107                    return $port;
108    
109          } elsif ( ! defined $pid ) {          } elsif ( ! defined $pid ) {
110                  warn "can't fork $host $port";                  warn "can't fork $host $port";
111                  return;                  return;
112          } else {          } else {
113                  # child                  # child
114    
115                  my $cmd = $host !~ m{^(localhost|127\.)}i ? qq|                  my $cmd = $host !~ m{^(localhost|127\.)}i ? qq|
116                          ssh                          ssh -f $ssh_config
117                                  -S /tmp/sock.$port.ssh                                  -S /tmp/sock.$port.ssh
118                                  -L $port:127.0.0.1:$port                                  -L $port:127.0.0.1:$port
119                          $host                          $host
120                  | : '';                  | : '';
121    
122                  $cmd .= qq|                  $cmd .= qq|
123                          perl -I/srv/Sack/lib -MSack::Node -e "Sack::Node->new($port)"                          /srv/Sack/bin/node.pl $port
124                  |;                  |;
125    
126                  $cmd =~ s{\s+}{ }sg;                  $cmd =~ s{\s+}{ }sg;
127    
128                  warn "exec: $cmd\n";                  $self->port_pid( $port, $$ );
129    
130                    warn "# exec: $cmd\n";
131                  exec $cmd;                  exec $cmd;
132          }          }
133  }  }
134    
135  sub send_to {  sub send_to {
136          my ( $self, $port, $data ) = @_;          my ( $self, $port, $data ) = @_;
137          warn ">>>> [$port] ", dump( keys %$data ), $/;  #       warn "send_to [$port]\n";
138          Storable::store_fd( $data => $self->{sock}->{$port} );          Storable::store_fd( $data => $self->{sock}->{$port} );
139  }  }
140    
141  sub get_from {  sub get_from {
142          my ( $self, $port )  = @_;          my ( $self, $port )  = @_;
143          warn "<<<< [$port]\n";  #       warn "get_from [$port]\n";
144          Storable::fd_retrieve( $self->{sock}->{$port} );          my $data;
145            eval {
146                    $data = Storable::fd_retrieve( $self->{sock}->{$port} );
147            };
148            warn "ERROR $@" if $@;
149            return $data;
150    }
151    
152    sub send_to_all {
153            my ( $self, $data ) = @_;
154            $self->send_to( $_, $data ) foreach $self->connected;
155  }  }
156    
157    sub get_from_all {
158            my ( $self ) = @_;
159            my $result;
160            $result->{$_} = $self->get_from( $_ ) foreach $self->connected;
161            return $result;
162    }
163    
164    sub restart_nodes {
165            my ( $self ) = @_;
166            foreach my $port ( $self->connected ) {
167                    warn "restart [$port]\n";
168    #               $self->send_to( $port, { restart => 1 } );
169    #               $self->connect_to( $port );
170                    $self->send_to( $port, { exit => 1 } );
171                    kill 9, $pids->{$port};
172                    $self->start_node_port( $self->{port_on_host}->{$port}, $port );
173            }
174    }
175    
176    
177    our $out;
178    
179    use Digest::MD5 qw(md5);
180    our $nr = 0;
181    our $md5_nr;
182    our $digest_fh;
183    our @digest_offset;
184    
185    sub merge {
186            my ( $self, $new ) = @_;
187    
188            my $t_merge = time();
189    
190            my $tick = 0;
191    
192            my $missing;
193    
194            foreach my $k1 ( keys %$new ) {
195    
196                    foreach my $k2 ( keys %{ $new->{$k1} } ) {
197    
198                            my $n   = delete $new->{$k1}->{$k2};
199    
200                            if ( $k1 =~ m{#} ) {
201                                    my $md5 = md5 $k2;
202                                    if ( defined $md5_nr->{$md5} ) {
203                                            $k2 = $md5_nr->{$md5};
204                                    } else {
205                                            open( $digest_fh, '>', '/tmp/sack.digest' ) unless $digest_fh;
206                                            $digest_offset[ $nr ] = tell( $digest_fh );
207                                            print $digest_fh "$k2\n";
208    
209                                            $k2 = $md5_nr->{$md5} = $nr;
210                                            $nr++;
211                                    }
212                            }
213    
214                            my $ref = ref    $out->{$k1}->{$k2};
215    
216                            if ( ! defined $out->{$k1}->{$k2} ) {
217                                    $out->{$k1}->{$k2} = $n;
218                            } elsif ( $k1 =~ m{\+} ) {
219    #                               warn "## agregate $k1 $k2";
220                                    $out->{$k1}->{$k2} += $n;
221                            } elsif ( $ref  eq 'ARRAY' ) {
222                                    if ( ref $n eq 'ARRAY' ) {
223                                            push @{ $out->{$k1}->{$k2} }, $_ foreach @$n;
224                                    } else {
225                                            push @{ $out->{$k1}->{$k2} }, $n;
226                                    }
227                            } elsif ( $ref eq '' ) {
228                                    $out->{$k1}->{$k2} = [ $out->{$k1}->{$k2}, $n ];
229                            } else {
230                                    die "can't merge $k2 [$ref] from ",dump($n), " into ", dump($out->{$k1}->{$k2});
231                            }
232    
233                            if ( $tick++ % 1000 == 0 ) {
234                                    print STDERR ".";
235                            } elsif ( $tick % 10000 == 0 ) {
236                                    print STDERR $tick;
237                            }
238                    }
239            }
240    
241            $t_merge = time - $t_merge;
242            warn sprintf "\nmerged %d in %.4fs\n", $tick, $t_merge;
243    
244            return $tick;
245    }
246    
247    
248    sub view {
249            my ( $self, $view ) = @_;
250    
251            my $t = time;
252            $out = {};
253    
254            warn "run view $view ", -s $view, " bytes\n";
255    
256            my $view_code = read_file($view);
257            $self->send_to_all({ view => $view_code });
258    
259            my $total;
260    
261            foreach my $port ( $self->connected ) {
262                    my $result = $self->get_from( $port );
263                    warn "# result ", dump $result if $self->{debug};
264                    if ( my $out = delete $result->{out} ) {
265                            warn "[$port] result ", dump($result), $/ if $result;
266                            $total += $self->merge( $out );
267                    } else {
268                            warn "no out from $port in ",dump $result;
269                    }
270            }
271    
272            warn sprintf "view %d in %.4fs\n", $total, time - $t;
273    
274            return $out;
275    }
276    
277    sub command {
278            my ( $self, $cmd ) = @_;
279    
280            duration "repl $cmd";
281    
282            my $repl = 1;
283    
284            if ( $cmd =~ m{^v} ) {
285                    $out = $self->view( $self->{view} );
286                    duration 'view';
287            } elsif ( $cmd =~ m{^d} ) {
288                    warn dump $out;
289                    duration 'dump';
290            } elsif ( $cmd =~ m{^x} ) {
291                    $repl = 0;
292            } elsif ( $cmd =~ m{^r} ) {
293                    $self->restart_nodes;
294            } elsif ( $cmd =~ m{^i} ) {
295                    $self->send_to_all({ info => 1 });
296                    my $info = $self->get_from_all;
297                    foreach my $port ( $self->connected ) {
298                            warn "INFO view $self->{view} ", -s $self->{view}, " bytes\n";
299                            warn "[$port] $self->{port_on_host}->{$port} ", dump( $info->{$port} ), "\n";
300                    }
301            } elsif ( $cmd =~ m{^u} ) {
302                    my $updated;
303                    foreach my $host ( $self->connected ) {
304                            next if $updated->{$host}++;
305                            warn "update $host\n";
306                            system "find /srv/Sack/ | cpio --create | ssh -F etc/lib.ssh $host cpio --extract --make-directories --unconditional";
307                    }
308            } elsif ( $cmd =~ m{^sh\s+(.+)} ) {
309                    $self->send_to_all({ sh => $1 });
310                    my $sh = $self->get_from_all;
311                    foreach my $port ( $self->connected ) {
312                            warn "[$port]# $1\n$sh->{$port}->{sh}";
313                    }
314            } else {
315                    warn "UNKNOWN $cmd\n" if $cmd;
316            }
317    
318            return $repl;
319    }
320    
321    
322  sub DESTROY {  sub DESTROY {
323          warn "pids ",dump( $pids );          warn "pids ",dump( $pids );
324          foreach ( values %$pids ) {          foreach ( values %$pids ) {

Legend:
Removed from v.92  
changed lines
  Added in v.139

  ViewVC Help
Powered by ViewVC 1.1.26