/[Sack]/trunk/lib/Sack/Node.pm
This is a repository of my old source code, which is no longer updated. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /trunk/lib/Sack/Node.pm

Parent Directory | Revision Log


Revision 127 - (show annotations)
Wed Oct 7 16:21:33 2009 UTC (14 years, 7 months ago) by dpavlin
File size: 2614 byte(s)
better recovery from node storable serialization errors,
faster cloud startup, 
added common pid handling,
added [u]pdate to lorry, which pushes updated code to nodes,
tweaks to output,
version bump for Sack::Node [0.08]

1 package Sack::Node;
2
3 use warnings;
4 use strict;
5
6 use IO::Socket::INET;
7 use File::Slurp;
8 use Carp qw(confess);
9 use Data::Dump qw(dump);
10 use Storable;
11 use Time::HiRes qw(time);
12
13 use lib 'lib';
14 use base qw(Sack::Pid);
15 use Sack::Color;
16
17 our $VERSION = '0.09';
18
# Sack::Node->new( $port )
#
# Creates the node object, records its pid via port_pid() and then serves
# Storable-encoded requests on 127.0.0.1:$port. This constructor does NOT
# return: it loops forever, handling one client at a time, until it gets an
# `exit` request (process exits) or a `restart` request (process re-execs).
#
# Each request is a hash reference; the first matching key selects the action:
#   view    => $code   run $code over the loaded records, see view()
#   data    => $aref   replace the records held by this node
#   exit    => 1       close the listening socket and exit
#   restart => 1       close the listening socket and exec "$0 $port"
#   info    => 1       reply with version, record count and view reports
#   sh      => $cmd    run $cmd through the shell and reply with its output
# Anything else is answered with { error => $request }.
#
# Dies if the listening socket cannot be created, if accept() fails for a
# reason other than EINTR, or if the restart exec fails.
#
# SECURITY NOTE(review): requests are deserialized with Storable and may carry
# code (`view`) or shell commands (`sh`) which are executed as-is. The socket
# is bound to 127.0.0.1 only; anything that can connect to it has full control
# of this process.
sub new {
    my $class = shift;
    my $port = shift;
    my $self = bless { port => $port }, $class;

    $self->port_pid( $port );

    my $sock = IO::Socket::INET->new(
        Listen    => SOMAXCONN,
        LocalAddr => '127.0.0.1',
        LocalPort => $port,
        Proto     => 'tcp',
        Reuse     => 1,
    ) or die "[$port] die $!";

    # loaded locally so this sub does not depend on a file-level import
    require Scalar::Util;

    my $client;

    while ( 1 ) {

        if ( ! $client ) {
            warn "[$port] accept $VERSION\n";
            $client = $sock->accept();
            if ( ! $client ) {
                # interrupted by a signal: just try again
                next if $!{EINTR};
                die "[$port] accept failed: $!";
            }
            warn "[$port] connect from ", $client->peerhost, $/;
        }

        my $data = eval { Storable::fd_retrieve( $client ) };
        if ( $@ || ! defined $data ) {
            if ( $@ ) {
                warn "[$port] ERROR $@\n";
            } else {
                # nothing could be read: treat it as the peer going away
                warn "[$port] disconnect\n";
            }
            close $client;
            # forget the closed handle, otherwise the loop would never go
            # back to accept() and would spin on the dead connection
            undef $client;
            next;
        }

        my $result;

        if ( ( Scalar::Util::reftype( $data ) || '' ) ne 'HASH' ) {
            # a request must be a hash; answer instead of dying on $data->{...}
            warn "[$port] UNKNOWN ", dump( $data ), $/;
            $result = { 'error' => $data };
        } else {

            if ( defined $data->{data} ) {
                # don't dump the (potentially huge) record set into the log
                warn "# [$port] <<<< data\n";
            } else {
                warn "# [$port] <<<< ", dump( $data ), $/;
            }

            if ( $data->{view} ) {
                # view() returns nothing when the code does not compile, and
                # Storable refuses to store a non-reference
                $result = $self->view( $data->{view} )
                    || { 'error' => 'view aborted' };
            } elsif ( $data->{data} ) {
                $self->{data} = delete $data->{data};
                $result = { data => 'loaded' };
            } elsif ( $data->{exit} ) {
                warn "[$port] exit\n";
                close $sock;
                exit;
            } elsif ( $data->{restart} ) {
                warn "[$port] restart";
                close $sock;
                # list form: no shell involved, so paths with spaces survive
                exec $0, $port
                    or die "[$port] restart failed: $!";
            } elsif ( $data->{info} ) {
                $result = {
                    version => $VERSION,
                    size => scalar @{ $self->{data} || [] },
                    reports => $self->{reports},
                };
            } elsif ( my $sh = delete $data->{sh} ) {
                # runs an arbitrary command supplied by the peer (by design)
                $result = { sh => scalar `$sh` };
            } else {
                warn "[$port] UNKNOWN ", dump( $data ), $/;
                $result = { 'error' => $data };
            }
        }

        warn "# [$port] >>>>\n";
        # store_fd returns undef on I/O errors and croaks on values it cannot
        # serialize; in both cases drop this client and wait for the next one
        my $sent = eval { Storable::store_fd( $result => $client ) };
        if ( ! $sent ) {
            warn "[$port] ERROR ", ( $@ || $! ), "\n";
            close $client;
            undef $client;
        }
    }

}
90
91
# Package variables shared with the code passed to view():
#   $rec - the record currently being processed
#   $out - whatever the view code accumulates; reset at the start of each view
our $rec;
our $out;

