/[Sack]/trunk/lib/Sack/Lorry.pm
This is a repository of my old source code, which isn't updated anymore. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /trunk/lib/Sack/Lorry.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 111 - (hide annotations)
Mon Oct 5 23:21:20 2009 UTC (14 years, 9 months ago) by dpavlin
File size: 3920 byte(s)
implement in-memory digest internal to lorry

1 dpavlin 92 package Sack::Lorry;
2    
3     use warnings;
4     use strict;
5    
6     use IO::Socket::INET;
7     use Data::Dump qw(dump);
8     use Storable;
9 dpavlin 97 use File::Slurp;
10 dpavlin 107 use Net::Ping;
11 dpavlin 92
# Process-wide bookkeeping shared by every Sack::Lorry object; both are
# filled in by start_node():
#   $pids  - "host:port" => pid returned by fork for that node
#   $ports - local port  => host reached through that port
our ( $pids, $ports );

# Forked children are never waited for.  Per perlipc, ignoring SIGCHLD
# keeps finished children from lingering as zombies on most Unix systems.
$SIG{CHLD} = 'IGNORE';

# Next local TCP port to hand out; start_node() post-increments it each
# time a node has been connected.
my $port = 4000;
18    
# Constructor.  Every key/value pair passed after the class name becomes
# a field of the new object (view() later reads the "debug" field).
sub new {
    my ( $class, @fields ) = @_;
    return bless { @fields }, $class;
}
24    
# Start a worker node for $host and connect to it.
#
# The host is pinged first.  The process then forks: the child execs
# /srv/Sack/bin/node.pl with the current port number, going through ssh
# with a local port forward unless the host name starts with "localhost"
# or "127.".  The parent polls 127.0.0.1:$port once a second until a TCP
# connection succeeds, then records the socket in $self->{sock}->{$port}
# and the host in $self->{connected}->{$port}.
#
# Returns the local port used for this node, or nothing when the host does
# not answer the ping or the fork fails.
#
# NOTE: the connect loop has no upper bound; it keeps retrying for as long
# as nothing accepts connections on the port.
sub start_node {
    my ( $self, $host ) = @_;

    chomp $host;

    my $p     = Net::Ping->new;
    my $alive = $p->ping( $host );
    $p->close;    # the ping object is not needed past this point

    if ( ! $alive ) {
        warn "can't ping [$host]\n";
        return;
    }

    warn "start_node $host\n";

    my $ssh_config = '-F etc/lib.ssh';

    if ( my $pid = fork ) {
        # parent
        $pids->{ "$host:$port" } = $pid;
        $ports->{ $port } = $host;

        my $sock;

        print STDERR "waiting for $port";

        # retry until the node (or the ssh tunnel to it) accepts connections
        while ( ! $sock ) {

            $sock = IO::Socket::INET->new(
                PeerAddr => '127.0.0.1',
                PeerPort => $port,
                Proto    => 'tcp',
            );

            if ( ! $sock ) {
                print STDERR ".";
                sleep 1;
            }

        }

        $self->{sock}->{$port} = $sock;

        warn "\nconnected to $port\n";

        $self->{connected}->{$port} = $host;

        # hand back the port just used and advance to the next free one
        return $port++;

    } elsif ( ! defined $pid ) {
        warn "can't fork $host $port";
        return;
    } else {
        # child

        # loaded before exec so the failure path below has nothing left to load
        require POSIX;

        my $cmd = $host !~ m{^(localhost|127\.)}i ? qq|
            ssh -f $ssh_config
            -S /tmp/sock.$port.ssh
            -L $port:127.0.0.1:$port
            $host
        | : '';

        $cmd .= qq|
            /srv/Sack/bin/node.pl $port
        |;

        # collapse the multi-line template into a single command line
        $cmd =~ s{\s+}{ }sg;

        warn "exec: $cmd\n";
        exec $cmd or do {
            # exec only returns on failure.  The child must stop right here:
            # returning would let it continue running the caller's code as a
            # second copy of the parent, and a normal exit would run DESTROY,
            # which signals every pid recorded in $pids.
            warn "can't exec [$cmd]: $!\n";
            POSIX::_exit( 1 );
        };
    }
}
96    
# Serialize $data with Storable and write it to the socket stored for the
# node on local port $port.  Returns the result of Storable::store_fd.
sub send_to {
    my $self = shift;
    my ( $port, $data ) = @_;

    warn "send_to [$port]\n";

    my $sock = $self->{sock}->{$port};
    return Storable::store_fd( $data, $sock );
}
102    
# Read one Storable-serialized structure from the socket stored for the
# node on local port $port and return it.
sub get_from {
    my $self = shift;
    my ( $port ) = @_;

    warn "get_from [$port]\n";

    my $sock = $self->{sock}->{$port};
    return Storable::fd_retrieve( $sock );
}
108    
# Send the same $data to every node listed in $self->{connected}
# (one send_to call per connected port).
sub send_to_all {
    my ( $self, $data ) = @_;

    for my $port ( keys %{ $self->{connected} } ) {
        $self->send_to( $port, $data );
    }
}
113    
# Collect one reply from every node listed in $self->{connected}.
# Returns a hash reference mapping port => reply, or undef when no node
# is connected.
sub get_from_all {
    my ( $self ) = @_;

    my $result;
    for my $port ( keys %{ $self->{connected} } ) {
        $result->{$port} = $self->get_from( $port );
    }

    return $result;
}
120    
121 dpavlin 97
# Accumulated result of the current view, addressed as $out->{$k1}->{$k2};
# view() resets it and merge() fills it.
our $out;

use Digest::MD5 qw(md5);

