/[Sack]/trunk/lib/Sack/Lorry.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/lib/Sack/Lorry.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 97 by dpavlin, Sun Oct 4 13:09:28 2009 UTC revision 139 by dpavlin, Wed Oct 7 18:58:17 2009 UTC
# Line 3  package Sack::Lorry; Line 3  package Sack::Lorry;
3  use warnings;  use warnings;
4  use strict;  use strict;
5    
6    our $VERSION = '0.08';
7    
8  use IO::Socket::INET;  use IO::Socket::INET;
9  use Data::Dump qw(dump);  use Data::Dump qw(dump);
10  use Storable;  use Storable;
11  use File::Slurp;  use File::Slurp;
12    use Net::Ping;
13    use Time::HiRes qw(time sleep);
14    
15    use lib 'lib';
16    use base qw(Sack::Pid);
17    use Sack;
18    
19  our $pids;  our $pids;
 our $ports;  
20    
21  $SIG{CHLD} = 'IGNORE';  $SIG{CHLD} = 'IGNORE';
22    
 my $port = 4000;  
   
23  sub new {  sub new {
24          my $class = shift;          my $class = shift;
25          my $self  = bless {@_}, $class;          my $self  = bless {@_}, $class;
26            $self->{sock} = {};
27            warn __PACKAGE__, " $VERSION\n";
28          return $self;          return $self;
29  }  }
30    
31  sub start_node {  sub connected {
32          my ( $self, $host ) = @_;          sort keys %{ $_[0]->{sock} }
33    }
34    
35          system "rsync -rav /srv/Sack/ $host:/srv/Sack/";  sub connect_to {
36            my ( $self, $port, $retries ) = @_;
37    
38          if ( my $pid = fork ) {          $retries ||= 30;
39                  # parent          warn "# connect_to [$port] $retries times";
                 $pids->{ "$host:$port" } = $pid;  
                 $ports->{ $port } = $host;  
40    
41                  my $sock;          my $sock = $self->{sock}->{$port};
42    
43                  print STDERR "waiting for $port";          while ( ! $sock && $retries-- ) {
44    
45                    $sock = IO::Socket::INET->new(
46                            PeerAddr => '127.0.0.1',
47                            PeerPort => $port,
48                            Proto    => 'tcp',
49                    );
50    
51                    if ( ! $sock ) {
52                            print STDERR ".";
53                            sleep 0.5;
54                    } elsif ( $sock->connected ) {
55                            $self->{sock}->{$port} = $sock;
56                            warn "# connected to $port\n";
57    
58                            $self->send_to( $port, { info => 1 } );
59                            warn "info ", dump( $self->get_from( $port ) ), $/;
60                            
61                    } else {
62                            close $sock;
63                    }
64    
65                  while ( ! $sock ) {          }
66    
67                          $sock = IO::Socket::INET->new(          if ( ! $retries ) {
68                                  PeerAddr => '127.0.0.1',                  warn "SKIP $port: $!";
69                                  PeerPort => $port,                  return;
70                                  Proto    => 'tcp',          } else {
71                          );                  return $port;
72            }
73    }
74    
                         if ( ! $sock ) {  
                                 print STDERR ".";  
                                 sleep 1;  
                         }  
75    
76                  }  sub start_node_port {
77            my ( $self, $host, $port ) = @_;
78    
79            chomp $host;
80    
81            my $p = Net::Ping->new;
82    
83            if ( ! $p->ping( $host ) ) {
84                    warn "can't ping [$host]\n";
85                    return;
86            }
87    
88            if ( $self->connect_to( $port, 1 ) ) {
89                    warn "re-using existing $port";
90            }
91    
92            my $ssh_config = '-F etc/lib.ssh';
93    
94            my $pid_path = "/tmp/sack.$port.pid";
95            kill 9, read_file $pid_path if -e $pid_path;
96    
97            if ( my $pid = fork ) {
98                    # parent
99    
100                  $self->{sock}->{$port} = $sock;                  $self->connect_to( $port ) || return;
101    
102                  warn "\nconnected to $port\n";                  $pids->{ $port } = $pid;
103                    $self->{port_on_host}->{$port} = $host;
104    
105                  $self->{connected}->{$port} = $host;                  warn "start_node_port $host [$port] pid $pid\n";
106    
107                  return $port++;                  return $port;
108    
109          } elsif ( ! defined $pid ) {          } elsif ( ! defined $pid ) {
110                  warn "can't fork $host $port";                  warn "can't fork $host $port";
111                  return;                  return;
112          } else {          } else {
113                  # child                  # child
114    
115                  my $cmd = $host !~ m{^(localhost|127\.)}i ? qq|                  my $cmd = $host !~ m{^(localhost|127\.)}i ? qq|
116                          ssh                          ssh -f $ssh_config
117                                  -S /tmp/sock.$port.ssh                                  -S /tmp/sock.$port.ssh
118                                  -L $port:127.0.0.1:$port                                  -L $port:127.0.0.1:$port
119                          $host                          $host
# Line 76  sub start_node { Line 125  sub start_node {
125    
126                  $cmd =~ s{\s+}{ }sg;                  $cmd =~ s{\s+}{ }sg;
127    
128                  warn "exec: $cmd\n";                  $self->port_pid( $port, $$ );
129    
130                    warn "# exec: $cmd\n";
131                  exec $cmd;                  exec $cmd;
132          }          }
133  }  }
134    
135  sub send_to {  sub send_to {
136          my ( $self, $port, $data ) = @_;          my ( $self, $port, $data ) = @_;
137          warn "send_to [$port]\n";  #       warn "send_to [$port]\n";
138          Storable::store_fd( $data => $self->{sock}->{$port} );          Storable::store_fd( $data => $self->{sock}->{$port} );
139  }  }
140    
141  sub get_from {  sub get_from {
142          my ( $self, $port )  = @_;          my ( $self, $port )  = @_;
143          warn "get_from [$port]\n";  #       warn "get_from [$port]\n";
144          Storable::fd_retrieve( $self->{sock}->{$port} );          my $data;
145            eval {
146                    $data = Storable::fd_retrieve( $self->{sock}->{$port} );
147            };
148            warn "ERROR $@" if $@;
149            return $data;
150  }  }
151    
152  sub send_to_all {  sub send_to_all {
153          my ( $self, $data ) = @_;          my ( $self, $data ) = @_;
154          $self->send_to( $_, $data ) foreach keys %{ $self->{connected} };          $self->send_to( $_, $data ) foreach $self->connected;
155  }  }
156    
157  sub get_from_all {  sub get_from_all {
158          my ( $self ) = @_;          my ( $self ) = @_;
159          my $result;          my $result;
160          $result->{$_} = $self->get_from( $_ ) foreach keys %{ $self->{connected} };          $result->{$_} = $self->get_from( $_ ) foreach $self->connected;
161          return $result;          return $result;
162  }  }
163    
164    sub restart_nodes {
165            my ( $self ) = @_;
166            foreach my $port ( $self->connected ) {
167                    warn "restart [$port]\n";
168    #               $self->send_to( $port, { restart => 1 } );
169    #               $self->connect_to( $port );
170                    $self->send_to( $port, { exit => 1 } );
171                    kill 9, $pids->{$port};
172                    $self->start_node_port( $self->{port_on_host}->{$port}, $port );
173            }
174    }
175    
176    
177  our $out;  our $out;
178    
179    use Digest::MD5 qw(md5);
180    our $nr = 0;
181    our $md5_nr;
182    our $digest_fh;
183    our @digest_offset;
184    
185  sub merge {  sub merge {
186          my ( $self, $new ) = @_;          my ( $self, $new ) = @_;
187    
# Line 123  sub merge { Line 197  sub merge {
197    
198                          my $n   = delete $new->{$k1}->{$k2};                          my $n   = delete $new->{$k1}->{$k2};
199    
200                            if ( $k1 =~ m{#} ) {
201                                    my $md5 = md5 $k2;
202                                    if ( defined $md5_nr->{$md5} ) {
203                                            $k2 = $md5_nr->{$md5};
204                                    } else {
205                                            open( $digest_fh, '>', '/tmp/sack.digest' ) unless $digest_fh;
206                                            $digest_offset[ $nr ] = tell( $digest_fh );
207                                            print $digest_fh "$k2\n";
208    
209                                            $k2 = $md5_nr->{$md5} = $nr;
210                                            $nr++;
211                                    }
212                            }
213    
214                          my $ref = ref    $out->{$k1}->{$k2};                          my $ref = ref    $out->{$k1}->{$k2};
215    
216                          if ( ! defined $out->{$k1}->{$k2} ) {                          if ( ! defined $out->{$k1}->{$k2} ) {
# Line 151  sub merge { Line 239  sub merge {
239          }          }
240    
241          $t_merge = time - $t_merge;          $t_merge = time - $t_merge;
242          warn sprintf "\nmerged %d in %.4fs %.2f/s\n", $tick, $t_merge, $t_merge / $tick ;          warn sprintf "\nmerged %d in %.4fs\n", $tick, $t_merge;
243    
244            return $tick;
245  }  }
246    
247    
248  sub view {  sub view {
249          my ( $self, $view ) = @_;          my ( $self, $view ) = @_;
250    
251            my $t = time;
252            $out = {};
253    
254          warn "run view $view ", -s $view, " bytes\n";          warn "run view $view ", -s $view, " bytes\n";
255    
256          my $view_code = read_file($view);          my $view_code = read_file($view);
257          $self->send_to_all({ view => $view_code });          $self->send_to_all({ view => $view_code });
258    
259          foreach my $port ( keys %{ $self->{connected} } ) {          my $total;
260                  warn "get_from $port\n";  
261            foreach my $port ( $self->connected ) {
262                  my $result = $self->get_from( $port );                  my $result = $self->get_from( $port );
263  warn dump $result;                  warn "# result ", dump $result if $self->{debug};
264                  if ( $result->{view} ) {                  if ( my $out = delete $result->{out} ) {
265                          $self->merge( $result->{view} );                          warn "[$port] result ", dump($result), $/ if $result;
266                            $total += $self->merge( $out );
267                  } else {                  } else {
268                          warn "no view from $port\n";                          warn "no out from $port in ",dump $result;
269                  }                  }
270          }          }
271    
272            warn sprintf "view %d in %.4fs\n", $total, time - $t;
273    
274          return $out;          return $out;
275  }  }
276    
277    sub command {
278            my ( $self, $cmd ) = @_;
279    
280            duration "repl $cmd";
281    
282            my $repl = 1;
283    
284            if ( $cmd =~ m{^v} ) {
285                    $out = $self->view( $self->{view} );
286                    duration 'view';
287            } elsif ( $cmd =~ m{^d} ) {
288                    warn dump $out;
289                    duration 'dump';
290            } elsif ( $cmd =~ m{^x} ) {
291                    $repl = 0;
292            } elsif ( $cmd =~ m{^r} ) {
293                    $self->restart_nodes;
294            } elsif ( $cmd =~ m{^i} ) {
295                    $self->send_to_all({ info => 1 });
296                    my $info = $self->get_from_all;
297                    foreach my $port ( $self->connected ) {
298                            warn "INFO view $self->{view} ", -s $self->{view}, " bytes\n";
299                            warn "[$port] $self->{port_on_host}->{$port} ", dump( $info->{$port} ), "\n";
300                    }
301            } elsif ( $cmd =~ m{^u} ) {
302                    my $updated;
303                    foreach my $host ( $self->connected ) {
304                            next if $updated->{$host}++;
305                            warn "update $host\n";
306                            system "find /srv/Sack/ | cpio --create | ssh -F etc/lib.ssh $host cpio --extract --make-directories --unconditional";
307                    }
308            } elsif ( $cmd =~ m{^sh\s+(.+)} ) {
309                    $self->send_to_all({ sh => $1 });
310                    my $sh = $self->get_from_all;
311                    foreach my $port ( $self->connected ) {
312                            warn "[$port]# $1\n$sh->{$port}->{sh}";
313                    }
314            } else {
315                    warn "UNKNOWN $cmd\n" if $cmd;
316            }
317    
318            return $repl;
319    }
320    
321    
322  sub DESTROY {  sub DESTROY {
323          warn "pids ",dump( $pids );          warn "pids ",dump( $pids );

Legend:
Removed from v.97  
changed lines
  Added in v.139

  ViewVC Help
Powered by ViewVC 1.1.26