--- trunk/lib/Sack/Node.pm 2009/11/08 13:44:24 190 +++ trunk/lib/Sack/View.pm 2009/11/08 14:47:48 193 @@ -1,113 +1,50 @@ -package Sack::Node; +package Sack::View; use warnings; use strict; -use IO::Socket::INET; -use File::Slurp; -use Carp qw(confess); use Data::Dump qw(dump); -use Storable; -use Time::HiRes qw(time); -use lib 'lib'; -use base qw(Sack::Pid); +use lib '/srv/Sack/lib'; use Sack::Color; use Sack; -sub new { - my $class = shift; - my $port = shift; - my $self = bless { port => $port }, $class; - - $self->port_pid( $port ); - - my $sock = IO::Socket::INET->new( - Listen => SOMAXCONN, - LocalAddr => '127.0.0.1', - LocalPort => $port, - Proto => 'tcp', - Reuse => 1, - ) or die "[$port] die $!"; - - my $client; - - while ( 1 ) { - - if ( ! $client ) { - warn "[$port] accept $Sack::VERSION\n"; - $client = $sock->accept(); - warn "[$port] connect from ", $client->peerhost, $/; - } +our $coderef; +our $out; - my $data = Storable::fd_retrieve( $client ); +sub out { $out }; - if ( defined $data->{data} ) { - warn "# [$port] <<<< data\n" if $self->{debug}; - } else { - warn "# [$port] <<<< ", dump( $data ), $/ if $self->{debug}; - } +sub code { + my ( $self, $code ) = @_; - my $result; +warn "XX code $code"; - if ( $data->{view} ) { - $result = $self->view( $data->{view} ); - } elsif ( $data->{data} ) { - $self->{data} = delete $data->{data}; - $result = { data => 'loaded' }; - } elsif ( $data->{exit} ) { - warn "[$port] exit\n"; - close $sock; - exit; - } elsif ( $data->{restart} ) { - warn "[$port] restart"; - close $sock; - exec "$0 $port"; - } elsif ( $data->{info} ) { - $result = { - version => $Sack::VERSION, - size => $#{ $self->{data} } + 1, - reports => $self->{reports}, - }; - } elsif ( my $sh = delete $data->{sh} ) { - $result = { sh => scalar `$sh` }; - } elsif ( defined $data->{debug} ) { - $result = { debug => $self->{debug} = $data->{debug} }; - } else { - warn "[$port] UNKNOWN ", dump( $data ), $/; - $result = { 'error' => 'unknown', data => $data }; - } + undef $out; - $result = { 'error' => 'result not reference', result => $result, data => $data } unless ref($result); + $coderef = eval "sub { my \$rec = \$_[0]; $code }"; - warn "# [$port] >>>>\n"; - Storable::store_fd( $result => $client ); + if ( $@ ) { + warn "ABORT code: $@"; + return; } + ref $coderef eq 'CODE'; } -sub view { - my ( $self, $code ) = @_; - - my $affected = 0; - my $start_t = time; - - my $out; +sub on_shard { + my ( $self, $data ) = @_; - my $coderef = eval "sub { my \$rec = \$_[0]; $code }"; - if ( $@ ) { - warn "ABORT code: $@"; - return; - } +warn "XX data ",dump $data; + my $affected = 0; - foreach my $pos ( 0 .. $#{ $self->{data} } ) { - if ( ! defined $self->{data}->[$pos] ) { + foreach my $pos ( 0 .. $#{ $data } ) { + if ( ! defined $data->[$pos] ) { print STDERR "END @ $pos"; last; } - eval { $coderef->( $self->{data}->[$pos] ) }; + eval { $coderef->( $data->[$pos] ) }; if ( $@ ) { warn "ABORT $pos $@\n"; @@ -120,17 +57,7 @@ $pos % 1000 == 0 ? print STDERR "." : 0 ; }; - my $dt = time - $start_t; - my $report = [ $self->{port}, $affected, $dt, $affected / $dt ]; - warn sprintf "[%d] %d affected in %1.4fs %.2f/s\n", @$report; - - push @{ $self->{reports} }, "$affected in ${dt}s"; - - warn "[$self->{port}] out ", dump( $out ),$/ if $self->{debug}; - - return { - out => $out, - }; + warn "## out ", dump( $out ); } 1;