3 |
use warnings; |
use warnings; |
4 |
use strict; |
use strict; |
5 |
|
|
6 |
our $VERSION = '0.04'; |
our $VERSION = '0.05'; |
7 |
|
|
8 |
use Time::HiRes qw(time); |
use Time::HiRes qw(time); |
9 |
use Data::Dump qw(dump); |
use Data::Dump qw(dump); |
72 |
); |
); |
73 |
|
|
74 |
our $num_records = $input->size; |
our $num_records = $input->size; |
75 |
|
our @reports; |
76 |
|
|
77 |
sub report { |
sub report { |
78 |
my $description = join(' ',@_); |
my $description = join(' ',@_); |
79 |
my $dt = time - $t; |
my $dt = time - $t; |
80 |
printf "%s in %1.4fs %.2f/s\n", $description, $dt, $input->size / $dt; |
my $report = [ $description, $dt, $input->size / $dt ]; |
81 |
|
printf "[$port] %s in %1.4fs %.2f/s\n", @$report; |
82 |
|
push @reports, $report; |
83 |
$t = time; |
$t = time; |
84 |
} |
} |
85 |
|
|
86 |
|
sub show_report { |
87 |
|
"\n" . join( "\n", map { sprintf "%8.4fs %10.2f/s %s", $_->[1], $_->[2], $_->[0] } @reports ) . "\n"; |
88 |
|
} |
89 |
|
|
90 |
report $input->size . ' records loaded'; |
report $input->size , 'records loaded'; |
91 |
|
|
92 |
mkdir 'out' unless -e 'out'; |
mkdir 'out' unless -e 'out'; |
93 |
|
|
116 |
my $header = defined $content ? length($content) : 0; |
my $header = defined $content ? length($content) : 0; |
117 |
$header .= ' ' . join(' ', @_) if @_; |
$header .= ' ' . join(' ', @_) if @_; |
118 |
|
|
119 |
warn "# send_nodes ", dump @nodes; |
warn "# send_nodes ", dump(@_), " to ", dump @nodes; |
120 |
|
|
121 |
foreach my $node ( @nodes ) { |
foreach my $node ( @nodes ) { |
122 |
|
|
248 |
|
|
249 |
$t_merge = time - $t_merge; |
$t_merge = time - $t_merge; |
250 |
printf STDERR "%d in %.4fs %.2f/s local %.1f%% %d/%d\n", $tick, $t_merge, $tick / $t_merge, $local * 100 / $tick, $local, $remote; |
printf STDERR "%d in %.4fs %.2f/s local %.1f%% %d/%d\n", $tick, $t_merge, $tick / $t_merge, $local * 100 / $tick, $local, $remote; |
251 |
|
push @reports, [ "$tick merged $from_node", $t_merge, $tick / $t_merge ]; |
252 |
|
|
253 |
warn "[$port] missing ", dump $missing if $missing; |
warn "[$port] missing ", dump $missing if $missing; |
254 |
|
|
286 |
$pos % 1000 == 0 ? print STDERR "." : 0 ; |
$pos % 1000 == 0 ? print STDERR "." : 0 ; |
287 |
}; |
}; |
288 |
|
|
289 |
report "\n[$port] RECS $affected $view"; |
report "$affected affected $view"; |
290 |
|
|
291 |
warn "WARN no \$out defined!" unless defined $out; |
warn "WARN no \$out defined!" unless defined $out; |
292 |
|
|
294 |
|
|
295 |
if ( $connected ) { |
if ( $connected ) { |
296 |
foreach my $node ( keys %$connected ) { |
foreach my $node ( keys %$connected ) { |
297 |
warn "[$port] get_node $node\n"; |
warn "[$port] get_node [$node]\n"; |
298 |
my $o = get_node $node; |
my $o = get_node $node; |
299 |
next unless $o; |
next unless $o; |
300 |
my $s = length $o; |
my $s = length $o; |
301 |
$o = thaw $o; |
$o = thaw $o; |
302 |
warn "[$port] merge $node $s bytes\n"; |
warn "[$port] got $s bytes from [$node]\n"; |
303 |
merge_out $node => $o; |
merge_out $node => $o; |
304 |
} |
} |
305 |
} |
} |
327 |
rename $path, "$path.last"; |
rename $path, "$path.last"; |
328 |
|
|
329 |
store $out => $path; |
store $out => $path; |
330 |
report "[$port] SAVE $path $offset-$limit", -s $path, "bytes"; |
report "save $path", -s $path, "bytes"; |
331 |
|
|
332 |
if ( -s $path < 4096 ) { |
if ( -s $path < 4096 ) { |
333 |
print '$out = ', dump $digest->undigest_out($out); |
print '$out = ', dump $digest->undigest_out($out); |
360 |
|
|
361 |
while (1) { |
while (1) { |
362 |
|
|
363 |
warn "[$port] READY path: $path offset: $offset limit: $limit #recs: $num_records\n"; |
warn "[$port] accept path: $path offset: $offset limit: $limit #recs: $num_records\n"; |
364 |
|
|
365 |
my $client = $sock->accept(); |
my $client = $sock->accept(); |
366 |
|
|
380 |
} elsif ( $header[0] eq 'info' ) { |
} elsif ( $header[0] eq 'info' ) { |
381 |
my $info = info_tabs; |
my $info = info_tabs; |
382 |
warn "[$port] info $info\n"; |
warn "[$port] info $info\n"; |
383 |
|
$info .= "\n" . show_report if $content =~ m{r}i; |
384 |
send_sock $client => $info; |
send_sock $client => $info; |
385 |
} elsif ( $header[0] eq 'exit' ) { |
} elsif ( $header[0] eq 'exit' ) { |
386 |
warn "[$port] exit"; |
warn "[$port] exit"; |
406 |
} |
} |
407 |
|
|
408 |
sub info { |
sub info { |
409 |
send_nodes 'info' => $2; |
my $detail = shift || ''; |
410 |
|
|
411 |
|
send_nodes 'info' => $detail; |
412 |
|
|
413 |
my @info = ( |
my @info = ( |
414 |
"port\toffset\tlimit\t#recs\tpath", |
"port\toffset\tlimit\t#recs\tpath", |
416 |
info_tabs, |
info_tabs, |
417 |
); |
); |
418 |
|
|
419 |
|
push @info, show_report if $detail =~ m{r}i; |
420 |
|
|
421 |
push @info, get_node $_ foreach @nodes; |
push @info, get_node $_ foreach @nodes; |
422 |
|
|
423 |
print "[$port] INFO\n" |
print "[$port] INFO", $detail ? " $detail" : '', " \n" |
424 |
, join("\n", @info) |
, join("\n", @info) |
425 |
, "\n\n" ; |
, "\n" ; |
426 |
|
|
427 |
return @info; |
return @info; |
428 |
} |
} |
449 |
#system "vi out/*"; |
#system "vi out/*"; |
450 |
$digest->sync; |
$digest->sync; |
451 |
system "bin/storableedit.pl", (glob('out/*.storable'))[0]; |
system "bin/storableedit.pl", (glob('out/*.storable'))[0]; |
452 |
} elsif ( $cmd =~ m{^i(?:nfo)?\s?(.+)?$}i ) { |
} elsif ( $cmd =~ m{^i(?:nfo)?\s?(\S+)?$}i ) { |
453 |
info; |
info $1; |
454 |
} elsif ( $cmd =~ m{^(q|e|x)}i ) { |
} elsif ( $cmd =~ m{^(q|e|x)}i ) { |
455 |
warn "# exit"; |
warn "# exit"; |
456 |
send_nodes 'exit'; |
send_nodes 'exit'; |