6 |
use IO::Socket::INET; |
use IO::Socket::INET; |
7 |
use Data::Dump qw(dump); |
use Data::Dump qw(dump); |
8 |
use Storable; |
use Storable; |
9 |
|
use File::Slurp; |
10 |
|
|
11 |
our $pids; |
our $pids; |
12 |
our $ports; |
our $ports; |
28 |
|
|
29 |
if ( my $pid = fork ) { |
if ( my $pid = fork ) { |
30 |
# parent |
# parent |
31 |
$pids->{ $host } = $pid; |
$pids->{ "$host:$port" } = $pid; |
32 |
$ports->{ $port } = $host; |
$ports->{ $port } = $host; |
33 |
|
|
34 |
my $sock; |
my $sock; |
105 |
return $result; |
return $result; |
106 |
} |
} |
107 |
|
|
108 |
|
|
109 |
|
our $out; |
110 |
|
|
111 |
|
sub merge { |
112 |
|
my ( $self, $new ) = @_; |
113 |
|
|
114 |
|
my $t_merge = time(); |
115 |
|
|
116 |
|
my $tick = 0; |
117 |
|
|
118 |
|
my $missing; |
119 |
|
|
120 |
|
foreach my $k1 ( keys %$new ) { |
121 |
|
|
122 |
|
foreach my $k2 ( keys %{ $new->{$k1} } ) { |
123 |
|
|
124 |
|
my $n = delete $new->{$k1}->{$k2}; |
125 |
|
|
126 |
|
my $ref = ref $out->{$k1}->{$k2}; |
127 |
|
|
128 |
|
if ( ! defined $out->{$k1}->{$k2} ) { |
129 |
|
$out->{$k1}->{$k2} = $n; |
130 |
|
} elsif ( $k1 =~ m{\+} ) { |
131 |
|
# warn "## agregate $k1 $k2"; |
132 |
|
$out->{$k1}->{$k2} += $n; |
133 |
|
} elsif ( $ref eq 'ARRAY' ) { |
134 |
|
if ( ref $n eq 'ARRAY' ) { |
135 |
|
push @{ $out->{$k1}->{$k2} }, $_ foreach @$n; |
136 |
|
} else { |
137 |
|
push @{ $out->{$k1}->{$k2} }, $n; |
138 |
|
} |
139 |
|
} elsif ( $ref eq '' ) { |
140 |
|
$out->{$k1}->{$k2} = [ $out->{$k1}->{$k2}, $n ]; |
141 |
|
} else { |
142 |
|
die "can't merge $k2 [$ref] from ",dump($n), " into ", dump($out->{$k1}->{$k2}); |
143 |
|
} |
144 |
|
|
145 |
|
if ( $tick++ % 1000 == 0 ) { |
146 |
|
print STDERR "."; |
147 |
|
} elsif ( $tick % 10000 == 0 ) { |
148 |
|
print STDERR $tick; |
149 |
|
} |
150 |
|
} |
151 |
|
} |
152 |
|
|
153 |
|
$t_merge = time - $t_merge; |
154 |
|
warn sprintf "\nmerged %d in %.4fs %.2f/s\n", $tick, $t_merge, $t_merge / $tick ; |
155 |
|
|
156 |
|
} |
157 |
|
|
158 |
|
|
159 |
|
sub view { |
160 |
|
my ( $self, $view ) = @_; |
161 |
|
|
162 |
|
warn "run view $view ", -s $view, " bytes\n"; |
163 |
|
|
164 |
|
my $view_code = read_file($view); |
165 |
|
$self->send_to_all({ view => $view_code }); |
166 |
|
|
167 |
|
foreach my $port ( keys %{ $self->{connected} } ) { |
168 |
|
warn "get_from $port\n"; |
169 |
|
my $result = $self->get_from( $port ); |
170 |
|
warn dump $result; |
171 |
|
if ( $result->{view} ) { |
172 |
|
$self->merge( $result->{view} ); |
173 |
|
} else { |
174 |
|
warn "no view from $port\n"; |
175 |
|
} |
176 |
|
} |
177 |
|
|
178 |
|
return $out; |
179 |
|
} |
180 |
|
|
181 |
|
|
182 |
sub DESTROY { |
sub DESTROY { |
183 |
warn "pids ",dump( $pids ); |
warn "pids ",dump( $pids ); |
184 |
foreach ( values %$pids ) { |
foreach ( values %$pids ) { |