--- googlecode.com/svn/trunk/Meteor/Subscriber.pm 2006/12/14 16:29:42 11 +++ googlecode.com/svn/trunk/Meteor/Subscriber.pm 2008/11/27 00:33:21 62 @@ -43,6 +43,8 @@ @Meteor::Subscriber::ISA=qw(Meteor::Connection); our %PersistentConnections=(); + our $NumAcceptedConnections=0; + ############################################################################### # Factory methods @@ -53,16 +55,17 @@ my $self=$class->SUPER::newFromServer(shift); $self->{'headerBuffer'}=''; - $self->{'MessageCount'}=0; - $self->{'MaxMessageCount'}=0; + $self->{'messageCount'}=0; - $self->{'ConnectionStart'}=time; my $maxTime=$::CONF{'MaxTime'}; if($maxTime>0) { - $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime; + $self->{'connectionTimeLimit'}=$self->{'connectionStart'}+$maxTime; } + $::Statistics->{'current_subscribers'}++; + $::Statistics->{'subscriber_connections_accepted'}++; + $self; } @@ -75,17 +78,23 @@ if(exists($PersistentConnections{$id})) { - $PersistentConnections{$id}->close(1); + $PersistentConnections{$id}->close(0,'newSubscriberWithSameID'); } } +sub subscriberExists { + my $class=shift; + my $id=shift; + + return exists($PersistentConnections{$id}); +} + sub pingPersistentConnections { my $class=shift; - my $msg=$::CONF{'PingMessage'}; my @cons=values %PersistentConnections; - map { $_->write($msg) } @cons; + map { $_->ping() } @cons; } sub checkPersistentConnectionsForMaxTime { @@ -97,6 +106,22 @@ map { $_->checkForMaxTime($time) } @cons; } +sub numSubscribers { + + return scalar(keys %PersistentConnections); +} + +sub listSubscribers { + my $class=shift; + my $list=''; + foreach my $subscriber (keys %PersistentConnections) + { + my $sub = $PersistentConnections{$subscriber}; + $list .= $subscriber.' '.$sub->{'ip'}.' '.$sub->{'connectionStart'}.' '.$sub->{'connectionTimeLimit'}.' '.($sub->{'connectionTimeLimit'}-time).' '.$sub->{'messageCount'}.' "'.$sub->{'useragent'}."\"$::CRLF"; + } + $list; +} + ############################################################################### # Instance methods ############################################################################### @@ -104,7 +129,7 @@ my $self=shift; my $line=shift; - # Once the header was processed we ignore any input + # Once the header was processed we ignore any input - Meteor does not accept or process request bodies return unless(exists($self->{'headerBuffer'})); if($line ne '') @@ -121,98 +146,79 @@ # Analyze header, register with appropiate channel # and send pending messages. # - # GET $::CONF{'SubscriberDynamicPageAddress'}?channel=ml123&restartfrom=1 HTTP/1.1 + # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1 # # Find the 'GET' line # - if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\?(\S+)/) + if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/([a-z0-9_\-\%\.\/]+).*?/i) { - my @formData=split('&',$1); - my $channelName=undef; - my $startIndex=undef; - my $backtrack=undef; - my $persist=1; - my $subscriberID=undef; - foreach my $formElement (@formData) - { - if($formElement=~/^channel=(.+)$/) - { - $channelName=$1; - } - elsif($formElement=~/^restartfrom=(\d*)$/) - { - $startIndex=$1; - $startIndex='' unless(defined($startIndex)); - } - elsif($formElement=~/^backtrack=(\d+)$/) - { - $backtrack=$1; - $backtrack=0 unless(defined($backtrack)); - } - elsif($formElement=~/^persist=(?i)(yes|true|1|no|false|0)$/) - { - $persist=0 if($1=~/(no|false|0)/i); - } - elsif($formElement=~/^id=(.+)$/) - { - $subscriberID=$1; - } - elsif($formElement=~/^maxmessages=(\d+)$/i) - { - $self->{'MaxMessageCount'}=$1; - } - elsif($formElement=~/^template=(\d+)$/i) - { - $self->{'HeaderTemplateNumber'}=$1; - } - elsif($formElement=~/^maxtime=(\d+)$/i) - { - my $clientRequest=$1; - my $serverDefault=$::CONF{'MaxTime'}; - - if($serverDefault==0 || $serverDefault>$clientRequest) - { - $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$clientRequest; + $self->{'subscriberID'}=$1; + $self->{'mode'}=$2; + my $persist=$self->getConf('Persist'); + my $maxTime=$self->getConf('MaxTime'); + $self->{'connectionTimeLimit'} = ($self->{'connectionStart'}+$maxTime) if ($maxTime>0); + + my @channelData=split('/',$3); + my $channels={}; + my $channelName; + my $offset; + foreach my $chandef (@channelData) { + if($chandef=~/^([a-z0-9_\-\%]+)(.(r|b|h)([0-9]*))?$/i) { + $channelName = $1; + $channels->{$channelName}->{'startIndex'} = undef; + if ($3) { + $offset = $4; + if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; } + if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; } + if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; } } } } - - delete($self->{'headerBuffer'}); - - if(defined($startIndex) && defined($backtrack)) - { - $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'"); - $self->close(); - - return; - } + $self->{'useragent'} = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-"; - if(defined($subscriberID) && $persist) - { - $self->{'subscriberID'}=$subscriberID; - $self->deleteSubscriberWithID($subscriberID); - $PersistentConnections{$subscriberID}=$self; + if ($persist) { + # New persistent connection: kill any existing connection with same ID + $self->deleteSubscriberWithID($self->{'subscriberID'}); + # Add new persistent connection to collection + $PersistentConnections{$self->{'subscriberID'}}=$self; + } else { + $::Pollers->{$self->{'subscriberID'}} = time; } - if(defined($channelName)) - { + if(scalar(keys %{$channels})) { + + $self->{'channelinfo'} = ''; + my $citemplate = $self->getConf('ChannelInfoTemplate'); + foreach $channelName (keys %{$channels}) { + my $channel=Meteor::Channel->channelWithName($channelName); + $self->{'channels'}->{$channelName}=$channel; + if (defined($self->{'channels'}->{$channelName}->{'startIndex'}) && $self->{'channels'}->{$channelName}->{'startIndex'} > 0) { + $self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate); + } + } $self->emitOKHeader(); - - $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack)); - - $self->setChannelName($channelName,$startIndex,$persist); - - $self->close(1) unless($persist); - + foreach $channelName (keys %{$channels}) { + my $startIndex=$channels->{$channelName}->{'startIndex'}; + $self->{'channels'}->{$channelName}->addSubscriber($self,$startIndex,$persist,$self->{'mode'},$self->{'useragent'}); + } + delete ($self->{'channels'}) unless($persist); + $self->close(1, 'responseComplete') unless($persist); + $self->close(1, 'closedOnEvent') unless($self->{'messageCount'} == 0); + delete($self->{'headerBuffer'}); return; } } + elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/) + { + $self->deleteSubscriberWithID($1); + $self->emitOKHeader(); + $self->close(1, 'disconnectRequested'); + return; + } elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/) { Meteor::Document->serveFileToClient($1,$self); - - $self->close(1); - + $self->close(1, 'responseComplete'); return; } @@ -223,18 +229,6 @@ } } -sub setChannelName { - my $self=shift; - my $channelName=shift; - my $startIndex=shift; - my $persist=shift; - - my $channel=Meteor::Channel->channelWithName($channelName); - $self->{'channel'}=$channel if($persist); - - $channel->addSubscriber($self,$startIndex,$persist); -} - sub emitOKHeader { my $self=shift; @@ -245,43 +239,30 @@ my $self=shift; $self->emitHeader('404 Not Found'); + $::Statistics->{'errors_served'}++; # close up shop here! - $self->close(); + $self->close(0, 'error'); } sub emitHeader { my $self=shift; my $status=shift; - my $header=undef; - if(exists($self->{'HeaderTemplateNumber'})) - { - my $hn='HeaderTemplate'.$self->{'HeaderTemplateNumber'}; - - $header=$::CONF{$hn}; - } - $header=$::CONF{'HeaderTemplate'} unless(defined($header)); + my $header=$self->getConf('HeaderTemplate'); - $header=~s/~([^~]+)~/ - if(!defined($1) || $1 eq '') - { + $header=~s/~([^~]*)~/ + if(!defined($1) || $1 eq '') { '~'; - } - elsif($1 eq 'server') - { + } elsif($1 eq 'server') { $::PGM; - } - elsif($1 eq 'status') - { + } elsif($1 eq 'status') { $status; - } - elsif($1 eq 'servertime') - { + } elsif($1 eq 'servertime') { time; - } - else - { + } elsif($1 eq 'channelinfo') { + $self->{'channelinfo'}; + } else { ''; } /gex; @@ -289,59 +270,123 @@ $self->write($header); } -sub sendMessage { +sub sendMessages { my $self=shift; - my $msg=shift; - $self->write($msg); - - my $msgCount=++$self->{'MessageCount'}; + my $numMessages=0; + my $msgTemplate=$self->getConf('MessageTemplate'); + my $msgData=''; - my $maxMsg=$::CONF{'MaxMessages'}; - if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg) + foreach my $message (@_) { - $self->close(1); + $msgData.=$message->messageWithTemplate($msgTemplate); + $numMessages++; } - if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'}) - { - $self->close(1); + return if($numMessages<1); + + $self->write($msgData); + + $::Statistics->{'messages_served'}+=$numMessages; + + my $msgCount=$self->{'messageCount'}; + $msgCount+=$numMessages; + $self->{'messageCount'}=$msgCount; + + # If long polling, close connection, as a message has now been sent. + # Don't close if still processing the header (we may be sending a backlog from multiple channels) + if($self->getConf('CloseOnEvent') && !exists($self->{'headerBuffer'})) { + $self->close(1, 'closedOnEvent'); } } +sub ping { + my $self=shift; + my $msg=$self->getConf('PingMessage'); + + $self->write($msg); +} + +sub closeChannel { + my $self=shift; + my $channelName=shift; + + return unless(exists($self->{'channels'}->{$channelName})); + + my $channel=$self->{'channels'}->{$channelName}; + $channel->removeSubscriber($self,'channelClose'); + + delete($self->{'channels'}->{$channelName}); + + $self->close(0,'channelClose') if(scalar(keys %{$self->{'channels'}})==0); +} + sub close { my $self=shift; my $noShutdownMsg=shift; + my $reason=shift; - $self->{'channel'}->removeSubscriber($self) if($self->{'channel'}); - delete($self->{'channel'}); - - if(exists($self->{'subscriberID'})) + foreach my $channelName (keys %{$self->{'channels'}}) { + my $channel=$self->{'channels'}->{$channelName}; + $channel->removeSubscriber($self,$reason); + } + delete($self->{'channels'}); + + # If this connection is in the PersistentConnections array, delete it, then anonymise + # it so that if we have to wait for the write buffer to empty before close, it's only + # removed once. + if(exists($self->{'subscriberID'})) { delete($PersistentConnections{$self->{'subscriberID'}}); + delete($self->{'subscriberID'}); } - # # Send shutdown message unless remote closed or # connection not yet established - # unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'})) { - my $msg=$::CONF{'SubscriberShutdownMsg'}; + my $msg=$self->getConf('SubscriberShutdownMsg'); if(defined($msg) && $msg ne '') { $self->write($msg); } } - + + my $fmsg=$self->getConf('FooterTemplate'); + if(defined($fmsg) && $fmsg ne '') + { + $self->write($fmsg); + } + $self->SUPER::close(); } +sub didClose { + + $::Statistics->{'current_subscribers'}--; +} + sub checkForMaxTime { my $self=shift; my $time=shift; - $self->close(1) if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time); + $self->close(1,'maxTime') if(exists($self->{'connectionTimeLimit'}) && $self->{'connectionTimeLimit'}<$time); +} + +sub getConf { + my $self=shift; + my $key=shift; + + if(exists($self->{'mode'}) && $self->{'mode'} ne '') + { + my $k=$key.$self->{'mode'}; + + if(exists($::CONF{$k})) { + return $::CONF{$k}; + } + } + + $::CONF{$key}; } 1;