3 |
use warnings; |
use warnings; |
4 |
use strict; |
use strict; |
5 |
|
|
6 |
our $VERSION = '0.05'; |
our $VERSION = '0.06'; |
7 |
|
|
8 |
use Time::HiRes qw(time); |
use Time::HiRes qw(time); |
9 |
use Data::Dump qw(dump); |
use Data::Dump qw(dump); |
53 |
#clean if $clean; # FIXME |
#clean if $clean; # FIXME |
54 |
die "SIG$signame"; |
die "SIG$signame"; |
55 |
}; |
}; |
56 |
|
|
57 |
|
sub port2color { |
58 |
|
my $port = shift; |
59 |
|
return "\e[1m0\e[0m" if $port == 0; |
60 |
|
|
61 |
|
my $c = ( $port % 6 ) + 31; |
62 |
|
return "\e[${c}m$port\e[0m"; |
63 |
|
} |
64 |
|
|
65 |
|
$SIG{__WARN__} = sub { |
66 |
|
return unless @_; |
67 |
|
my $msg = join('', @_); |
68 |
|
if ( $msg !~ m{[\n\r]$} ) { |
69 |
|
my @loc = caller(1); |
70 |
|
$msg .= " in $loc[1] +$loc[2]\n" if @loc; |
71 |
|
} |
72 |
|
$msg =~ s{\[(0|\d\d\d\d)\]}{ '[' . port2color($1) . ']' }eg; |
73 |
|
print STDERR $msg; |
74 |
|
return 1; |
75 |
|
}; |
76 |
|
|
77 |
} |
} |
78 |
|
|
79 |
|
our $digest_module = 'Sack::Digest::' . ( $ENV{SACK_DIGEST} || 'BerkeleyDB' ); |
80 |
use lib "$prefix/srv/Sack/lib/"; |
use lib "$prefix/srv/Sack/lib/"; |
81 |
use Sack::Digest; |
my $digest_path = $digest_module; |
82 |
our $digest = Sack::Digest->new( port => $port, clean => 1 ); |
$digest_path =~ s{::}{/}g; |
83 |
|
require "lib/$digest_path.pm"; |
84 |
|
|
85 |
|
our $digest = $digest_module->new( port => $port, clean => 1 ); |
86 |
sub digest { $digest->to_int($_[0]) } |
sub digest { $digest->to_int($_[0]) } |
87 |
|
|
88 |
|
warn "[$port] using $digest_module\n"; |
89 |
|
|
90 |
use lib "$prefix/srv/webpac2/lib/"; |
use lib "$prefix/srv/webpac2/lib/"; |
91 |
use WebPAC::Input::ISI; |
use WebPAC::Input::ISI; |
92 |
|
|
105 |
my $description = join(' ',@_); |
my $description = join(' ',@_); |
106 |
my $dt = time - $t; |
my $dt = time - $t; |
107 |
my $report = [ $description, $dt, $input->size / $dt ]; |
my $report = [ $description, $dt, $input->size / $dt ]; |
108 |
printf "[$port] %s in %1.4fs %.2f/s\n", @$report; |
warn sprintf "[$port] %s in %1.4fs %.2f/s\n", @$report; |
109 |
push @reports, $report; |
push @reports, $report; |
110 |
$t = time; |
$t = time; |
111 |
} |
} |
191 |
|
|
192 |
my $size = <$sock>; |
my $size = <$sock>; |
193 |
chomp($size); |
chomp($size); |
194 |
warn "[$port] pull_node_file $node $file $size bytes\n"; |
warn "[$port] pull_node_file [$node] $file $size bytes\n"; |
195 |
|
|
196 |
my $block = 4096; |
my $block = 4096; |
197 |
my $buff = ' ' x $block; |
my $buff = ' ' x $block; |
203 |
close($fh); |
close($fh); |
204 |
} |
} |
205 |
|
|
206 |
|
my $merge_digest_mapping; |
207 |
|
|
208 |
sub merge_out { |
sub merge_out { |
209 |
my ( $from_node, $new ) = @_; |
my ( $from_node, $new ) = @_; |
210 |
|
|
217 |
my ( $local, $remote ) = ( 0, 0 ); |
my ( $local, $remote ) = ( 0, 0 ); |
218 |
|
|
219 |
my $tick = 0; |
my $tick = 0; |
220 |
print STDERR "[$port] merge [$from_node]"; |
warn "[$port] merge [$from_node]"; |
221 |
|
|
222 |
my $missing; |
my $missing; |
223 |
|
|
230 |
if ( $k1 =~ m{#} ) { |
if ( $k1 =~ m{#} ) { |
231 |
die "ASSERT $k1 $k2" unless $k2 =~ m{^\d+$}; |
die "ASSERT $k1 $k2" unless $k2 =~ m{^\d+$}; |
232 |
#warn "XXX $k1 $k2"; |
#warn "XXX $k1 $k2"; |
|
my $md5 = $remote_digest->{nr_md5}->[$k2]; |
|
233 |
|
|
234 |
if ( ! $md5 ) { |
if ( defined $merge_digest_mapping->{$from_node}->[ $k2 ] ) { |
235 |
$missing->{nr_md5}->{$from_node}++; # FIXME die? |
$k2 = $merge_digest_mapping->{$from_node}->[ $k2 ]; |
|
next; |
|
|
} |
|
|
|
|
|
if ( my $local_k2 = $digest->{md5_nr}->{$md5} ) { |
|
|
$k2 = $local_k2; |
|
|
$local++; |
|
|
} elsif ( my $full = $remote_digest->{md5}->{$md5} ) { |
|
|
$k2 = $digest->to_int( $remote_digest->{md5}->{$md5} ); |
|
|
$remote++; |
|
236 |
} else { |
} else { |
237 |
$missing->{md5}->{$from_node}++; |
|
238 |
|
my $md5 = $remote_digest->{nr_md5}->[$k2]; |
239 |
|
|
240 |
|
if ( ! $md5 ) { |
241 |
|
$missing->{nr_md5}->{$from_node}++; # FIXME die? |
242 |
|
next; |
243 |
|
} |
244 |
|
|
245 |
|
my $local_k2; |
246 |
|
|
247 |
|
if ( $local_k2 = $digest->{md5_nr}->{$md5} ) { |
248 |
|
$local++; |
249 |
|
} elsif ( my $full = $remote_digest->{md5}->{$md5} ) { |
250 |
|
$local_k2 = $digest->to_int( $remote_digest->{md5}->{$md5} ); |
251 |
|
$remote++; |
252 |
|
} else { |
253 |
|
$missing->{md5}->{$from_node}++; |
254 |
|
} |
255 |
|
|
256 |
|
$k2 = $merge_digest_mapping->{$from_node}->[ $k2 ] = $local_k2; |
257 |
|
|
258 |
} |
} |
259 |
} |
} |
260 |
|
|
286 |
} |
} |
287 |
|
|
288 |
$t_merge = time - $t_merge; |
$t_merge = time - $t_merge; |
289 |
printf STDERR "%d in %.4fs %.2f/s local %.1f%% %d/%d\n", $tick, $t_merge, $tick / $t_merge, $local * 100 / $tick, $local, $remote; |
my $digests = $local + $remote; |
290 |
push @reports, [ "$tick merged $from_node", $t_merge, $tick / $t_merge ]; |
warn sprintf "\n[$port] merge %d in %.4fs %.2f/s digests local: %.1f%% %d/%d\n", $tick, $t_merge, $digests / $t_merge, $local * 100 / ( $digests || 1 ), $local, $remote; |
291 |
|
push @reports, [ "$tick merged [$from_node]", $t_merge, $digests / $t_merge ]; |
292 |
|
|
293 |
warn "[$port] missing ", dump $missing if $missing; |
warn "[$port] missing ", dump $missing if $missing; |
294 |
|
|
295 |
warn "## merge out ", dump $out if $debug; |
warn "## merge out ", dump $out if $debug; |
296 |
} |
} |
297 |
|
|
298 |
|
our $rec; |
299 |
|
|
300 |
sub run_code { |
sub run_code { |
301 |
my ( $view, $code ) = @_; |
my ( $view, $code ) = @_; |
302 |
|
|
309 |
my $affected = 0; |
my $affected = 0; |
310 |
$t = time; |
$t = time; |
311 |
|
|
312 |
|
my $coderef = eval "sub { $code }"; |
313 |
|
if ( $@ ) { |
314 |
|
warn "ABORT code: $@"; |
315 |
|
return; |
316 |
|
} |
317 |
|
|
318 |
foreach my $pos ( $offset + 1 .. $offset + $input->size ) { |
foreach my $pos ( $offset + 1 .. $offset + $input->size ) { |
319 |
my $rec = $cache->{$pos} ||= $input->fetch_rec( $pos ); |
$rec = $cache->{$pos} ||= $input->fetch_rec( $pos ); |
320 |
if ( ! $rec ) { |
if ( ! $rec ) { |
321 |
print STDERR "END @ $pos"; |
print STDERR "END @ $pos"; |
322 |
last; |
last; |
323 |
} |
} |
324 |
|
|
325 |
eval "$code"; |
eval { $coderef->() }; |
326 |
if ( $@ ) { |
if ( $@ ) { |
327 |
warn "ABORT $pos $@\n"; |
warn "ABORT $pos $@\n"; |
328 |
last; |
last; |
388 |
|
|
389 |
|
|
390 |
sub info_tabs { |
sub info_tabs { |
391 |
"$port\t$offset\t$limit\t$num_records\t$path\t" |
my $port_col = port2color($port); |
392 |
|
"$port_col\t$offset\t$limit\t$num_records\t$path\t" |
393 |
. join("\t", map { |
. join("\t", map { |
394 |
my $b = $_; |
my $b = $_; |
395 |
$b =~ s{^.+\.$port\.([^/]+)$}{$1}; |
$b =~ s{^.+\.$port\.([^/]+)$}{$1}; |
403 |
my $pid_path = "/tmp/sack.$port.pid"; |
my $pid_path = "/tmp/sack.$port.pid"; |
404 |
if ( -e $pid_path ) { |
if ( -e $pid_path ) { |
405 |
my $pid = read_file $pid_path; |
my $pid = read_file $pid_path; |
406 |
kill 9, $pid && warn "[$port] kill old $pid"; |
kill 9, $pid && warn "[$port] kill old $pid\n"; |
407 |
} |
} |
408 |
write_file $pid_path, $$; |
write_file $pid_path, $$; |
409 |
|
|
414 |
LocalPort => $port, |
LocalPort => $port, |
415 |
Proto => 'tcp', |
Proto => 'tcp', |
416 |
Reuse => 1, |
Reuse => 1, |
417 |
) or die $!; |
) or die "[$port] die $!"; |
418 |
|
|
419 |
while (1) { |
while (1) { |
420 |
|
|
489 |
|
|
490 |
info; |
info; |
491 |
while ( keys %$connected != @nodes ) { |
while ( keys %$connected != @nodes ) { |
492 |
warn "[$port] wait for ", join(' ', grep { ! defined $connected->{$_} } @nodes ); |
warn "[$port] wait for [", join('] [', grep { ! defined $connected->{$_} } @nodes ), "]\n"; |
493 |
sleep 1; |
sleep 1; |
494 |
|
info; |
495 |
} |
} |
496 |
run_views; |
run_views; |
497 |
|
|
506 |
|
|
507 |
View Run run views |
View Run run views |
508 |
VI \\e Output show output of last run |
VI \\e Output show output of last run |
509 |
Info [\$VERSION] instrospect |
Info [Report] info with optional report |
510 |
Quit EXit shutdown |
Quit EXit shutdown |
511 |
|
|
512 |
__HELP__ |
__HELP__ |