/[Sack]/trunk/lib/Sack/Lorry.pm
This is a repository of my old source code, which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /trunk/lib/Sack/Lorry.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 159 - (show annotations)
Fri Oct 23 13:08:45 2009 UTC (14 years, 7 months ago) by dpavlin
File size: 8135 byte(s)
fix output

1 package Sack::Lorry;
2
3 use warnings;
4 use strict;
5
6 use IO::Socket::INET;
7 use Data::Dump qw(dump);
8 use Storable;
9 use File::Slurp;
10 use Net::Ping;
11 use Time::HiRes qw(time sleep);
12 use File::Slurp;
13
14 use lib 'lib';
15 use base qw(Sack::Pid);
16 use Sack;
17
# Process ids of the node processes forked by this module, keyed by port.
# Filled in by start_node_port, read by restart_nodes, command and DESTROY.
18 our $pids;
19
# Ignore SIGCHLD for the whole process; this is set once, at module load time.
# NOTE(review): presumably meant to avoid zombies of forked nodes -- confirm.
20 $SIG{CHLD} = 'IGNORE';
21
# Constructor.
#
# Takes a flat list of key/value pairs which become the fields of the object.
# The C<cloud> field is treated as the path of a file listing one node host
# per line; it is replaced by an array reference holding those hosts.  When
# the field is missing or the file does not exist the cloud consists of
# C<localhost> only.  C<sock> is always reset to an empty hash.
#
# Returns the blessed object.
sub new {
    my $class = shift;
    my $self = bless {@_}, $class;
    $self->{sock} = {};

    my $cloud_file = $self->{cloud};

    my @cloud = ( 'localhost' );
    # defined check first: -e on an undefined value only produces a warning
    if ( defined $cloud_file && -e $cloud_file ) {
        @cloud = read_file( $cloud_file );
        chomp @cloud;
        # a blank line must not end up as an empty host name
        @cloud = grep { m{\S} } @cloud;
    }
    my $cloud_size = scalar @cloud;

    warn __PACKAGE__, " $Sack::VERSION cloud of $cloud_size nodes ", join(' ', @cloud), "\n";

    $self->{cloud} = [ @cloud ];

    return $self;
}
37
# Returns the list of hosts stored in the C<cloud> field.
sub cloud {
    my ( $self ) = @_;
    return @{ $self->{cloud} };
}
# Returns the number of hosts stored in the C<cloud> field.
sub cloud_size {
    my ( $self ) = @_;
    return scalar @{ $self->{cloud} };
}
40
# Starts one node per host of the cloud.
#
# Ports are handed out beginning with 4000; the port number only advances
# after start_node_port reported success for the current host.
sub start_nodes {
    my ( $self ) = @_;

    my $next_port = 4000;

    for my $node_host ( $self->cloud ) {
        next unless $self->start_node_port( $node_host, $next_port );
        warn "started [$next_port] on $node_host\n";
        $next_port += 1;
    }
}
53
# Distributes the data of $from over all connected nodes.
#
# $from has to provide C<size> and C<shard( $offset, $limit )>.  Every
# connected port receives one shard, sent as C<< { data => $shard } >>;
# afterwards one reply per node is collected and dumped as status.
#
# Returns nothing (after a warning) when no node is connected and dies
# when a shard can't be sent.
sub load_nodes {
    my ( $self, $from ) = @_;

    my @nodes = $self->connected;
    my $size = scalar @nodes;

    # without nodes the shard size calculation would divide by zero
    if ( ! $size ) {
        warn "load_nodes: no connected nodes\n";
        return;
    }

    my $offset = 0;
    my $limit = int( ( $from->size + $size ) / $size );

    foreach my $port ( @nodes ) {

        my $data = $from->shard( $offset, $limit );
        $self->send_to( $port, { data => $data } ) || die "can't send to $port: $!";
        warn "shard for [$port] loaded\n";

        $offset += $limit;
    }

    warn "load_nodes status ", dump( $self->get_from_all ),$/;
}
75
76
# Returns the sorted list of ports for which a socket is stored in C<sock>.
sub connected {
    my ( $self ) = @_;
    return sort keys %{ $self->{sock} };
}
80
# Connects to the node listening on 127.0.0.1:$port.
#
# A socket already stored for $port is re-used.  Otherwise up to $retries
# (default 30) connection attempts are made, waiting 0.1s after each refused
# one.  On success the socket is stored in C<sock>, an C<info> request is
# sent and the reply is dumped.
#
# Returns $port on success and nothing when no connection could be made.
# The result is derived from the outcome of the attempts themselves and not
# from the value left in the retry counter, which was off by one in both
# directions.
sub connect_to {
    my ( $self, $port, $retries ) = @_;

    $retries ||= 30;
    warn "# connect_to [$port] $retries times";

    # already connected
    return $port if $self->{sock}->{$port};

    foreach my $attempt ( 1 .. $retries ) {

        my $sock = IO::Socket::INET->new(
            PeerAddr => '127.0.0.1',
            PeerPort => $port,
            Proto => 'tcp',
        );

        if ( ! $sock ) {
            print STDERR ".";
            sleep 0.1;
            next;
        }

        if ( ! $sock->connected ) {
            # unusable socket, drop it and try again
            close $sock;
            next;
        }

        $self->{sock}->{$port} = $sock;
        warn "# connected to $port\n";

        $self->send_to( $port, { info => 1 } );
        warn "info ", dump( $self->get_from( $port ) ), $/;

        return $port;
    }

    warn "SKIP $port: $!";
    return;
}
120
121
# Starts a node for $port on $host and connects to it.
#
# The host has to answer a ping.  A process recorded in /tmp/sack.$port.pid
# is killed, then a child is forked which execs /srv/Sack/bin/node.pl --
# through an ssh port forward unless the host looks local -- while the
# parent connects to the port and records the child's pid and the host.
#
# Returns $port on success and nothing when the host can't be pinged, the
# fork fails or the node can't be connected to.
sub start_node_port {
    my ( $self, $host, $port ) = @_;

    chomp $host;

    my $p = Net::Ping->new;
    my $alive = $p->ping( $host );
    $p->close;

    if ( ! $alive ) {
        warn "can't ping [$host]\n";
        return;
    }

    # NOTE(review): the message says "re-using", but the node is started
    # anyway below -- kept as found, confirm the intent.
    if ( $self->connect_to( $port, 1 ) ) {
        warn "re-using existing $port";
    }

    my $ssh_config = '-F etc/lib.ssh';

    my $pid_path = "/tmp/sack.$port.pid";
    kill 9, read_file( $pid_path ) if -e $pid_path;

    if ( my $pid = fork ) {
        # parent

        $self->connect_to( $port ) || return;

        $pids->{ $port } = $pid;
        $self->{port_on_host}->{$port} = $host;

        warn "start_node_port $host [$port] pid $pid\n";

        return $port;

    } elsif ( ! defined $pid ) {
        warn "can't fork $host $port";
        return;
    } else {
        # child

        my $cmd = $host !~ m{^(localhost|127\.)}i ? qq|
            ssh -f $ssh_config
            -S /tmp/sock.$port.ssh
            -L $port:127.0.0.1:$port
            $host
        | : '';

        $cmd .= qq|
            /srv/Sack/bin/node.pl $port
        |;

        $cmd =~ s{\s+}{ }sg;

        $self->port_pid( $port, $$ );

        warn "# exec: $cmd\n";
        exec $cmd or warn "can't exec [$cmd]: $!\n";

        # exec failed: the child must not return into the caller's code.
        # _exit skips END blocks and destructors, which would otherwise
        # signal the nodes owned by the parent.
        require POSIX;
        POSIX::_exit( 1 );
    }
}
180
# Serializes $data with Storable onto the socket stored for $port and
# returns whatever Storable::store_fd returns.
sub send_to {
    my $self = shift;
    my ( $port, $data ) = @_;

    my $sock = $self->{sock}->{$port};
    return Storable::store_fd( $data, $sock );
}
186
# Reads one Storable structure from the socket stored for $port.
#
# A failure inside Storable is reported with a warning; in that case the
# returned value is undefined.
sub get_from {
    my ( $self, $port ) = @_;

    my $data = eval { Storable::fd_retrieve( $self->{sock}->{$port} ) };
    warn "ERROR $@" if $@;

    return $data;
}
197
# Sends the same $data to every connected port.
sub send_to_all {
    my ( $self, $data ) = @_;

    for my $port ( $self->connected ) {
        $self->send_to( $port, $data );
    }
}
202
# Collects one reply from every connected port.
#
# The optional $callback is invoked with ( $port, $reply ) for each node.
# Returns a hash reference mapping port to reply; it stays undefined when
# no port is connected.
sub get_from_all {
    my ( $self, $callback ) = @_;

    my $result;

    for my $port ( $self->connected ) {
        my $reply = $self->get_from( $port );
        $result->{$port} = $reply;
        $callback->( $port, $reply ) if $callback;
    }

    return $result;
}
212
# Restarts every connected node.
#
# Each node is asked to exit, its recorded process is killed, the socket
# to the old process is dropped and the node is started again on the host
# recorded for its port.
sub restart_nodes {
    my ( $self ) = @_;
    foreach my $port ( $self->connected ) {
        warn "restart [$port]\n";
#       $self->send_to( $port, { restart => 1 } );
#       $self->connect_to( $port );

        # a node which is gone already must not stop the remaining restarts
        eval { $self->send_to( $port, { exit => 1 } ) };
        warn "can't send exit to [$port]: $@" if $@;

        # an undefined pid would be taken as 0 by kill, which addresses
        # the process group of the caller
        my $pid = $pids->{$port};
        kill 9, $pid if $pid;

        # forget the socket of the old process, otherwise connect_to
        # hands out the dead connection for the restarted node
        if ( my $stale = delete $self->{sock}->{$port} ) {
            close $stale;
        }

        $self->start_node_port( $self->{port_on_host}->{$port}, $port );
    }
}
224
225
# Merged result of the current view; reset by view() and filled by merge().
226 our $out;
227
228 use Digest::MD5 qw(md5);
# Next number merge() assigns to a not yet seen key of a group whose name
# contains '#'.
229 our $nr = 0;
# Maps the binary MD5 digest of such a key to the number assigned to it.
230 our $md5_nr;
# Handle on /tmp/sack.digest, opened by merge() on first use; the original
# keys are written to it one per line.
231 our $digest_fh;
# Position in the digest file (as reported by tell) for each assigned number.
232 our @digest_offset;
233
# Merges the two level hash $new (group => key => value), as returned by
# one node, into the package variable $out.  $new is emptied while doing so.
#
# - groups with '#' in the name: the key is replaced by a sequence number,
#   the original key is appended to /tmp/sack.digest
# - groups with '+' in the name: values for the same key are added up
# - all other groups: values for the same key are collected into one flat
#   array; anything else than a plain scalar or array already stored under
#   the key is a fatal error
#
# Progress is printed on STDERR.  Returns the number of merged values.
sub merge {
    my ( $self, $new ) = @_;

    my $t_merge = time();

    my $tick = 0;

    foreach my $k1 ( keys %$new ) {

        foreach my $k2 ( keys %{ $new->{$k1} } ) {

            my $n = delete $new->{$k1}->{$k2};

            if ( $k1 =~ m{#} ) {
                my $md5 = md5( $k2 );
                if ( defined $md5_nr->{$md5} ) {
                    $k2 = $md5_nr->{$md5};
                } else {
                    if ( ! $digest_fh ) {
                        open( $digest_fh, '>', '/tmp/sack.digest' )
                            or die "can't open /tmp/sack.digest: $!";
                    }
                    $digest_offset[ $nr ] = tell( $digest_fh );
                    print $digest_fh "$k2\n";

                    $k2 = $md5_nr->{$md5} = $nr;
                    $nr++;
                }
            }

            my $ref = ref $out->{$k1}->{$k2};

            if ( ! defined $out->{$k1}->{$k2} ) {
                $out->{$k1}->{$k2} = $n;
            } elsif ( $k1 =~ m{\+} ) {
#               warn "## agregate $k1 $k2";
                $out->{$k1}->{$k2} += $n;
            } elsif ( $ref eq 'ARRAY' ) {
                push @{ $out->{$k1}->{$k2} }, ref $n eq 'ARRAY' ? @$n : $n;
            } elsif ( $ref eq '' ) {
                # flatten an incoming array here as well, so that the
                # result does not depend on which node answered first
                $out->{$k1}->{$k2} = [ $out->{$k1}->{$k2}, ref $n eq 'ARRAY' ? @$n : $n ];
            } else {
                die "can't merge $k2 [$ref] from ",dump($n), " into ", dump($out->{$k1}->{$k2});
            }

            # a dot for every 1000 values, the running count every 10000
            if ( $tick++ % 1000 == 0 ) {
                print STDERR ".";
            } elsif ( $tick % 10000 == 0 ) {
                print STDERR $tick;
            }
        }
    }

    $t_merge = time - $t_merge;
    warn sprintf "\nmerged %d in %.4fs\n", $tick, $t_merge;

    return $tick;
}
295
296
# Runs the view stored in the file $view on all connected nodes.
#
# The content of the file is sent to every node as C<< { view => $code } >>.
# The C<out> part of each reply is merged into the package variable $out,
# which is reset first; what remains of a reply is dumped.
#
# Returns the merged result (a hash reference).
sub view {
    my ( $self, $view ) = @_;

    my $t = time;
    $out = {};

    warn "run view $view ", -s $view, " bytes\n";

    my $view_code = read_file($view);
    $self->send_to_all({ view => $view_code });

    # starts at 0 so that the summary below is well defined when no node
    # returned anything
    my $total = 0;

    foreach my $port ( $self->connected ) {
        my $result = $self->get_from( $port );
        warn "# result ", dump( $result ) if $self->{debug};
        # not named $out, that is the package variable returned below
        if ( my $node_out = delete $result->{out} ) {
            warn "[$port] result ", dump($result), $/ if $result;
            $total += $self->merge( $node_out );
        } else {
            warn "no out from $port in ",dump( $result );
        }
    }

    warn sprintf "view %d in %.4fs\n", $total, time - $t;

    return $out;
}
325
# Copies /srv/Sack/ to the hosts of the given ports and restarts the nodes
# when the copy succeeded.
#
# Takes a list of ports.  Local hosts (localhost, 127.*) are skipped and
# every host is updated only once.  Dies when no host is recorded for a
# port.
sub update_node {
    my $self = shift;
    my $updated;
    foreach my $port ( @_ ) {
        my $host = $self->{port_on_host}->{$port} || die "no port $port in ",dump( $self );
        # same test as in start_node_port: anchored and case insensitive
        next if $host =~ m{^(localhost|127\.)}i;
        next if $updated->{$host}++;
        warn "update [$port] on $host to $Sack::VERSION\n";

        # this module sets $SIG{CHLD} to IGNORE; with that in effect the
        # child is reaped automatically, system can't collect its status
        # and returns -1 even when the command succeeded
        my $status = do {
            local $SIG{CHLD} = 'DEFAULT';
            system("find /srv/Sack/ | cpio --create | ssh -F etc/lib.ssh $host cpio --extract --make-directories --unconditional");
        };

        $self->restart_nodes( $port ) if $status == 0;
    }
}
337
# Executes one command of the interactive loop.  Commands are recognized
# by their prefix:
#
#   debug    toggle debug mode here and on all nodes
#   v        run the view stored in the C<view> field
#   d        dump the result of the last view
#   x        leave the loop
#   r        restart all nodes
#   i        show info of all nodes, update nodes reporting another version
#   u        update all nodes
#   sh CMD   run CMD on all nodes and show its output
#
# Returns 0 for C<x> and 1 for everything else.
sub command {
    my ( $self, $cmd ) = @_;

    duration( "repl $cmd" );

    my $repl = 1;

    # list verbose commands first
    if ( $cmd =~ m{^debug} ) {
        my $debug = $self->{debug} ? 0 : 1;
        # remember the new state, otherwise the next toggle starts from
        # the old value again
        $self->{debug} = $debug;
        warn "debug $debug\n";
        $self->send_to_all({ debug => $debug });
        $self->get_from_all( sub {
            my ( $port, $data ) = @_;
            warn "[$port] debug $data->{debug}\n";
        } );
    } elsif ( $cmd =~ m{^v} ) {
        $out = $self->view( $self->{view} );
        duration( 'view' );
    } elsif ( $cmd =~ m{^d} ) {
        warn dump( $out );
        duration( 'dump' );
    } elsif ( $cmd =~ m{^x} ) {
        $repl = 0;
    } elsif ( $cmd =~ m{^r} ) {
        $self->restart_nodes;
    } elsif ( $cmd =~ m{^i} ) {
        $self->send_to_all({ info => 1 });
        my $info = $self->get_from_all;
        warn "INFO view $self->{view} ", -s $self->{view}, " bytes\n";
        foreach my $port ( $self->connected ) {
            my $node_info = $info->{$port};
            warn "[$port] $self->{port_on_host}->{$port} $pids->{$port} ", dump( $node_info ), "\n";
            # replies are keyed by port, so the version has to be taken
            # from the reply of this node
            # NOTE(review): assumes nodes report their version under the
            # key "version" -- confirm against node.pl
            my $version = ref $node_info eq 'HASH' ? $node_info->{version} : undef;
            if ( $version ) {
                warn "# $version $Sack::VERSION\n";
                $self->update_node( $port ) if $version ne $Sack::VERSION;
            }
        }
    } elsif ( $cmd =~ m{^u} ) {
        $self->update_node( $self->connected );
    } elsif ( $cmd =~ m{^sh\s+(.+)} ) {
        # copy the capture, the callback runs later
        my $shell_cmd = $1;
        $self->send_to_all({ sh => $shell_cmd });
        $self->get_from_all( sub {
            my ( $port, $data ) = @_;
            warn "[$port]# $shell_cmd\n$data->{sh}";
        } );
    } else {
        warn "UNKNOWN $cmd\n" if $cmd;
    }

    return $repl;
}
389
390
# Destructor: signals all recorded node processes.
#
# Every pid gets a HUP; KILL is tried when the HUP could not be delivered.
sub DESTROY {
    # keep the status variables of the code which triggered the destructor
    local ( $., $@, $!, $^E, $? );

    warn "pids ",dump( $pids );
    return unless $pids;

    foreach my $pid ( values %$pids ) {
        # an undefined or zero pid would address the own process group
        next unless $pid;
        warn "kill $pid";
        # parentheses matter: "kill 1, $pid || kill 9, $pid" applies ||
        # to $pid and never reaches the second kill
        kill( 1, $pid ) or kill( 9, $pid );
    }
}
398
399 1;

  ViewVC Help
Powered by ViewVC 1.1.26