1 |
dpavlin |
92 |
package Sack::Lorry; |
2 |
|
|
|
3 |
|
|
use warnings; |
4 |
|
|
use strict; |
5 |
|
|
|
6 |
|
|
use IO::Socket::INET; |
7 |
|
|
use Data::Dump qw(dump); |
8 |
|
|
use Storable; |
9 |
dpavlin |
97 |
use File::Slurp; |
10 |
dpavlin |
107 |
use Net::Ping; |
11 |
dpavlin |
92 |
|
12 |
|
|
# Package-wide registry of forked children: key is "host:port", value is the
# child pid (filled in by start_node, read by DESTROY).
our $pids; |
13 |
|
|
# Package-wide map of port => host for every node start_node has forked.
our $ports; |
14 |
|
|
|
15 |
|
|
# Ignore SIGCHLD for the whole process; nothing in this file waits on children.
$SIG{CHLD} = 'IGNORE'; |
16 |
|
|
|
17 |
|
|
# Port used for the next node; start_node post-increments it on success.
my $port = 4000; |
18 |
|
|
|
19 |
|
|
# Constructor: every argument after the class name is taken as a key/value
# pair and becomes the initial content of the blessed hash.
sub new {
	my ( $class, @args ) = @_;
	return bless { @args }, $class;
}
24 |
|
|
|
25 |
|
|
# start_node( $host )
#
# Forks a child that execs /srv/Sack/bin/node.pl on the current $port --
# through "ssh -f" with a -L forward of that port unless $host looks like
# localhost / 127.* -- and then, in the parent, keeps retrying a TCP connect
# to 127.0.0.1:$port (one attempt per second) until it succeeds.
#
# On success the socket is stored in $self->{sock}->{$port}, the host in
# $self->{connected}->{$port}, and the port number is returned; the file-level
# $port counter is advanced for the next call.
#
# Returns nothing if the host does not answer a ping or if fork fails.
# NOTE(review): the connect loop has no upper bound, so a node that never
# comes up blocks the caller forever.
sub start_node {
	my ( $self, $host ) = @_;

	chomp $host;

	my $p = Net::Ping->new;

	if ( ! $p->ping( $host ) ) {
		warn "can't ping [$host]\n";
		return;
	}

	warn "start_node $host\n";

	my $ssh_config = '-F etc/lib.ssh';

	my $pid = fork;

	if ( ! defined $pid ) {
		warn "can't fork $host $port";
		return;
	}

	if ( ! $pid ) {
		# child

		my $cmd = $host !~ m{^(localhost|127\.)}i ? qq|
			ssh -f $ssh_config
			-S /tmp/sock.$port.ssh
			-L $port:127.0.0.1:$port
			$host
		| : '';

		$cmd .= qq|
			/srv/Sack/bin/node.pl $port
		|;

		# collapse the multi-line template into a single command line
		$cmd =~ s{\s+}{ }sg;

		warn "exec: $cmd\n";
		exec( $cmd ) or warn "can't exec [$cmd]: $!\n";

		# exec failed: this is still the forked copy of the caller and it
		# must not return into the caller's code.  _exit skips END blocks
		# and object destruction, so DESTROY does not kill the sibling
		# nodes recorded in $pids.
		require POSIX;
		POSIX::_exit( 1 );
	}

	# parent
	$pids->{ "$host:$port" } = $pid;
	$ports->{ $port } = $host;

	my $sock;

	print STDERR "waiting for $port";

	while ( ! $sock ) {

		$sock = IO::Socket::INET->new(
			PeerAddr => '127.0.0.1',
			PeerPort => $port,
			Proto    => 'tcp',
		);

		if ( ! $sock ) {
			print STDERR ".";
			sleep 1;
		}

	}

	$self->{sock}->{$port} = $sock;

	warn "\nconnected to $port\n";

	$self->{connected}->{$port} = $host;

	return $port++;
}
96 |
|
|
|
97 |
|
|
# send_to( $port, $data )
#
# Serializes $data with Storable onto the socket registered for $port in
# $self->{sock} and returns whatever Storable::store_fd returns.
sub send_to {
	my ( $self, $port, $data ) = @_;

	warn "send_to [$port]\n";

	my $sock = $self->{sock}->{$port};
	return Storable::store_fd( $data, $sock );
}
102 |
|
|
|
103 |
|
|
# get_from( $port )
#
# Reads one Storable-serialized structure from the socket registered for
# $port in $self->{sock} and returns it.
sub get_from {
	my ( $self, $port ) = @_;

	warn "get_from [$port]\n";

	my $sock = $self->{sock}->{$port};
	return Storable::fd_retrieve( $sock );
}
108 |
|
|
|
109 |
dpavlin |
93 |
# send_to_all( $data )
#
# Calls send_to with $data once for every port listed in $self->{connected}.
sub send_to_all {
	my ( $self, $data ) = @_;

	for my $node_port ( keys %{ $self->{connected} } ) {
		$self->send_to( $node_port, $data );
	}
}
113 |
|
|
|
114 |
|
|
# get_from_all()
#
# Calls get_from for every port in $self->{connected} and returns a hash
# reference of port => reply.  Returns undef when no port is connected.
sub get_from_all {
	my ( $self ) = @_;

	my $result;

	for my $node_port ( keys %{ $self->{connected} } ) {
		$result->{$node_port} = $self->get_from( $node_port );
	}

	return $result;
}
120 |
|
|
|
121 |
dpavlin |
97 |
|
122 |
|
|
# Accumulated result of all merge() calls: $out->{$k1}->{$k2}.
our $out;

