/[Sack]/trunk/lib/Sack/Lorry.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/lib/Sack/Lorry.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 96 by dpavlin, Sun Oct 4 12:50:10 2009 UTC revision 97 by dpavlin, Sun Oct 4 13:09:28 2009 UTC
# Line 6  use strict; Line 6  use strict;
6  use IO::Socket::INET;  use IO::Socket::INET;
7  use Data::Dump qw(dump);  use Data::Dump qw(dump);
8  use Storable;  use Storable;
9    use File::Slurp;
10    
11  our $pids;  our $pids;
12  our $ports;  our $ports;
# Line 27  sub start_node { Line 28  sub start_node {
28    
29          if ( my $pid = fork ) {          if ( my $pid = fork ) {
30                  # parent                  # parent
31                  $pids->{ $host } = $pid;                  $pids->{ "$host:$port" } = $pid;
32                  $ports->{ $port } = $host;                  $ports->{ $port } = $host;
33    
34                  my $sock;                  my $sock;
# Line 104  sub get_from_all { Line 105  sub get_from_all {
105          return $result;          return $result;
106  }  }
107    
108    
109    our $out;
110    
111    sub merge {
112            my ( $self, $new ) = @_;
113    
114            my $t_merge = time();
115    
116            my $tick = 0;
117    
118            my $missing;
119    
120            foreach my $k1 ( keys %$new ) {
121    
122                    foreach my $k2 ( keys %{ $new->{$k1} } ) {
123    
124                            my $n   = delete $new->{$k1}->{$k2};
125    
126                            my $ref = ref    $out->{$k1}->{$k2};
127    
128                            if ( ! defined $out->{$k1}->{$k2} ) {
129                                    $out->{$k1}->{$k2} = $n;
130                            } elsif ( $k1 =~ m{\+} ) {
131    #                               warn "## agregate $k1 $k2";
132                                    $out->{$k1}->{$k2} += $n;
133                            } elsif ( $ref  eq 'ARRAY' ) {
134                                    if ( ref $n eq 'ARRAY' ) {
135                                            push @{ $out->{$k1}->{$k2} }, $_ foreach @$n;
136                                    } else {
137                                            push @{ $out->{$k1}->{$k2} }, $n;
138                                    }
139                            } elsif ( $ref eq '' ) {
140                                    $out->{$k1}->{$k2} = [ $out->{$k1}->{$k2}, $n ];
141                            } else {
142                                    die "can't merge $k2 [$ref] from ",dump($n), " into ", dump($out->{$k1}->{$k2});
143                            }
144    
145                            if ( $tick++ % 1000 == 0 ) {
146                                    print STDERR ".";
147                            } elsif ( $tick % 10000 == 0 ) {
148                                    print STDERR $tick;
149                            }
150                    }
151            }
152    
153            $t_merge = time - $t_merge;
154            warn sprintf "\nmerged %d in %.4fs %.2f/s\n", $tick, $t_merge, $t_merge / $tick ;
155    
156    }
157    
158    
159    sub view {
160            my ( $self, $view ) = @_;
161    
162            warn "run view $view ", -s $view, " bytes\n";
163    
164            my $view_code = read_file($view);
165            $self->send_to_all({ view => $view_code });
166    
167            foreach my $port ( keys %{ $self->{connected} } ) {
168                    warn "get_from $port\n";
169                    my $result = $self->get_from( $port );
170    warn dump $result;
171                    if ( $result->{view} ) {
172                            $self->merge( $result->{view} );
173                    } else {
174                            warn "no view from $port\n";
175                    }
176            }
177    
178            return $out;
179    }
180    
181    
182  sub DESTROY {  sub DESTROY {
183          warn "pids ",dump( $pids );          warn "pids ",dump( $pids );
184          foreach ( values %$pids ) {          foreach ( values %$pids ) {

Legend:
Removed from v.96  
changed lines
  Added in v.97

  ViewVC Help
Powered by ViewVC 1.1.26