# In-memory digest used by merge() for first-level keys containing "#":
#   $nr            - next ordinal to assign
#   $md5_nr        - binary MD5 of an original key => ordinal assigned to it
#   $digest_fh     - handle on /tmp/sack.digest, opened by merge() on first use
#   @digest_offset - ordinal => file offset of that key's line in the digest
our $nr = 0;
our ( $md5_nr, $digest_fh, @digest_offset );
129    
# Fold one node's two-level result ($new->{$k1}->{$k2} = value) into the
# package-level accumulator $out.  $new is emptied as it is consumed.
#
# Per value:
#   * if $k1 contains "#", $k2 is replaced by a small integer: the original
#     key is looked up by its MD5 in $md5_nr, and on first sight it is
#     appended to /tmp/sack.digest (offset remembered in @digest_offset)
#     and given the next ordinal $nr;
#   * if nothing is stored yet at $out->{$k1}->{$k2}, the value is stored;
#   * else if $k1 contains "+", the value is added numerically;
#   * else if the stored value is an array ref, the new value is appended
#     (the elements of an array ref are appended one by one);
#   * else if the stored value is a plain scalar, both become a
#     two-element array ref;
#   * anything else is fatal.
#
# Progress and a timing summary are written to STDERR.  Dies when the
# digest file cannot be opened or a value cannot be merged.
sub merge {
    my ( $self, $new ) = @_;

    # time() only has one-second resolution, which is too coarse for the
    # "%.4fs" summary below
    require Time::HiRes;
    my $t_merge = Time::HiRes::time();

    my $tick = 0;

    foreach my $k1 ( keys %$new ) {

        foreach my $k2 ( keys %{ $new->{$k1} } ) {

            my $n = delete $new->{$k1}->{$k2};

            if ( $k1 =~ m{#} ) {
                my $md5 = md5( $k2 );
                if ( defined $md5_nr->{$md5} ) {
                    $k2 = $md5_nr->{$md5};
                } else {
                    if ( ! $digest_fh ) {
                        # NOTE(review): fixed, predictable path under /tmp
                        open( $digest_fh, '>', '/tmp/sack.digest' )
                            or die "can't open /tmp/sack.digest: $!";
                    }
                    $digest_offset[ $nr ] = tell( $digest_fh );
                    print $digest_fh "$k2\n";

                    $k2 = $md5_nr->{$md5} = $nr;
                    $nr++;
                }
            }

            my $ref = ref $out->{$k1}->{$k2};

            if ( ! defined $out->{$k1}->{$k2} ) {
                $out->{$k1}->{$k2} = $n;
            } elsif ( $k1 =~ m{\+} ) {
                # warn "## agregate $k1 $k2";
                $out->{$k1}->{$k2} += $n;
            } elsif ( $ref eq 'ARRAY' ) {
                if ( ref $n eq 'ARRAY' ) {
                    push @{ $out->{$k1}->{$k2} }, $_ foreach @$n;
                } else {
                    push @{ $out->{$k1}->{$k2} }, $n;
                }
            } elsif ( $ref eq '' ) {
                $out->{$k1}->{$k2} = [ $out->{$k1}->{$k2}, $n ];
            } else {
                # fully qualified so it can never resolve to the core dump builtin
                die "can't merge $k2 [$ref] from ", Data::Dump::dump($n), " into ", Data::Dump::dump($out->{$k1}->{$k2});
            }

            # progress: a dot per 1000 merged values, the running count at
            # multiples of 10000
            if ( $tick++ % 1000 == 0 ) {
                print STDERR ".";
            } elsif ( $tick % 10000 == 0 ) {
                print STDERR $tick;
            }
        }
    }

    $t_merge = Time::HiRes::time() - $t_merge;

    # values per second; guarded so that an empty result or a zero elapsed
    # time can not divide by zero
    my $rate = $t_merge > 0 ? $tick / $t_merge : 0;
    warn sprintf "\nmerged %d in %.4fs %.2f/s\n", $tick, $t_merge, $rate;

}
190    
191    
# Run the view stored in file $view on every connected node and return the
# merged result.
#
# The accumulator $out is reset, the file's contents are sent to all nodes
# as { view => <code> }, then one reply per connected port is read and its
# {view} part is handed to merge().  A reply without {view} is only
# reported.  With $self->{debug} set, every reply is dumped to STDERR.
sub view {
    my ( $self, $view ) = @_;

    $out = {};

    warn "run view $view ", -s $view, " bytes\n";

    my $code = read_file( $view );
    $self->send_to_all( { view => $code } );

    for my $port ( keys %{ $self->{connected} } ) {
        my $result = $self->get_from( $port );

        if ( $self->{debug} ) {
            warn "# result ", dump $result;
        }

        unless ( $result->{view} ) {
            warn "no view from $port\n";
            next;
        }

        $self->merge( $result->{view} );
    }

    return $out;
}
214    
215    
# Destructor: signal every process recorded in the package-level $pids.
# Each pid is sent SIGHUP; SIGKILL is tried only when the SIGHUP could not
# be delivered.
sub DESTROY {
    # a destructor can run at any point, so keep the status variables it
    # may set (kill sets $!) from leaking into the surrounding code
    local ( $., $@, $!, $^E, $? );

    warn "pids ", Data::Dump::dump( $pids );
    foreach my $pid ( values %$pids ) {
        warn "kill $pid";
        # Parenthesized on purpose: written as "kill 1,$pid || kill 9, $pid"
        # the || binds to $pid alone and the fallback is never reached.
        kill( 'HUP', $pid ) or kill( 'KILL', $pid );
    }
}
223    
224     1;

  ViewVC Help
Powered by ViewVC 1.1.26