use Digest::MD5 qw(md5);

# State for '#' keys: each distinct second-level key is replaced by a
# sequence number and its text is appended to /tmp/sack.digest.
our $nr = 0;          # next sequence number to hand out
our $md5_nr;          # md5( key ) => sequence number
our $digest_fh;       # write handle on /tmp/sack.digest, opened on first use
our @digest_offset;   # sequence number => offset of the key in the digest file

# merge( $new )
#
# Folds the two-level hash $new->{$k1}->{$k2} into the package-level $out.
# Entries are deleted from $new as they are consumed.
#
#  * if $k1 contains '#', $k2 is first replaced by its sequence number
#    (see the digest state above);
#  * a slot that is not yet defined in $out simply receives the value;
#  * if $k1 contains '+', values are summed;
#  * if the existing slot is an array, the new value (or each of its
#    elements when it is itself an array) is pushed onto it;
#  * if the existing slot is a plain scalar, it becomes a two-element array
#    of the old and the new value;
#  * anything else dies.
#
# Progress is printed to STDERR and a timing summary is warned at the end.
# Dies if the digest file cannot be opened.
sub merge {
	my ( $self, $new ) = @_;

	# sub-second clock, so the "%.4fs" figure below is meaningful
	require Time::HiRes;
	my $t_merge = Time::HiRes::time();

	my $tick = 0;

	foreach my $k1 ( keys %$new ) {

		foreach my $k2 ( keys %{ $new->{$k1} } ) {

			my $n = delete $new->{$k1}->{$k2};

			if ( $k1 =~ m{#} ) {
				my $md5 = md5( $k2 );
				if ( defined $md5_nr->{$md5} ) {
					$k2 = $md5_nr->{$md5};
				} else {
					if ( ! $digest_fh ) {
						# open into a lexical first so a failed open
						# never leaves a dead handle in $digest_fh
						open( my $fh, '>', '/tmp/sack.digest' )
							or die "can't open /tmp/sack.digest: $!";
						$digest_fh = $fh;
					}
					$digest_offset[ $nr ] = tell( $digest_fh );
					print $digest_fh "$k2\n";

					$k2 = $md5_nr->{$md5} = $nr;
					$nr++;
				}
			}

			my $ref = ref $out->{$k1}->{$k2};

			if ( ! defined $out->{$k1}->{$k2} ) {
				$out->{$k1}->{$k2} = $n;
			} elsif ( $k1 =~ m{\+} ) {
#				warn "## agregate $k1 $k2";
				$out->{$k1}->{$k2} += $n;
			} elsif ( $ref eq 'ARRAY' ) {
				if ( ref $n eq 'ARRAY' ) {
					push @{ $out->{$k1}->{$k2} }, $_ foreach @$n;
				} else {
					push @{ $out->{$k1}->{$k2} }, $n;
				}
			} elsif ( $ref eq '' ) {
				$out->{$k1}->{$k2} = [ $out->{$k1}->{$k2}, $n ];
			} else {
				die "can't merge $k2 [$ref] from ", Data::Dump::dump( $n ),
					" into ", Data::Dump::dump( $out->{$k1}->{$k2} );
			}

			if ( $tick++ % 1000 == 0 ) {
				print STDERR ".";
			} elsif ( $tick % 10000 == 0 ) {
				print STDERR $tick;
			}
		}
	}

	my $elapsed = Time::HiRes::time() - $t_merge;

	# items per second; guarded so neither an empty $new nor a zero
	# elapsed time can divide by zero
	my $rate = $elapsed > 0 ? $tick / $elapsed : 0;

	warn sprintf "\nmerged %d in %.4fs %.2f/s\n", $tick, $elapsed, $rate;

}
190 |
|
|
|
191 |
|
|
|
192 |
|
|
# view( $view )
#
# Resets the package-level $out, reads the file named by $view, sends its
# content to every connected node as { view => ... }, then collects one
# reply per node and merges each reply's {view} part.  Nodes whose reply has
# no {view} are only reported.  Returns $out.
sub view {
	my ( $self, $view ) = @_;

	$out = {};

	warn "run view $view ", -s $view, " bytes\n";

	# scalar assignment: read_file must return the file as a single string
	my $source = read_file($view);
	$self->send_to_all( { view => $source } );

	for my $node_port ( keys %{ $self->{connected} } ) {
		my $reply = $self->get_from( $node_port );
		warn "# result ", dump $reply if $self->{debug};

		unless ( $reply->{view} ) {
			warn "no view from $node_port\n";
			next;
		}

		$self->merge( $reply->{view} );
	}

	return $out;
}
214 |
|
|
|
215 |
|
|
|
216 |
dpavlin |
92 |
# Destructor: signals every child pid recorded in the package-level $pids,
# first with signal 1 (HUP) and, if that cannot be delivered, with signal 9
# (KILL).
# NOTE(review): $pids is shared by the whole package, so destroying any one
# object signals the children started by all of them.
sub DESTROY {
	# keep kill() from clobbering the error/exit status seen by the code
	# that triggered destruction
	local ( $!, $@, $? );

	warn "pids ",dump( $pids );
	foreach my $pid ( values %{ $pids || {} } ) {
		warn "kill $pid";
		# parenthesized on purpose: "kill 1,$pid || kill 9,$pid" parses as
		# kill( 1, ( $pid || kill 9, $pid ) ) and never tries signal 9
		kill( 1, $pid ) or kill( 9, $pid );
	}
}
223 |
|
|
|
224 |
|
|
1; |