--- googlecode.com/svn/trunk/Meteor/Subscriber.pm 2006/11/20 17:59:30 3 +++ googlecode.com/svn/trunk/Meteor/Subscriber.pm 2007/12/20 21:24:24 32 @@ -43,6 +43,7 @@ @Meteor::Subscriber::ISA=qw(Meteor::Connection); our %PersistentConnections=(); + our $NumAcceptedConnections=0; ############################################################################### # Factory methods @@ -63,6 +64,9 @@ $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime; } + $::Statistics->{'current_subscribers'}++; + $::Statistics->{'subscriber_connections_accepted'}++; + $self; } @@ -97,6 +101,11 @@ map { $_->checkForMaxTime($time) } @cons; } +sub numSubscribers { + + return scalar(keys %PersistentConnections); +} + ############################################################################### # Instance methods ############################################################################### @@ -121,98 +130,71 @@ # Analyze header, register with appropiate channel # and send pending messages. # - # GET $::CONF{'SubscriberDynamicPageAddress'}?channel=ml123&restartfrom=1 HTTP/1.1 + # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1 # # Find the 'GET' line # - if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\?(\S+)/) + if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i) { - my @formData=split('&',$1); - my $channelName=undef; - my $startIndex=undef; - my $backtrack=undef; - my $persist=1; - my $subscriberID=undef; - foreach my $formElement (@formData) + my $subscriberID=$1; + my $persist=0; + $self->{'mode'}=$2; + if ($self->{'mode'} eq "xhrinteractive" || $self->{'mode'} eq "iframe" || $self->{'mode'} eq "serversent" || $self->{'mode'} eq "longpoll") { + $persist=1; + $self->{'MaxMessageCount'}=1 unless(!($self->{'mode'} eq "longpoll")); + } + if ($self->{'mode'} eq "iframe") { + $self->{'HeaderTemplateNumber'}=1; + } else { + $self->{'HeaderTemplateNumber'}=2; + } + my @channelData=split('/',$3); + my $channels={}; + my $channelName; + my $offset; + foreach my $chandef (@channelData) { - if($formElement=~/^channel=(.+)$/) - { - $channelName=$1; - } - elsif($formElement=~/^restartfrom=(\d*)$/) - { - $startIndex=$1; - $startIndex='' unless(defined($startIndex)); - } - elsif($formElement=~/^backtrack=(\d+)$/) - { - $backtrack=$1; - $backtrack=0 unless(defined($backtrack)); - } - elsif($formElement=~/^persist=(?i)(yes|true|1|no|false|0)$/) - { - $persist=0 if($1=~/(no|false|0)/i); - } - elsif($formElement=~/^id=(.+)$/) - { - $subscriberID=$1; - } - elsif($formElement=~/^maxmessages=(\d+)$/i) - { - $self->{'MaxMessageCount'}=$1; - } - elsif($formElement=~/^template=(\d+)$/i) - { - $self->{'HeaderTemplateNumber'}=$1; - } - elsif($formElement=~/^maxtime=(\d+)$/i) + if($chandef=~/^([a-z0-9]+)(.(r|b|h)([0-9]*))?$/) { - my $clientRequest=$1; - my $serverDefault=$::CONF{'MaxTime'}; - - if($serverDefault==0 || $serverDefault>$clientRequest) - { - $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$clientRequest; + $channelName = $1; + $channels->{$channelName}->{'startIndex'} = undef; + for ($3) { + $offset = $4; + /r/ && do { $channels->{$channelName}->{'startIndex'} = $offset; last; }; + /b/ && do { $channels->{$channelName}->{'startIndex'} = -$offset; last; }; + /h/ && do { $channels->{$channelName}->{'startIndex'} = 0; last; }; } } } - - delete($self->{'headerBuffer'}); - if(defined($startIndex) && defined($backtrack)) - { - $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'"); - $self->close(); - - return; - } + delete($self->{'headerBuffer'}); - if(defined($subscriberID) && $persist) + if($persist) { $self->{'subscriberID'}=$subscriberID; $self->deleteSubscriberWithID($subscriberID); $PersistentConnections{$subscriberID}=$self; } - if(defined($channelName)) + if(scalar(keys %{$channels})) { $self->emitOKHeader(); - - $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack)); - - $self->setChannelName($channelName,$startIndex,$persist); - + $self->setChannels($channels,$persist); $self->close(1) unless($persist); - return; } } + elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/) + { + $self->deleteSubscriberWithID($1); + $self->emitOKHeader(); + $self->close(1); + return; + } elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/) { Meteor::Document->serveFileToClient($1,$self); - $self->close(1); - return; } @@ -223,16 +205,21 @@ } } -sub setChannelName { +sub setChannels { my $self=shift; - my $channelName=shift; - my $startIndex=shift; + my $channels=shift; my $persist=shift; - my $channel=Meteor::Channel->channelWithName($channelName); - $self->{'channel'}=$channel if($persist); - - $channel->addSubscriber($self,$startIndex,$persist); + foreach my $channelName (keys %{$channels}) + { + my $startIndex=$channels->{$channelName}->{'startIndex'}; + + my $channel=Meteor::Channel->channelWithName($channelName); + + $self->{'channels'}->{$channelName}=$channel if($persist); + + $channel->addSubscriber($self,$startIndex,$persist); + } } sub emitOKHeader { @@ -245,6 +232,7 @@ my $self=shift; $self->emitHeader('404 Not Found'); + $::Statistics->{'errors_served'}++; # close up shop here! $self->close(); @@ -292,9 +280,14 @@ sub sendMessage { my $self=shift; my $msg=shift; + my $numMsgInThisBatch=shift; + + $numMsgInThisBatch=1 unless(defined($numMsgInThisBatch)); $self->write($msg); + $::Statistics->{'messages_served'}+=$numMsgInThisBatch; + my $msgCount=++$self->{'MessageCount'}; my $maxMsg=$::CONF{'MaxMessages'}; @@ -309,12 +302,30 @@ } } +sub closeChannel { + my $self=shift; + my $channelName=shift; + + return unless(exists($self->{'channels'}->{$channelName})); + + my $channel=$self->{'channels'}->{$channelName}; + $channel->removeSubscriber($self); + + delete($self->{'channels'}->{$channelName}); + + $self->close() if(scalar(keys %{$self->{'channels'}})==0); +} + sub close { my $self=shift; my $noShutdownMsg=shift; - $self->{'channel'}->removeSubscriber($self) if($self->{'channel'}); - delete($self->{'channel'}); + foreach my $channelName (keys %{$self->{'channels'}}) + { + my $channel=$self->{'channels'}->{$channelName}; + $channel->removeSubscriber($self); + } + delete($self->{'channels'}); if(exists($self->{'subscriberID'})) { @@ -334,6 +345,8 @@ } } + $::Statistics->{'current_subscribers'}--; + $self->SUPER::close(); }