--- googlecode.com/svn/trunk/Meteor/Subscriber.pm 2008/02/27 21:58:56 53 +++ googlecode.com/svn/trunk/Meteor/Subscriber.pm 2009/01/19 11:19:41 64 @@ -55,15 +55,7 @@ my $self=$class->SUPER::newFromServer(shift); $self->{'headerBuffer'}=''; - $self->{'MessageCount'}=0; - $self->{'MaxMessageCount'}=0; - - $self->{'ConnectionStart'}=time; - my $maxTime=$::CONF{'MaxTime'}; - if($maxTime>0) - { - $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime; - } + $self->{'messageCount'}=0; $::Statistics->{'current_subscribers'}++; $::Statistics->{'subscriber_connections_accepted'}++; @@ -84,6 +76,13 @@ } } +sub subscriberExists { + my $class=shift; + my $id=shift; + + return exists($PersistentConnections{$id}); +} + sub pingPersistentConnections { my $class=shift; @@ -106,6 +105,17 @@ return scalar(keys %PersistentConnections); } +sub listSubscribers { + my $class=shift; + my $list=''; + foreach my $subscriber (keys %PersistentConnections) + { + my $sub = $PersistentConnections{$subscriber}; + $list .= $subscriber.' '.$sub->{'ip'}.' '.$sub->{'connectionStart'}.' '.$sub->{'connectionTimeLimit'}.' '.($sub->{'connectionTimeLimit'}-time).' '.$sub->{'messageCount'}.' "'.$sub->{'useragent'}."\"$::CRLF"; + } + $list; +} + ############################################################################### # Instance methods ############################################################################### @@ -113,7 +123,7 @@ my $self=shift; my $line=shift; - # Once the header was processed we ignore any input + # Once the header was processed we ignore any input - Meteor does not accept or process request bodies return unless(exists($self->{'headerBuffer'})); if($line ne '') @@ -134,13 +144,13 @@ # # Find the 'GET' line # - if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i) + if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/([a-z0-9_\-\%\.\/]+).*?/i) { $self->{'subscriberID'}=$1; $self->{'mode'}=$2; my $persist=$self->getConf('Persist'); my $maxTime=$self->getConf('MaxTime'); - $self->{'ConnectionTimeLimit'} = ($self->{'ConnectionStart'}+$maxTime) if ($maxTime>0); + $self->{'connectionTimeLimit'} = ($self->{'connectionStart'}+$maxTime) if ($maxTime>0); my @channelData=split('/',$3); my $channels={}; @@ -158,13 +168,15 @@ } } } - my $useragent = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-"; - - delete($self->{'headerBuffer'}); + $self->{'useragent'} = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-"; if ($persist) { + # New persistent connection: kill any existing connection with same ID $self->deleteSubscriberWithID($self->{'subscriberID'}); + # Add new persistent connection to collection $PersistentConnections{$self->{'subscriberID'}}=$self; + } else { + $::Pollers->{$self->{'subscriberID'}} = time; } if(scalar(keys %{$channels})) { @@ -174,16 +186,25 @@ foreach $channelName (keys %{$channels}) { my $channel=Meteor::Channel->channelWithName($channelName); $self->{'channels'}->{$channelName}=$channel; - $self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate); - + if (defined($self->{'channels'}->{$channelName}->{'startIndex'}) && $self->{'channels'}->{$channelName}->{'startIndex'} > 0) { + $self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate); + } } $self->emitOKHeader(); foreach $channelName (keys %{$channels}) { my $startIndex=$channels->{$channelName}->{'startIndex'}; - $self->{'channels'}->{$channelName}->addSubscriber($self,$startIndex,$persist,$self->{'mode'},$useragent); + $self->{'channels'}->{$channelName}->addSubscriber($self,$startIndex,$persist,$self->{'mode'},$self->{'useragent'}); + } + if (!$persist) { + delete ($self->{'channels'}); + $self->close(1, 'responseComplete'); + } + delete($self->{'headerBuffer'}); + + # If long polling, close connection immediately if any messages have been sent + if ($self->{'messageCount'} > 0 && $self->{'mode'} eq 'longpoll') { + $self->close(1, 'closedOnEvent'); } - delete ($self->{'channels'}) unless($persist); - $self->close(1, 'responseComplete') unless($persist); return; } } @@ -197,7 +218,7 @@ elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/) { Meteor::Document->serveFileToClient($1,$self); - $self->close(1, 'responseComplete'); + $self->SUPER::close(); return; } @@ -268,21 +289,15 @@ $::Statistics->{'messages_served'}+=$numMessages; - my $msgCount=$self->{'MessageCount'}; + my $msgCount=$self->{'messageCount'}; $msgCount+=$numMessages; - $self->{'MessageCount'}=$msgCount; - - my $maxMsg=$self->getConf('MaxMessages'); - if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg) - { - $self->close(1, 'maxMessageCountReached'); - } + $self->{'messageCount'}=$msgCount; - if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'}) - { - $self->close(1, 'maxMessageCountReached'); + # If long polling, close connection, as a message has now been sent. + # Don't close if still processing the header (we may be sending a backlog from multiple channels) + if($self->getConf('CloseOnEvent') && !exists($self->{'headerBuffer'})) { + $self->close(1, 'closedOnEvent'); } - } sub ping { @@ -336,7 +351,13 @@ $self->write($msg); } } - + + my $fmsg=$self->getConf('FooterTemplate'); + if(defined($fmsg) && $fmsg ne '') + { + $self->write($fmsg); + } + $self->SUPER::close(); } @@ -349,17 +370,15 @@ my $self=shift; my $time=shift; - $self->close(1,'maxTime') if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time); + $self->close(1,'maxTime') if(exists($self->{'connectionTimeLimit'}) && $self->{'connectionTimeLimit'}<$time); } sub getConf { my $self=shift; my $key=shift; - if(exists($self->{'mode'}) && $self->{'mode'} ne '') - { - my $k=$key.$self->{'mode'}; - + if(exists($self->{'mode'}) && $self->{'mode'} ne '') { + my $k=$key.$self->{'mode'}; if(exists($::CONF{$k})) { return $::CONF{$k}; }