1 |
package Sack::Lorry; |
2 |
|
3 |
use warnings; |
4 |
use strict; |
5 |
|
6 |
use IO::Socket::INET; |
7 |
use Data::Dump qw(dump); |
8 |
use Storable; |
9 |
use File::Slurp; |
10 |
use Net::Ping; |
11 |
use Time::HiRes qw(time sleep); |
12 |
use File::Slurp; |
13 |
|
14 |
use lib 'lib'; |
15 |
use base qw(Sack::Pid); |
16 |
use Sack; |
17 |
|
18 |
# Package-wide map of port => PID of the process forked for that port
# (filled in start_node_port, used by restart_nodes and DESTROY).
our $pids; |
19 |
|
20 |
# Ignore SIGCHLD for the whole process. NOTE(review): presumably so exited
# node launchers are reaped automatically and never need wait() -- confirm.
$SIG{CHLD} = 'IGNORE'; |
21 |
|
22 |
# Construct a Lorry, the object that starts and talks to a cloud of nodes.
#
# Arguments are key/value pairs stored directly on the object. The only key
# interpreted here is:
#   cloud - path to a file listing one host per line; when it is unset or
#           the file does not exist the cloud is just 'localhost'.
#
# On return $self->{cloud} is an array ref of host names and $self->{sock}
# is an empty hash (port => socket, filled by connect_to).
sub new {
    my $class = shift;
    my $self  = bless {@_}, $class;
    $self->{sock} = {};

    my $cloud_file = $self->{cloud};

    # Check definedness first so an object built without a cloud file does
    # not trigger an "uninitialized value in -e" warning.
    my @cloud
        = defined $cloud_file && length $cloud_file && -e $cloud_file
        ? read_file($cloud_file)
        : ('localhost');
    chomp @cloud;

    # Blank lines are not hosts; keeping them would only fail later at ping.
    @cloud = grep { /\S/ } @cloud;

    my $cloud_size = scalar @cloud;

    warn __PACKAGE__, " $Sack::VERSION cloud of $cloud_size nodes ", join( ' ', @cloud ), "\n";

    $self->{cloud} = [@cloud];

    return $self;
}
37 |
|
38 |
# Return the host names of the cloud as a list.
sub cloud {
    my ($self) = @_;
    return @{ $self->{cloud} };
}
39 |
# Return the number of hosts in the cloud.
sub cloud_size {
    my ($self) = @_;
    my $last_index = $#{ $self->{cloud} };
    return $last_index + 1;
}
40 |
|
41 |
# Start one node per cloud host, handing out ports from 4000 upwards.
# A port number is consumed only when start_node_port reports success, so a
# host that fails to start leaves its port for the next host in the list.
sub start_nodes {
    my $self = shift;

    my $next_port = 4000;

    for my $host ( $self->cloud ) {
        my $started = $self->start_node_port( $host, $next_port );
        next unless $started;

        warn "started [$next_port] on $host\n";
        $next_port++;
    }
}
53 |
|
54 |
# Split the data held by $from into shards and send one to every connected
# node.
#
#   $from - object providing ->size (number of items) and
#           ->shard( $offset, $limit ) (one slice of the data)
#
# Each node receives a { data => $shard } message. Once all shards are sent
# the replies of all nodes are collected with get_from_all and logged.
# Dies when no node is connected or a shard cannot be sent.
sub load_nodes {
    my ( $self, $from ) = @_;

    my @nodes = $self->connected;
    my $size  = scalar @nodes;

    # Without this guard the shard-size division below dies with an
    # unhelpful "Illegal division by zero".
    die "load_nodes: no connected nodes to load shards into\n" unless $size;

    my $offset = 0;

    # floor(size / nodes) + 1 items per node, so the shards always cover
    # every item.
    my $limit = int( ( $from->size + $size ) / $size );

    foreach my $port (@nodes) {
        my $data = $from->shard( $offset, $limit );
        $self->send_to( $port, { data => $data } ) || die "can't send to $port: $!";
        warn "shard for [$port] loaded\n";

        $offset += $limit;
    }

    warn "load_nodes status ", dump( $self->get_from_all ), $/;
}
75 |
|
76 |
|
77 |
# Return the ports that currently have a socket, in (string) sorted order.
sub connected {
    my ($self) = @_;
    return sort keys %{ $self->{sock} };
}
80 |
|
81 |
# Make sure there is a connected socket to the node on 127.0.0.1:$port.
#
#   $port    - local TCP port of the node (or of the ssh tunnel to it)
#   $retries - number of connection attempts, 0.1s apart (default 30)
#
# A socket already cached in $self->{sock} is re-used without reconnecting.
# After a fresh connect an { info => 1 } request is sent and the reply is
# logged.
#
# Returns $port on success and nothing (undef / empty list) on failure.
sub connect_to {
    my ( $self, $port, $retries ) = @_;

    $retries ||= 30;
    warn "# connect_to [$port] $retries times";

    my $sock = $self->{sock}->{$port};

    while ( !$sock && $retries-- > 0 ) {

        $sock = IO::Socket::INET->new(
            PeerAddr => '127.0.0.1',
            PeerPort => $port,
            Proto    => 'tcp',
        );

        if ( !$sock ) {
            print STDERR ".";
            sleep 0.1;
        }
        elsif ( $sock->connected ) {
            $self->{sock}->{$port} = $sock;
            warn "# connected to $port\n";

            $self->send_to( $port, { info => 1 } );
            warn "info ", dump( $self->get_from($port) ), $/;
        }
        else {
            # Socket object exists but is not connected: discard it so the
            # loop tries again instead of ending with an unusable handle.
            close $sock;
            undef $sock;
            sleep 0.1;
        }
    }

    # Decide on the socket itself, not on the retry counter: the counter is
    # 0 after a success on the last attempt and -1 after running out of
    # attempts, so testing it reported both of those cases the wrong way.
    if ( !$sock ) {
        warn "SKIP $port: $!";
        return;
    }

    return $port;
}
120 |
|
121 |
|
122 |
# Start a node for $host that is reachable on local port $port.
#
#   $host - host name from the cloud list
#   $port - local port; remote hosts get an ssh tunnel ($port -> $port)
#
# Steps: ping the host, probe for a node already answering on the port,
# kill the process named in /tmp/sack.$port.pid, then fork. The child execs
# /srv/Sack/bin/node.pl (through ssh unless the host is local); the parent
# connects to the new node and records its pid and host.
#
# Returns $port on success, nothing on failure (host not pingable, fork
# failed, or the new node could not be reached).
sub start_node_port {
    my ( $self, $host, $port ) = @_;

    chomp $host;

    my $p = Net::Ping->new;

    if ( !$p->ping($host) ) {
        warn "can't ping [$host]\n";
        return;
    }

    if ( $self->connect_to( $port, 1 ) ) {
        warn "re-using existing $port";
    }

    my $ssh_config = '-F etc/lib.ssh';

    my $pid_path = "/tmp/sack.$port.pid";
    kill 9, read_file($pid_path) if -e $pid_path;

    # The previous node on this port has just been killed, so a socket
    # cached for it (by the probe above or by an earlier start) is dead.
    # Drop it, otherwise connect_to below would hand the stale socket back
    # and never connect to the node started here.
    if ( my $stale = delete $self->{sock}->{$port} ) {
        close $stale;
    }

    my $pid = fork;

    if ( !defined $pid ) {
        warn "can't fork $host $port";
        return;
    }

    if ($pid) {

        # parent

        # Record the pid before connecting so DESTROY can still signal the
        # child when the connection attempt fails.
        $pids->{$port} = $pid;

        $self->connect_to($port) || return;

        $self->{port_on_host}->{$port} = $host;

        warn "start_node_port $host [$port] pid $pid\n";

        return $port;
    }

    # child

    my $cmd = $host !~ m{^(localhost|127\.)}i ? qq|
        ssh -f $ssh_config
        -S /tmp/sock.$port.ssh
        -L $port:127.0.0.1:$port
        $host
    | : '';

    $cmd .= qq|
        /srv/Sack/bin/node.pl $port
    |;

    # collapse the multi-line template into a single command line
    $cmd =~ s{\s+}{ }sg;

    # NOTE(review): port_pid is inherited (Sack::Pid); presumably it writes
    # the pid file that is read above -- confirm.
    $self->port_pid( $port, $$ );

    warn "# exec: $cmd\n";
    exec $cmd or do {

        # exec only returns on failure. Leave immediately with _exit so the
        # child neither continues running the parent's code nor runs
        # DESTROY, which would signal every node recorded in $pids.
        warn "can't exec $cmd: $!\n";
        require POSIX;
        POSIX::_exit(1);
    };
}
180 |
|
181 |
# Serialize $data with Storable and write it to the socket of node $port.
# Returns whatever Storable::store_fd returns.
sub send_to {
    my ( $self, $port, $data ) = @_;

    my $sock = $self->{sock}->{$port};

    return Storable::store_fd( $data, $sock );
}
186 |
|
187 |
# Read one Storable-serialized message from the socket of node $port.
# A failure while reading is reported with warn and yields undef.
sub get_from {
    my ( $self, $port ) = @_;

    my $data = eval { Storable::fd_retrieve( $self->{sock}->{$port} ) };
    warn "ERROR $@" if $@;

    return $data;
}
197 |
|
198 |
# Send the same message to every connected node.
sub send_to_all {
    my ( $self, $data ) = @_;

    for my $port ( $self->connected ) {
        $self->send_to( $port, $data );
    }
}
202 |
|
203 |
# Read one reply from every connected node.
#
#   $callback - optional code ref, called as $callback->( $port, $data )
#               for each reply as it arrives
#
# Returns a hash ref of port => reply, or undef when no node is connected.
sub get_from_all {
    my ( $self, $callback ) = @_;

    my $result;

    for my $port ( $self->connected ) {
        my $data = $self->get_from($port);
        $result->{$port} = $data;
        $callback->( $port, $data ) if $callback;
    }

    return $result;
}
212 |
|
213 |
# Restart nodes: ask each one to exit, kill its recorded process and start
# it again on the same host and port.
#
#   @ports - optional list of ports to restart; without it every connected
#            node is restarted (the behaviour of the argument-less call).
sub restart_nodes {
    my ( $self, @ports ) = @_;

    # update_node passes the single port it just updated; honour that
    # instead of always restarting the whole cloud.
    @ports = $self->connected unless @ports;

    foreach my $port (@ports) {
        warn "restart [$port]\n";

        if ( $self->{sock}->{$port} ) {
            $self->send_to( $port, { exit => 1 } );

            # The node behind this socket is going away. Forget the socket
            # so start_node_port really connects to the new node instead of
            # re-using the dead connection.
            close delete $self->{sock}->{$port};
        }

        # Never call kill with an undefined pid.
        kill 9, $pids->{$port} if $pids->{$port};

        $self->start_node_port( $self->{port_on_host}->{$port}, $port );
    }
}
224 |
|
225 |
|
226 |
# Merged view result; reset at the start of view() and filled by merge().
our $out; |
227 |
|
228 |
use Digest::MD5 qw(md5); |
229 |
# Next sequence number handed out by merge() for a not-yet-seen '#' key.
our $nr = 0; |
230 |
# md5 digest of an original key => sequence number assigned to it.
our $md5_nr; |
231 |
# Write handle on /tmp/sack.digest, opened by merge() on first use.
our $digest_fh; |
232 |
# Sequence number => offset in the digest file where the original key starts.
our @digest_offset; |
233 |
|
234 |
# Merge one node's two-level result hash into the package-wide $out.
#
#   $new - hash ref { $k1 => { $k2 => $value } }; it is emptied while it is
#          merged (every value is deleted from it as it is consumed)
#
# Rules, chosen by the first-level key $k1:
#   - $k1 contains '#': $k2 is replaced by a sequence number; the original
#     key is appended to /tmp/sack.digest and its file offset is stored in
#     @digest_offset
#   - first value for a slot: stored as is
#   - $k1 contains '+': values are summed
#   - otherwise values are collected into an array
#
# Returns the number of values merged. Dies when the digest file cannot be
# opened or when an existing value is a reference other than an array.
sub merge {
    my ( $self, $new ) = @_;

    my $t_merge = time();
    my $tick    = 0;

    foreach my $k1 ( keys %$new ) {

        foreach my $k2 ( keys %{ $new->{$k1} } ) {

            my $n = delete $new->{$k1}->{$k2};

            if ( $k1 =~ m{#} ) {
                my $md5 = md5($k2);
                if ( defined $md5_nr->{$md5} ) {
                    $k2 = $md5_nr->{$md5};
                }
                else {
                    if ( !$digest_fh ) {

                        # Open into a lexical first: a failed open must not
                        # leave a useless handle in $digest_fh, and must not
                        # go unnoticed (prints to it would be lost silently).
                        open( my $fh, '>', '/tmp/sack.digest' )
                            or die "can't open /tmp/sack.digest: $!";
                        $digest_fh = $fh;
                    }
                    $digest_offset[$nr] = tell($digest_fh);
                    print {$digest_fh} "$k2\n";

                    $k2 = $md5_nr->{$md5} = $nr;
                    $nr++;
                }
            }

            my $ref = ref $out->{$k1}->{$k2};

            if ( !defined $out->{$k1}->{$k2} ) {
                $out->{$k1}->{$k2} = $n;
            }
            elsif ( $k1 =~ m{\+} ) {

                # aggregate
                $out->{$k1}->{$k2} += $n;
            }
            elsif ( $ref eq 'ARRAY' ) {
                push @{ $out->{$k1}->{$k2} }, ref $n eq 'ARRAY' ? @$n : $n;
            }
            elsif ( $ref eq '' ) {

                # second plain value for this slot: promote to an array
                $out->{$k1}->{$k2} = [ $out->{$k1}->{$k2}, $n ];
            }
            else {
                die "can't merge $k2 [$ref] from ", dump($n), " into ", dump( $out->{$k1}->{$k2} );
            }

            # progress: a dot every 1000 values, the running count when the
            # incremented counter reaches a multiple of 10000
            if ( $tick++ % 1000 == 0 ) {
                print STDERR ".";
            }
            elsif ( $tick % 10000 == 0 ) {
                print STDERR $tick;
            }
        }
    }

    $t_merge = time() - $t_merge;
    warn sprintf "\nmerged %d in %.4fs\n", $tick, $t_merge;

    return $tick;
}
295 |
|
296 |
|
297 |
# Run a view on all connected nodes and merge their results.
#
#   $view - path of the file holding the view code; its content is sent to
#           every node as { view => $code }
#
# The package-wide $out is reset, then the 'out' part of each node's reply
# is folded into it with merge().
#
# Returns $out (hash ref with the merged result).
sub view {
    my ( $self, $view ) = @_;

    my $t = time();
    $out = {};

    warn "run view $view ", -s $view, " bytes\n";

    my $view_code = read_file($view);
    $self->send_to_all( { view => $view_code } );

    # Start at 0 so the summary below does not warn about an undefined value
    # when no node is connected or none returned any output.
    my $total = 0;

    foreach my $port ( $self->connected ) {
        my $result = $self->get_from($port);
        warn "# result ", dump($result) if $self->{debug};

        # Named differently from the package-wide $out to avoid shadowing it.
        if ( my $node_out = delete $result->{out} ) {
            warn "[$port] result ", dump($result), $/ if $result;
            $total += $self->merge($node_out);
        }
        else {
            warn "no out from $port in ", dump($result);
        }
    }

    warn sprintf "view %d in %.4fs\n", $total, time() - $t;

    return $out;
}
325 |
|
326 |
# Copy the local /srv/Sack/ tree to the hosts behind the given ports and
# restart the node afterwards.
#
#   @_ (after $self) - ports whose hosts should be updated
#
# Local hosts are skipped and every remote host is updated only once per
# call. Dies when a port has no recorded host.
#
# NOTE(review): $host is interpolated into a shell command; it comes from
# the cloud file, which is assumed to be trusted -- confirm.
sub update_node {
    my $self = shift;
    my $updated;

    foreach my $port (@_) {
        my $host = $self->{port_on_host}->{$port} || die "no port $port in ", dump($self);

        # Same test start_node_port uses to decide between local and ssh.
        next if $host =~ m{^(localhost|127\.)}i;
        next if $updated->{$host}++;

        warn "update [$port] on $host to $Sack::VERSION\n";

        # This file sets $SIG{CHLD} = 'IGNORE'. While SIGCHLD is ignored the
        # child is reaped automatically, so system() cannot collect its exit
        # status and reports -1 -- the restart below would never run.
        my $status = do {
            local $SIG{CHLD} = 'DEFAULT';
            system("find /srv/Sack/ | cpio --create | ssh -F etc/lib.ssh $host cpio --extract --make-directories --unconditional");
        };

        $self->restart_nodes($port) if $status == 0;
    }
}
337 |
|
338 |
# Execute one command line of the interactive loop.
#
#   $cmd - command; only its beginning is matched:
#     debug     toggle debug output here and on all nodes
#     v         run the view in $self->{view} and keep the result in $out
#     d         dump the last view result
#     x         leave the loop
#     r         restart all nodes
#     i         show node info and update nodes whose version differs
#     u         update all nodes
#     sh <cmd>  run a shell command on all nodes
#
# Returns 1 to keep the loop running and 0 after 'x'.
sub command {
    my ( $self, $cmd ) = @_;

    duration("repl $cmd");

    my $repl = 1;

    # list verbose commands first
    if ( $cmd =~ m{^debug} ) {
        my $debug = $self->{debug} ? 0 : 1;

        # Remember the new state, otherwise the flag never changes and every
        # "debug" command switches debugging on again.
        $self->{debug} = $debug;

        warn "debug $debug\n";
        $self->send_to_all( { debug => $debug } );
        $self->get_from_all(
            sub {
                my ( $port, $data ) = @_;
                warn "[$port] debug $data->{debug}\n";
            }
        );
    }
    elsif ( $cmd =~ m{^v} ) {
        $out = $self->view( $self->{view} );
        duration('view');
    }
    elsif ( $cmd =~ m{^d} ) {
        warn dump($out);
        duration('dump');
    }
    elsif ( $cmd =~ m{^x} ) {
        $repl = 0;
    }
    elsif ( $cmd =~ m{^r} ) {
        $self->restart_nodes;
    }
    elsif ( $cmd =~ m{^i} ) {
        $self->send_to_all( { info => 1 } );
        my $info = $self->get_from_all;
        warn "INFO view $self->{view} ", -s $self->{view}, " bytes\n";
        foreach my $port ( $self->connected ) {
            my $node_info = $info->{$port};
            warn "[$port] $self->{port_on_host}->{$port} $pids->{$port} ", dump($node_info), "\n";

            # Replies are keyed by port, so the version has to be read from
            # this node's reply, not from the top level of $info.
            # NOTE(review): assumes the info reply is a hash with a
            # 'version' entry -- confirm against the node implementation.
            my $version = ref $node_info eq 'HASH' ? $node_info->{version} : undef;
            if ($version) {
                warn "# $version $Sack::VERSION\n";
                $self->update_node($port) if $version ne $Sack::VERSION;
            }
        }
    }
    elsif ( $cmd =~ m{^u} ) {
        $self->update_node( $self->connected );
    }
    elsif ( $cmd =~ m{^sh\s+(.+)} ) {

        # Copy the capture: $1 is not safe to read later inside a callback.
        my $shell_cmd = $1;

        $self->send_to_all( { sh => $shell_cmd } );
        $self->get_from_all(
            sub {
                my ( $port, $data ) = @_;
                warn "[$port]# $shell_cmd\n$data->{sh}";
            }
        );
    }
    else {
        warn "UNKNOWN $cmd\n" if $cmd;
    }

    return $repl;
}
389 |
|
390 |
|
391 |
# Destructor: signal every process recorded in $pids, first with SIGHUP and,
# when that signal could not be delivered, with SIGKILL.
sub DESTROY {

    # Keep the destructor from clobbering the caller's error state.
    local ( $@, $!, $? );

    warn "pids ", dump($pids);

    foreach my $pid ( values %{ $pids || {} } ) {
        warn "kill $pid";

        # Parentheses are required: "kill 1, $pid || kill 9, $pid" parses
        # as kill( 1, ( $pid || kill 9, $pid ) ), so the SIGKILL fallback
        # could never run.
        kill( 1, $pid ) || kill( 9, $pid );
    }
}
398 |
|
399 |
1; |