/[Sack]/trunk/lib/Sack/Lorry.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /trunk/lib/Sack/Lorry.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 127 - (show annotations)
Wed Oct 7 16:21:33 2009 UTC (14 years, 8 months ago) by dpavlin
File size: 5063 byte(s)
better recovery from node storable serialization errors,
faster cloud startup, 
added common pid handling,
added [u]pdate to lorry which push updated code to nodes,
tweaks to output,
version bump for Sack::Node [0.08]

1 package Sack::Lorry;
2
3 use warnings;
4 use strict;
5
6 our $VERSION = '0.08';
7
8 use IO::Socket::INET;
9 use Data::Dump qw(dump);
10 use Storable;
11 use File::Slurp;
12 use Net::Ping;
13 use Time::HiRes qw(time sleep);
14
15 use lib 'lib';
16 use base qw(Sack::Pid);
17
18 our $pids;
19
20 $SIG{CHLD} = 'IGNORE';
21
22 sub new {
23 my $class = shift;
24 my $self = bless {@_}, $class;
25 $self->{sock} = {};
26 warn __PACKAGE__, " $VERSION\n";
27 return $self;
28 }
29
30 sub connected {
31 sort keys %{ $_[0]->{sock} }
32 }
33
34 sub connect_to {
35 my ( $self, $port ) = @_;
36
37 my $sock;
38
39 warn "# waiting for [$port]";
40
41 my $retries = 30;
42
43 while ( ! $sock && $retries-- ) {
44
45 $sock = IO::Socket::INET->new(
46 PeerAddr => '127.0.0.1',
47 PeerPort => $port,
48 Proto => 'tcp',
49 );
50
51 if ( ! $sock ) {
52 print STDERR ".";
53 sleep 0.5;
54 } else {
55 undef $sock unless $sock->connected;
56 }
57
58 }
59
60 if ( ! $retries ) {
61 warn "SKIP $port: $!";
62 return;
63 }
64
65 $self->{sock}->{$port} = $sock;
66
67 $self->send_to( $port, { info => 1 } );
68 warn "info ", dump( $self->get_from( $port ) ), $/;
69
70 warn "# connected to $port\n";
71
72 return $port;
73 }
74
75
76 sub start_node_port {
77 my ( $self, $host, $port ) = @_;
78
79 chomp $host;
80
81 my $p = Net::Ping->new;
82
83 if ( ! $p->ping( $host ) ) {
84 warn "can't ping [$host]\n";
85 return;
86 }
87
88 my $ssh_config = '-F etc/lib.ssh';
89
90 my $pid_path = "/tmp/sack.$port.pid";
91 kill 9, read_file $pid_path if -e $pid_path;
92
93 if ( my $pid = fork ) {
94 # parent
95
96 $self->connect_to( $port ) || return;
97
98 $pids->{ $port } = $pid;
99 $self->{port_on_host}->{$port} = $host;
100
101 warn "start_node_port $host [$port] pid $pid\n";
102
103 return $port++;
104
105 } elsif ( ! defined $pid ) {
106 warn "can't fork $host $port";
107 return;
108 } else {
109 # child
110
111 my $cmd = $host !~ m{^(localhost|127\.)}i ? qq|
112 ssh -f $ssh_config
113 -S /tmp/sock.$port.ssh
114 -L $port:127.0.0.1:$port
115 $host
116 | : '';
117
118 $cmd .= qq|
119 /srv/Sack/bin/node.pl $port
120 |;
121
122 $cmd =~ s{\s+}{ }sg;
123
124 $self->port_pid( $port, $$ );
125
126 warn "# exec: $cmd\n";
127 exec $cmd;
128 }
129 }
130
131 sub send_to {
132 my ( $self, $port, $data ) = @_;
133 # warn "send_to [$port]\n";
134 Storable::store_fd( $data => $self->{sock}->{$port} );
135 }
136
137 sub get_from {
138 my ( $self, $port ) = @_;
139 # warn "get_from [$port]\n";
140 my $data;
141 eval {
142 $data = Storable::fd_retrieve( $self->{sock}->{$port} );
143 };
144 warn "ERROR $@" if $@;
145 return $data;
146 }
147
148 sub send_to_all {
149 my ( $self, $data ) = @_;
150 $self->send_to( $_, $data ) foreach $self->connected;
151 }
152
153 sub get_from_all {
154 my ( $self ) = @_;
155 my $result;
156 $result->{$_} = $self->get_from( $_ ) foreach $self->connected;
157 return $result;
158 }
159
160 sub restart_nodes {
161 my ( $self ) = @_;
162 foreach my $port ( $self->connected ) {
163 warn "restart [$port]\n";
164 # $self->send_to( $port, { restart => 1 } );
165 # $self->connect_to( $port );
166 $self->send_to( $port, { exit => 1 } );
167 kill 9, $pids->{$port};
168 $self->start_node_port( $self->{port_on_host}->{$port}, $port );
169 }
170 }
171
172
173 our $out;
174
175 use Digest::MD5 qw(md5);
176 our $nr = 0;
177 our $md5_nr;
178 our $digest_fh;
179 our @digest_offset;
180
181 sub merge {
182 my ( $self, $new ) = @_;
183
184 my $t_merge = time();
185
186 my $tick = 0;
187
188 my $missing;
189
190 foreach my $k1 ( keys %$new ) {
191
192 foreach my $k2 ( keys %{ $new->{$k1} } ) {
193
194 my $n = delete $new->{$k1}->{$k2};
195
196 if ( $k1 =~ m{#} ) {
197 my $md5 = md5 $k2;
198 if ( defined $md5_nr->{$md5} ) {
199 $k2 = $md5_nr->{$md5};
200 } else {
201 open( $digest_fh, '>', '/tmp/sack.digest' ) unless $digest_fh;
202 $digest_offset[ $nr ] = tell( $digest_fh );
203 print $digest_fh "$k2\n";
204
205 $k2 = $md5_nr->{$md5} = $nr;
206 $nr++;
207 }
208 }
209
210 my $ref = ref $out->{$k1}->{$k2};
211
212 if ( ! defined $out->{$k1}->{$k2} ) {
213 $out->{$k1}->{$k2} = $n;
214 } elsif ( $k1 =~ m{\+} ) {
215 # warn "## agregate $k1 $k2";
216 $out->{$k1}->{$k2} += $n;
217 } elsif ( $ref eq 'ARRAY' ) {
218 if ( ref $n eq 'ARRAY' ) {
219 push @{ $out->{$k1}->{$k2} }, $_ foreach @$n;
220 } else {
221 push @{ $out->{$k1}->{$k2} }, $n;
222 }
223 } elsif ( $ref eq '' ) {
224 $out->{$k1}->{$k2} = [ $out->{$k1}->{$k2}, $n ];
225 } else {
226 die "can't merge $k2 [$ref] from ",dump($n), " into ", dump($out->{$k1}->{$k2});
227 }
228
229 if ( $tick++ % 1000 == 0 ) {
230 print STDERR ".";
231 } elsif ( $tick % 10000 == 0 ) {
232 print STDERR $tick;
233 }
234 }
235 }
236
237 $t_merge = time - $t_merge;
238 warn sprintf "\nmerged %d in %.4fs\n", $tick, $t_merge;
239
240 return $tick;
241 }
242
243
244 sub view {
245 my ( $self, $view ) = @_;
246
247 my $t = time;
248 $out = {};
249
250 warn "run view $view ", -s $view, " bytes\n";
251
252 my $view_code = read_file($view);
253 $self->send_to_all({ view => $view_code });
254
255 my $total;
256
257 foreach my $port ( $self->connected ) {
258 my $result = $self->get_from( $port );
259 warn "# result ", dump $result if $self->{debug};
260 if ( my $out = delete $result->{out} ) {
261 warn "[$port] result ", dump($result), $/ if $result;
262 $total += $self->merge( $out );
263 } else {
264 warn "no out from $port in ",dump $result;
265 }
266 }
267
268 warn sprintf "view %d in %.4fs\n", $total, time - $t;
269
270 return $out;
271 }
272
273
274 sub DESTROY {
275 warn "pids ",dump( $pids );
276 foreach ( values %$pids ) {
277 warn "kill $_";
278 kill 1,$_ || kill 9, $_;
279 }
280 }
281
282 1;

  ViewVC Help
Powered by ViewVC 1.1.26