--- googlecode.com/svn/trunk/Meteor/Subscriber.pm 2007/04/30 18:16:17 13 +++ googlecode.com/svn/trunk/Meteor/Subscriber.pm 2008/02/04 22:30:35 49 @@ -43,6 +43,8 @@ @Meteor::Subscriber::ISA=qw(Meteor::Connection); our %PersistentConnections=(); + our $NumAcceptedConnections=0; + ############################################################################### # Factory methods @@ -63,6 +65,9 @@ $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime; } + $::Statistics->{'current_subscribers'}++; + $::Statistics->{'subscriber_connections_accepted'}++; + $self; } @@ -75,17 +80,16 @@ if(exists($PersistentConnections{$id})) { - $PersistentConnections{$id}->close(1); + $PersistentConnections{$id}->close(); } } sub pingPersistentConnections { my $class=shift; - my $msg=$::CONF{'PingMessage'}; my @cons=values %PersistentConnections; - map { $_->write($msg) } @cons; + map { $_->ping() } @cons; } sub checkPersistentConnectionsForMaxTime { @@ -97,6 +101,11 @@ map { $_->checkForMaxTime($time) } @cons; } +sub numSubscribers { + + return scalar(keys %PersistentConnections); +} + ############################################################################### # Instance methods ############################################################################### @@ -121,102 +130,42 @@ # Analyze header, register with appropiate channel # and send pending messages. # - # GET $::CONF{'SubscriberDynamicPageAddress'}?channel=ml123&restartfrom=1 HTTP/1.1 + # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1 # # Find the 'GET' line # - if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\?(\S+)/) + if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i) { - my @formData=split('&',$1); - my $channelName=undef; - my $startIndex=undef; - my $backtrack=undef; - my $persist=1; - my $anyPersist=0; - my $subscriberID=undef; - my $channels={}; - foreach my $formElement (@formData) + my $subscriberID=$1; + $self->{'mode'}=$2; + my $persist=$self->getConf('Persist'); + + my $maxTime=$self->getConf('MaxTime'); + if($maxTime>0) { - if($formElement=~/^channel=(.+)$/) - { - if(defined($channelName)) - { - if(defined($startIndex) && defined($backtrack)) - { - $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'"); - $self->close(); - - return; - } - - $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack)); - $channels->{$channelName}->{'startIndex'}=$startIndex; - $channels->{$channelName}->{'persist'}=$persist; - $anyPersist|=$persist; - - $startIndex=undef; - $backtrack=undef; - $persist=1; - } - $channelName=$1; - } - elsif($formElement=~/^restartfrom=(\d*)$/) - { - $startIndex=$1; - $startIndex='' unless(defined($startIndex)); - } - elsif($formElement=~/^backtrack=(\d+)$/) - { - $backtrack=$1; - $backtrack=0 unless(defined($backtrack)); - } - elsif($formElement=~/^persist=(?i)(yes|true|1|no|false|0)$/) - { - $persist=0 if($1=~/(no|false|0)/i); - } - elsif($formElement=~/^id=(.+)$/) - { - $subscriberID=$1; - } - elsif($formElement=~/^maxmessages=(\d+)$/i) - { - $self->{'MaxMessageCount'}=$1; - } - elsif($formElement=~/^template=(\d+)$/i) - { - $self->{'HeaderTemplateNumber'}=$1; - } - elsif($formElement=~/^maxtime=(\d+)$/i) - { - my $clientRequest=$1; - my $serverDefault=$::CONF{'MaxTime'}; - - if($serverDefault==0 || $serverDefault>$clientRequest) - { - $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$clientRequest; - } - } + $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime; } - if(defined($channelName)) - { - if(defined($startIndex) && defined($backtrack)) - { - $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'"); - $self->close(); - - return; + my @channelData=split('/',$3); + my $channels={}; + my $channelName; + my $offset; + foreach my $chandef (@channelData) { + if($chandef=~/^([a-z0-9]+)(.(r|b|h)([0-9]*))?$/i) { + $channelName = $1; + $channels->{$channelName}->{'startIndex'} = undef; + if ($3) { + $offset = $4; + if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; } + if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; } + if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; } + } } - - $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack)); - $channels->{$channelName}->{'startIndex'}=$startIndex; - $channels->{$channelName}->{'persist'}=$persist; - $anyPersist|=$persist; } delete($self->{'headerBuffer'}); - if(defined($subscriberID) && $anyPersist) + if($persist) { $self->{'subscriberID'}=$subscriberID; $self->deleteSubscriberWithID($subscriberID); @@ -226,20 +175,22 @@ if(scalar(keys %{$channels})) { $self->emitOKHeader(); - - $self->setChannels($channels); - - $self->close(1) unless($anyPersist); - + $self->setChannels($channels,$persist,$self->{'mode'},''); + $self->close(1) unless($persist); return; } } + elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/) + { + $self->deleteSubscriberWithID($1); + $self->emitOKHeader(); + $self->close(1); + return; + } elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/) { Meteor::Document->serveFileToClient($1,$self); - $self->close(1); - return; } @@ -253,17 +204,19 @@ sub setChannels { my $self=shift; my $channels=shift; + my $persist=shift; + my $mode=shift || ''; + my $userAgent=shift || ''; foreach my $channelName (keys %{$channels}) { - my $persist=$channels->{$channelName}->{'persist'}; my $startIndex=$channels->{$channelName}->{'startIndex'}; my $channel=Meteor::Channel->channelWithName($channelName); $self->{'channels'}->{$channelName}=$channel if($persist); - $channel->addSubscriber($self,$startIndex,$persist); + $channel->addSubscriber($self,$startIndex,$persist,$mode,$userAgent); } } @@ -277,6 +230,7 @@ my $self=shift; $self->emitHeader('404 Not Found'); + $::Statistics->{'errors_served'}++; # close up shop here! $self->close(); @@ -286,16 +240,9 @@ my $self=shift; my $status=shift; - my $header=undef; - if(exists($self->{'HeaderTemplateNumber'})) - { - my $hn='HeaderTemplate'.$self->{'HeaderTemplateNumber'}; - - $header=$::CONF{$hn}; - } - $header=$::CONF{'HeaderTemplate'} unless(defined($header)); + my $header=$self->getConf('HeaderTemplate'); - $header=~s/~([^~]+)~/ + $header=~s/~([^~]*)~/ if(!defined($1) || $1 eq '') { '~'; @@ -312,6 +259,10 @@ { time; } + elsif($1 eq 'channelinfo') + { + Meteor::Channel->listChannelsUsingTemplate($self->getConf('ChannelInfoTemplate')); + } else { ''; @@ -321,15 +272,30 @@ $self->write($header); } -sub sendMessage { +sub sendMessages { my $self=shift; - my $msg=shift; - $self->write($msg); + my $numMessages=0; + my $msgTemplate=$self->getConf('Messagetemplate'); + my $msgData=''; + + foreach my $message (@_) + { + $msgData.=$message->messageWithTemplate($msgTemplate); + $numMessages++; + } - my $msgCount=++$self->{'MessageCount'}; + return if($numMessages<1); - my $maxMsg=$::CONF{'MaxMessages'}; + $self->write($msgData); + + $::Statistics->{'messages_served'}+=$numMessages; + + my $msgCount=$self->{'MessageCount'}; + $msgCount+=$numMessages; + $self->{'MessageCount'}=$msgCount; + + my $maxMsg=$self->getConf('MaxMessages'); if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg) { $self->close(1); @@ -339,6 +305,14 @@ { $self->close(1); } + +} + +sub ping { + my $self=shift; + my $msg=$self->getConf('PingMessage'); + + $self->write($msg); } sub closeChannel { @@ -348,11 +322,11 @@ return unless(exists($self->{'channels'}->{$channelName})); my $channel=$self->{'channels'}->{$channelName}; - $channel->removeSubscriber($self); + $channel->removeSubscriber($self,'channelClose'); delete($self->{'channels'}->{$channelName}); - $self->close() if(scalar(keys %{$self->{'channels'}})==0); + $self->close(0,'channelsClosed') if(scalar(keys %{$self->{'channels'}})==0); } sub close { @@ -362,7 +336,7 @@ foreach my $channelName (keys %{$self->{'channels'}}) { my $channel=$self->{'channels'}->{$channelName}; - $channel->removeSubscriber($self); + $channel->removeSubscriber($self,'subscriberClose'); } delete($self->{'channels'}); @@ -377,7 +351,7 @@ # unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'})) { - my $msg=$::CONF{'SubscriberShutdownMsg'}; + my $msg=$self->getConf('SubscriberShutdownMsg'); if(defined($msg) && $msg ne '') { $self->write($msg); @@ -387,6 +361,11 @@ $self->SUPER::close(); } +sub didClose { + + $::Statistics->{'current_subscribers'}--; +} + sub checkForMaxTime { my $self=shift; my $time=shift; @@ -394,5 +373,19 @@ $self->close(1) if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time); } +sub getConf { + my $self=shift; + my $key=shift; + + if(exists($self->{'mode'}) && $self->{'mode'} ne '') + { + my $k=$key.'.'.$self->{'mode'}; + + return $::CONF{$k} if(exists($::CONF{$k})); + } + + $::CONF{$key}; +} + 1; ############################################################################EOF \ No newline at end of file