/[Sack]/trunk/lib/Sack/Lorry.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/lib/Sack/Lorry.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 97 by dpavlin, Sun Oct 4 13:09:28 2009 UTC revision 127 by dpavlin, Wed Oct 7 16:21:33 2009 UTC
# Line 3  package Sack::Lorry; Line 3  package Sack::Lorry;
3  use warnings;  use warnings;
4  use strict;  use strict;
5    
6    our $VERSION = '0.08';
7    
8  use IO::Socket::INET;  use IO::Socket::INET;
9  use Data::Dump qw(dump);  use Data::Dump qw(dump);
10  use Storable;  use Storable;
11  use File::Slurp;  use File::Slurp;
12    use Net::Ping;
13    use Time::HiRes qw(time sleep);
14    
15    use lib 'lib';
16    use base qw(Sack::Pid);
17    
18  our $pids;  our $pids;
 our $ports;  
19    
20  $SIG{CHLD} = 'IGNORE';  $SIG{CHLD} = 'IGNORE';
21    
 my $port = 4000;  
   
22  sub new {  sub new {
23          my $class = shift;          my $class = shift;
24          my $self  = bless {@_}, $class;          my $self  = bless {@_}, $class;
25            $self->{sock} = {};
26            warn __PACKAGE__, " $VERSION\n";
27          return $self;          return $self;
28  }  }
29    
30  sub start_node {  sub connected {
31          my ( $self, $host ) = @_;          sort keys %{ $_[0]->{sock} }
32    }
         system "rsync -rav /srv/Sack/ $host:/srv/Sack/";  
33    
34          if ( my $pid = fork ) {  sub connect_to {
35                  # parent          my ( $self, $port ) = @_;
                 $pids->{ "$host:$port" } = $pid;  
                 $ports->{ $port } = $host;  
36    
37                  my $sock;          my $sock;
38    
39                  print STDERR "waiting for $port";          warn "# waiting for [$port]";
40    
41                  while ( ! $sock ) {          my $retries = 30;
42    
43                          $sock = IO::Socket::INET->new(          while ( ! $sock && $retries-- ) {
                                 PeerAddr => '127.0.0.1',  
                                 PeerPort => $port,  
                                 Proto    => 'tcp',  
                         );  
44    
45                          if ( ! $sock ) {                  $sock = IO::Socket::INET->new(
46                                  print STDERR ".";                          PeerAddr => '127.0.0.1',
47                                  sleep 1;                          PeerPort => $port,
48                          }                          Proto    => 'tcp',
49                    );
50    
51                    if ( ! $sock ) {
52                            print STDERR ".";
53                            sleep 0.5;
54                    } else {
55                            undef $sock unless $sock->connected;
56                  }                  }
57    
58                  $self->{sock}->{$port} = $sock;          }
59    
60            if ( ! $retries ) {
61                    warn "SKIP $port: $!";
62                    return;
63            }
64    
65            $self->{sock}->{$port} = $sock;
66    
67            $self->send_to( $port, { info => 1 } );
68            warn "info ", dump( $self->get_from( $port ) ), $/;
69    
70            warn "# connected to $port\n";
71    
72            return $port;
73    }
74    
75    
76    sub start_node_port {
77            my ( $self, $host, $port ) = @_;
78    
79                  warn "\nconnected to $port\n";          chomp $host;
80    
81                  $self->{connected}->{$port} = $host;          my $p = Net::Ping->new;
82    
83            if ( ! $p->ping( $host ) ) {
84                    warn "can't ping [$host]\n";
85                    return;
86            }
87    
88            my $ssh_config = '-F etc/lib.ssh';
89    
90            my $pid_path = "/tmp/sack.$port.pid";
91            kill 9, read_file $pid_path if -e $pid_path;
92    
93            if ( my $pid = fork ) {
94                    # parent
95    
96                    $self->connect_to( $port ) || return;
97    
98                    $pids->{ $port } = $pid;
99                    $self->{port_on_host}->{$port} = $host;
100    
101                    warn "start_node_port $host [$port] pid $pid\n";
102    
103                  return $port++;                  return $port++;
104    
# Line 63  sub start_node { Line 107  sub start_node {
107                  return;                  return;
108          } else {          } else {
109                  # child                  # child
110    
111                  my $cmd = $host !~ m{^(localhost|127\.)}i ? qq|                  my $cmd = $host !~ m{^(localhost|127\.)}i ? qq|
112                          ssh                          ssh -f $ssh_config
113                                  -S /tmp/sock.$port.ssh                                  -S /tmp/sock.$port.ssh
114                                  -L $port:127.0.0.1:$port                                  -L $port:127.0.0.1:$port
115                          $host                          $host
# Line 76  sub start_node { Line 121  sub start_node {
121    
122                  $cmd =~ s{\s+}{ }sg;                  $cmd =~ s{\s+}{ }sg;
123    
124                  warn "exec: $cmd\n";                  $self->port_pid( $port, $$ );
125    
126                    warn "# exec: $cmd\n";
127                  exec $cmd;                  exec $cmd;
128          }          }
129  }  }
130    
131  sub send_to {  sub send_to {
132          my ( $self, $port, $data ) = @_;          my ( $self, $port, $data ) = @_;
133          warn "send_to [$port]\n";  #       warn "send_to [$port]\n";
134          Storable::store_fd( $data => $self->{sock}->{$port} );          Storable::store_fd( $data => $self->{sock}->{$port} );
135  }  }
136    
137  sub get_from {  sub get_from {
138          my ( $self, $port )  = @_;          my ( $self, $port )  = @_;
139          warn "get_from [$port]\n";  #       warn "get_from [$port]\n";
140          Storable::fd_retrieve( $self->{sock}->{$port} );          my $data;
141            eval {
142                    $data = Storable::fd_retrieve( $self->{sock}->{$port} );
143            };
144            warn "ERROR $@" if $@;
145            return $data;
146  }  }
147    
148  sub send_to_all {  sub send_to_all {
149          my ( $self, $data ) = @_;          my ( $self, $data ) = @_;
150          $self->send_to( $_, $data ) foreach keys %{ $self->{connected} };          $self->send_to( $_, $data ) foreach $self->connected;
151  }  }
152    
153  sub get_from_all {  sub get_from_all {
154          my ( $self ) = @_;          my ( $self ) = @_;
155          my $result;          my $result;
156          $result->{$_} = $self->get_from( $_ ) foreach keys %{ $self->{connected} };          $result->{$_} = $self->get_from( $_ ) foreach $self->connected;
157          return $result;          return $result;
158  }  }
159    
160    sub restart_nodes {
161            my ( $self ) = @_;
162            foreach my $port ( $self->connected ) {
163                    warn "restart [$port]\n";
164    #               $self->send_to( $port, { restart => 1 } );
165    #               $self->connect_to( $port );
166                    $self->send_to( $port, { exit => 1 } );
167                    kill 9, $pids->{$port};
168                    $self->start_node_port( $self->{port_on_host}->{$port}, $port );
169            }
170    }
171    
172    
173  our $out;  our $out;
174    
175    use Digest::MD5 qw(md5);
176    our $nr = 0;
177    our $md5_nr;
178    our $digest_fh;
179    our @digest_offset;
180    
181  sub merge {  sub merge {
182          my ( $self, $new ) = @_;          my ( $self, $new ) = @_;
183    
# Line 123  sub merge { Line 193  sub merge {
193    
194                          my $n   = delete $new->{$k1}->{$k2};                          my $n   = delete $new->{$k1}->{$k2};
195    
196                            if ( $k1 =~ m{#} ) {
197                                    my $md5 = md5 $k2;
198                                    if ( defined $md5_nr->{$md5} ) {
199                                            $k2 = $md5_nr->{$md5};
200                                    } else {
201                                            open( $digest_fh, '>', '/tmp/sack.digest' ) unless $digest_fh;
202                                            $digest_offset[ $nr ] = tell( $digest_fh );
203                                            print $digest_fh "$k2\n";
204    
205                                            $k2 = $md5_nr->{$md5} = $nr;
206                                            $nr++;
207                                    }
208                            }
209    
210                          my $ref = ref    $out->{$k1}->{$k2};                          my $ref = ref    $out->{$k1}->{$k2};
211    
212                          if ( ! defined $out->{$k1}->{$k2} ) {                          if ( ! defined $out->{$k1}->{$k2} ) {
# Line 151  sub merge { Line 235  sub merge {
235          }          }
236    
237          $t_merge = time - $t_merge;          $t_merge = time - $t_merge;
238          warn sprintf "\nmerged %d in %.4fs %.2f/s\n", $tick, $t_merge, $t_merge / $tick ;          warn sprintf "\nmerged %d in %.4fs\n", $tick, $t_merge;
239    
240            return $tick;
241  }  }
242    
243    
244  sub view {  sub view {
245          my ( $self, $view ) = @_;          my ( $self, $view ) = @_;
246    
247            my $t = time;
248            $out = {};
249    
250          warn "run view $view ", -s $view, " bytes\n";          warn "run view $view ", -s $view, " bytes\n";
251    
252          my $view_code = read_file($view);          my $view_code = read_file($view);
253          $self->send_to_all({ view => $view_code });          $self->send_to_all({ view => $view_code });
254    
255          foreach my $port ( keys %{ $self->{connected} } ) {          my $total;
256                  warn "get_from $port\n";  
257            foreach my $port ( $self->connected ) {
258                  my $result = $self->get_from( $port );                  my $result = $self->get_from( $port );
259  warn dump $result;                  warn "# result ", dump $result if $self->{debug};
260                  if ( $result->{view} ) {                  if ( my $out = delete $result->{out} ) {
261                          $self->merge( $result->{view} );                          warn "[$port] result ", dump($result), $/ if $result;
262                            $total += $self->merge( $out );
263                  } else {                  } else {
264                          warn "no view from $port\n";                          warn "no out from $port in ",dump $result;
265                  }                  }
266          }          }
267    
268            warn sprintf "view %d in %.4fs\n", $total, time - $t;
269    
270          return $out;          return $out;
271  }  }
272    

Legend:
Removed from v.97  
changed lines
  Added in v.127

  ViewVC Help
Powered by ViewVC 1.1.26