/[Sack]/trunk/lib/Sack/Lorry.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /trunk/lib/Sack/Lorry.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 139 - (show annotations)
Wed Oct 7 18:58:17 2009 UTC (14 years, 8 months ago) by dpavlin
File size: 6371 byte(s)
fix command looping for repl
1 package Sack::Lorry;
2
3 use warnings;
4 use strict;
5
6 our $VERSION = '0.08';
7
8 use IO::Socket::INET;
9 use Data::Dump qw(dump);
10 use Storable;
11 use File::Slurp;
12 use Net::Ping;
13 use Time::HiRes qw(time sleep);
14
15 use lib 'lib';
16 use base qw(Sack::Pid);
17 use Sack;
18
19 our $pids;
20
21 $SIG{CHLD} = 'IGNORE';
22
23 sub new {
24 my $class = shift;
25 my $self = bless {@_}, $class;
26 $self->{sock} = {};
27 warn __PACKAGE__, " $VERSION\n";
28 return $self;
29 }
30
31 sub connected {
32 sort keys %{ $_[0]->{sock} }
33 }
34
35 sub connect_to {
36 my ( $self, $port, $retries ) = @_;
37
38 $retries ||= 30;
39 warn "# connect_to [$port] $retries times";
40
41 my $sock = $self->{sock}->{$port};
42
43 while ( ! $sock && $retries-- ) {
44
45 $sock = IO::Socket::INET->new(
46 PeerAddr => '127.0.0.1',
47 PeerPort => $port,
48 Proto => 'tcp',
49 );
50
51 if ( ! $sock ) {
52 print STDERR ".";
53 sleep 0.5;
54 } elsif ( $sock->connected ) {
55 $self->{sock}->{$port} = $sock;
56 warn "# connected to $port\n";
57
58 $self->send_to( $port, { info => 1 } );
59 warn "info ", dump( $self->get_from( $port ) ), $/;
60
61 } else {
62 close $sock;
63 }
64
65 }
66
67 if ( ! $retries ) {
68 warn "SKIP $port: $!";
69 return;
70 } else {
71 return $port;
72 }
73 }
74
75
76 sub start_node_port {
77 my ( $self, $host, $port ) = @_;
78
79 chomp $host;
80
81 my $p = Net::Ping->new;
82
83 if ( ! $p->ping( $host ) ) {
84 warn "can't ping [$host]\n";
85 return;
86 }
87
88 if ( $self->connect_to( $port, 1 ) ) {
89 warn "re-using existing $port";
90 }
91
92 my $ssh_config = '-F etc/lib.ssh';
93
94 my $pid_path = "/tmp/sack.$port.pid";
95 kill 9, read_file $pid_path if -e $pid_path;
96
97 if ( my $pid = fork ) {
98 # parent
99
100 $self->connect_to( $port ) || return;
101
102 $pids->{ $port } = $pid;
103 $self->{port_on_host}->{$port} = $host;
104
105 warn "start_node_port $host [$port] pid $pid\n";
106
107 return $port;
108
109 } elsif ( ! defined $pid ) {
110 warn "can't fork $host $port";
111 return;
112 } else {
113 # child
114
115 my $cmd = $host !~ m{^(localhost|127\.)}i ? qq|
116 ssh -f $ssh_config
117 -S /tmp/sock.$port.ssh
118 -L $port:127.0.0.1:$port
119 $host
120 | : '';
121
122 $cmd .= qq|
123 /srv/Sack/bin/node.pl $port
124 |;
125
126 $cmd =~ s{\s+}{ }sg;
127
128 $self->port_pid( $port, $$ );
129
130 warn "# exec: $cmd\n";
131 exec $cmd;
132 }
133 }
134
135 sub send_to {
136 my ( $self, $port, $data ) = @_;
137 # warn "send_to [$port]\n";
138 Storable::store_fd( $data => $self->{sock}->{$port} );
139 }
140
141 sub get_from {
142 my ( $self, $port ) = @_;
143 # warn "get_from [$port]\n";
144 my $data;
145 eval {
146 $data = Storable::fd_retrieve( $self->{sock}->{$port} );
147 };
148 warn "ERROR $@" if $@;
149 return $data;
150 }
151
152 sub send_to_all {
153 my ( $self, $data ) = @_;
154 $self->send_to( $_, $data ) foreach $self->connected;
155 }
156
157 sub get_from_all {
158 my ( $self ) = @_;
159 my $result;
160 $result->{$_} = $self->get_from( $_ ) foreach $self->connected;
161 return $result;
162 }
163
164 sub restart_nodes {
165 my ( $self ) = @_;
166 foreach my $port ( $self->connected ) {
167 warn "restart [$port]\n";
168 # $self->send_to( $port, { restart => 1 } );
169 # $self->connect_to( $port );
170 $self->send_to( $port, { exit => 1 } );
171 kill 9, $pids->{$port};
172 $self->start_node_port( $self->{port_on_host}->{$port}, $port );
173 }
174 }
175
176
177 our $out;
178
179 use Digest::MD5 qw(md5);
180 our $nr = 0;
181 our $md5_nr;
182 our $digest_fh;
183 our @digest_offset;
184
185 sub merge {
186 my ( $self, $new ) = @_;
187
188 my $t_merge = time();
189
190 my $tick = 0;
191
192 my $missing;
193
194 foreach my $k1 ( keys %$new ) {
195
196 foreach my $k2 ( keys %{ $new->{$k1} } ) {
197
198 my $n = delete $new->{$k1}->{$k2};
199
200 if ( $k1 =~ m{#} ) {
201 my $md5 = md5 $k2;
202 if ( defined $md5_nr->{$md5} ) {
203 $k2 = $md5_nr->{$md5};
204 } else {
205 open( $digest_fh, '>', '/tmp/sack.digest' ) unless $digest_fh;
206 $digest_offset[ $nr ] = tell( $digest_fh );
207 print $digest_fh "$k2\n";
208
209 $k2 = $md5_nr->{$md5} = $nr;
210 $nr++;
211 }
212 }
213
214 my $ref = ref $out->{$k1}->{$k2};
215
216 if ( ! defined $out->{$k1}->{$k2} ) {
217 $out->{$k1}->{$k2} = $n;
218 } elsif ( $k1 =~ m{\+} ) {
219 # warn "## agregate $k1 $k2";
220 $out->{$k1}->{$k2} += $n;
221 } elsif ( $ref eq 'ARRAY' ) {
222 if ( ref $n eq 'ARRAY' ) {
223 push @{ $out->{$k1}->{$k2} }, $_ foreach @$n;
224 } else {
225 push @{ $out->{$k1}->{$k2} }, $n;
226 }
227 } elsif ( $ref eq '' ) {
228 $out->{$k1}->{$k2} = [ $out->{$k1}->{$k2}, $n ];
229 } else {
230 die "can't merge $k2 [$ref] from ",dump($n), " into ", dump($out->{$k1}->{$k2});
231 }
232
233 if ( $tick++ % 1000 == 0 ) {
234 print STDERR ".";
235 } elsif ( $tick % 10000 == 0 ) {
236 print STDERR $tick;
237 }
238 }
239 }
240
241 $t_merge = time - $t_merge;
242 warn sprintf "\nmerged %d in %.4fs\n", $tick, $t_merge;
243
244 return $tick;
245 }
246
247
248 sub view {
249 my ( $self, $view ) = @_;
250
251 my $t = time;
252 $out = {};
253
254 warn "run view $view ", -s $view, " bytes\n";
255
256 my $view_code = read_file($view);
257 $self->send_to_all({ view => $view_code });
258
259 my $total;
260
261 foreach my $port ( $self->connected ) {
262 my $result = $self->get_from( $port );
263 warn "# result ", dump $result if $self->{debug};
264 if ( my $out = delete $result->{out} ) {
265 warn "[$port] result ", dump($result), $/ if $result;
266 $total += $self->merge( $out );
267 } else {
268 warn "no out from $port in ",dump $result;
269 }
270 }
271
272 warn sprintf "view %d in %.4fs\n", $total, time - $t;
273
274 return $out;
275 }
276
277 sub command {
278 my ( $self, $cmd ) = @_;
279
280 duration "repl $cmd";
281
282 my $repl = 1;
283
284 if ( $cmd =~ m{^v} ) {
285 $out = $self->view( $self->{view} );
286 duration 'view';
287 } elsif ( $cmd =~ m{^d} ) {
288 warn dump $out;
289 duration 'dump';
290 } elsif ( $cmd =~ m{^x} ) {
291 $repl = 0;
292 } elsif ( $cmd =~ m{^r} ) {
293 $self->restart_nodes;
294 } elsif ( $cmd =~ m{^i} ) {
295 $self->send_to_all({ info => 1 });
296 my $info = $self->get_from_all;
297 foreach my $port ( $self->connected ) {
298 warn "INFO view $self->{view} ", -s $self->{view}, " bytes\n";
299 warn "[$port] $self->{port_on_host}->{$port} ", dump( $info->{$port} ), "\n";
300 }
301 } elsif ( $cmd =~ m{^u} ) {
302 my $updated;
303 foreach my $host ( $self->connected ) {
304 next if $updated->{$host}++;
305 warn "update $host\n";
306 system "find /srv/Sack/ | cpio --create | ssh -F etc/lib.ssh $host cpio --extract --make-directories --unconditional";
307 }
308 } elsif ( $cmd =~ m{^sh\s+(.+)} ) {
309 $self->send_to_all({ sh => $1 });
310 my $sh = $self->get_from_all;
311 foreach my $port ( $self->connected ) {
312 warn "[$port]# $1\n$sh->{$port}->{sh}";
313 }
314 } else {
315 warn "UNKNOWN $cmd\n" if $cmd;
316 }
317
318 return $repl;
319 }
320
321
322 sub DESTROY {
323 warn "pids ",dump( $pids );
324 foreach ( values %$pids ) {
325 warn "kill $_";
326 kill 1,$_ || kill 9, $_;
327 }
328 }
329
330 1;

  ViewVC Help
Powered by ViewVC 1.1.26