# $self->view( $code )
#
# Compiles $code (a string of Perl) into an anonymous sub and calls it once
# for every record in $self->{data}, with the record available as $rec.
# Processing stops at the first false record or at the first record for which
# the code dies.
#
# Returns nothing if $code does not compile. Otherwise returns
#   { out => $out, report => [ $port, $affected, $seconds, $per_second ] }
# and appends a one-line summary to $self->{reports}.
#
# SECURITY NOTE(review): $code is evaluated as-is with string eval; it must
# only ever come from a trusted source.
sub view {
    # NOTE: $self, $code, $affected and $start_t are in scope of the
    # string eval below, so their names are visible to the view code
    my ( $self, $code ) = @_;

    undef $out;

    my $affected = 0;
    my $start_t = time;

    my $coderef = eval "sub { $code }";
    if ( $@ || ! $coderef ) {
        warn "ABORT code: $@";
        return;
    }

    my $records = $self->{data} || [];

    foreach my $pos ( 0 .. $#{ $records } ) {
        $rec = $records->[$pos];
        if ( ! $rec ) {
            print STDERR "END @ $pos";
            last;
        }

        eval { $coderef->() };
        if ( $@ ) {
            warn "ABORT $pos $@\n";
            last;
        }
        $affected++;

        # progress indicator: position every 10000 records, dot every 1000
        if ( $pos % 10000 == 0 ) {
            print STDERR $pos;
        } elsif ( $pos % 1000 == 0 ) {
            print STDERR ".";
        }
    }

    my $dt = time - $start_t;
    # the elapsed time can be 0 (coarse clock, no records); avoid dying with
    # "Illegal division by zero" after all the work has been done
    my $rate = $dt > 0 ? $affected / $dt : 0;
    my $report = [ $self->{port}, $affected, $dt, $rate ];
    warn sprintf "[%d] %d affected in %1.4fs %.2f/s\n", @$report;

    push @{ $self->{reports} }, "$affected in ${dt}s";

#   warn "# out ", dump( $out );

    return {
        out => $out,
        report => $report,
    };
}
142
143 1;

  ViewVC Help
Powered by ViewVC 1.1.26