--- googlecode.com/svn/trunk/Meteor/Subscriber.pm 2006/11/20 17:59:30 3 +++ googlecode.com/svn/trunk/Meteor/Subscriber.pm 2008/01/25 17:12:02 35 @@ -43,6 +43,7 @@ @Meteor::Subscriber::ISA=qw(Meteor::Connection); our %PersistentConnections=(); + our $NumAcceptedConnections=0; ############################################################################### # Factory methods @@ -63,6 +64,9 @@ $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime; } + $::Statistics->{'current_subscribers'}++; + $::Statistics->{'subscriber_connections_accepted'}++; + $self; } @@ -85,7 +89,7 @@ my $msg=$::CONF{'PingMessage'}; my @cons=values %PersistentConnections; - map { $_->write($msg) } @cons; + map { $_->write($msg.chr(0)) } @cons; } sub checkPersistentConnectionsForMaxTime { @@ -97,6 +101,11 @@ map { $_->checkForMaxTime($time) } @cons; } +sub numSubscribers { + + return scalar(keys %PersistentConnections); +} + ############################################################################### # Instance methods ############################################################################### @@ -121,98 +130,69 @@ # Analyze header, register with appropiate channel # and send pending messages. # - # GET $::CONF{'SubscriberDynamicPageAddress'}?channel=ml123&restartfrom=1 HTTP/1.1 + # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1 # # Find the 'GET' line # - if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\?(\S+)/) + if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i) { - my @formData=split('&',$1); - my $channelName=undef; - my $startIndex=undef; - my $backtrack=undef; - my $persist=1; - my $subscriberID=undef; - foreach my $formElement (@formData) - { - if($formElement=~/^channel=(.+)$/) - { - $channelName=$1; - } - elsif($formElement=~/^restartfrom=(\d*)$/) - { - $startIndex=$1; - $startIndex='' unless(defined($startIndex)); - } - elsif($formElement=~/^backtrack=(\d+)$/) - { - $backtrack=$1; - $backtrack=0 unless(defined($backtrack)); - } - elsif($formElement=~/^persist=(?i)(yes|true|1|no|false|0)$/) - { - $persist=0 if($1=~/(no|false|0)/i); - } - elsif($formElement=~/^id=(.+)$/) - { - $subscriberID=$1; - } - elsif($formElement=~/^maxmessages=(\d+)$/i) - { - $self->{'MaxMessageCount'}=$1; - } - elsif($formElement=~/^template=(\d+)$/i) - { - $self->{'HeaderTemplateNumber'}=$1; - } - elsif($formElement=~/^maxtime=(\d+)$/i) - { - my $clientRequest=$1; - my $serverDefault=$::CONF{'MaxTime'}; - - if($serverDefault==0 || $serverDefault>$clientRequest) - { - $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$clientRequest; + my $subscriberID=$1; + my $persist=0; + $self->{'mode'}=$2; + if ($self->{'mode'} eq "xhrinteractive" || $self->{'mode'} eq "iframe" || $self->{'mode'} eq "serversent" || $self->{'mode'} eq "longpoll") { + $persist=1; + $self->{'MaxMessageCount'}=1 unless(!($self->{'mode'} eq "longpoll")); + } + if ($self->{'mode'} eq "iframe") { + $self->{'HeaderTemplateNumber'}=1; + } else { + $self->{'HeaderTemplateNumber'}=2; + } + my @channelData=split('/',$3); + my $channels={}; + my $channelName; + my $offset; + foreach my $chandef (@channelData) { + if($chandef=~/^([a-z0-9]+)(.(r|b|h)([0-9]*))?$/i) { + $channelName = $1; + $channels->{$channelName}->{'startIndex'} = undef; + if ($3) { + $offset = $4; + if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; } + if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; } + if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; } } } } - - delete($self->{'headerBuffer'}); - if(defined($startIndex) && defined($backtrack)) - { - $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'"); - $self->close(); - - return; - } + delete($self->{'headerBuffer'}); - if(defined($subscriberID) && $persist) + if($persist) { $self->{'subscriberID'}=$subscriberID; $self->deleteSubscriberWithID($subscriberID); $PersistentConnections{$subscriberID}=$self; } - if(defined($channelName)) + if(scalar(keys %{$channels})) { $self->emitOKHeader(); - - $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack)); - - $self->setChannelName($channelName,$startIndex,$persist); - + $self->setChannels($channels,$persist); $self->close(1) unless($persist); - return; } } + elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/) + { + $self->deleteSubscriberWithID($1); + $self->emitOKHeader(); + $self->close(1); + return; + } elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/) { Meteor::Document->serveFileToClient($1,$self); - $self->close(1); - return; } @@ -223,16 +203,21 @@ } } -sub setChannelName { +sub setChannels { my $self=shift; - my $channelName=shift; - my $startIndex=shift; + my $channels=shift; my $persist=shift; - my $channel=Meteor::Channel->channelWithName($channelName); - $self->{'channel'}=$channel if($persist); - - $channel->addSubscriber($self,$startIndex,$persist); + foreach my $channelName (keys %{$channels}) + { + my $startIndex=$channels->{$channelName}->{'startIndex'}; + + my $channel=Meteor::Channel->channelWithName($channelName); + + $self->{'channels'}->{$channelName}=$channel if($persist); + + $channel->addSubscriber($self,$startIndex,$persist); + } } sub emitOKHeader { @@ -245,6 +230,7 @@ my $self=shift; $self->emitHeader('404 Not Found'); + $::Statistics->{'errors_served'}++; # close up shop here! $self->close(); @@ -286,14 +272,19 @@ } /gex; - $self->write($header); + $self->write($header.chr(0)); } sub sendMessage { my $self=shift; my $msg=shift; + my $numMsgInThisBatch=shift; + + $numMsgInThisBatch=1 unless(defined($numMsgInThisBatch)); + + $self->write($msg.chr(0)); - $self->write($msg); + $::Statistics->{'messages_served'}+=$numMsgInThisBatch; my $msgCount=++$self->{'MessageCount'}; @@ -309,12 +300,30 @@ } } +sub closeChannel { + my $self=shift; + my $channelName=shift; + + return unless(exists($self->{'channels'}->{$channelName})); + + my $channel=$self->{'channels'}->{$channelName}; + $channel->removeSubscriber($self); + + delete($self->{'channels'}->{$channelName}); + + $self->close() if(scalar(keys %{$self->{'channels'}})==0); +} + sub close { my $self=shift; my $noShutdownMsg=shift; - $self->{'channel'}->removeSubscriber($self) if($self->{'channel'}); - delete($self->{'channel'}); + foreach my $channelName (keys %{$self->{'channels'}}) + { + my $channel=$self->{'channels'}->{$channelName}; + $channel->removeSubscriber($self); + } + delete($self->{'channels'}); if(exists($self->{'subscriberID'})) { @@ -334,6 +343,8 @@ } } + $::Statistics->{'current_subscribers'}--; + $self->SUPER::close(); }