--- googlecode.com/svn/trunk/Meteor/Subscriber.pm 2006/11/20 17:59:30 3 +++ googlecode.com/svn/trunk/Meteor/Subscriber.pm 2007/05/20 19:40:53 25 @@ -43,6 +43,7 @@ @Meteor::Subscriber::ISA=qw(Meteor::Connection); our %PersistentConnections=(); + our $NumAcceptedConnections=0; ############################################################################### # Factory methods @@ -63,6 +64,9 @@ $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime; } + $::Statistics->{'current_subscribers'}++; + $::Statistics->{'subscriber_connections_accepted'}++; + $self; } @@ -97,6 +101,11 @@ map { $_->checkForMaxTime($time) } @cons; } +sub numSubscribers { + + return scalar(keys %PersistentConnections); +} + ############################################################################### # Instance methods ############################################################################### @@ -133,10 +142,27 @@ my $backtrack=undef; my $persist=1; my $subscriberID=undef; + my $channels={}; foreach my $formElement (@formData) { if($formElement=~/^channel=(.+)$/) { + if(defined($channelName)) + { + if(defined($startIndex) && defined($backtrack)) + { + $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'"); + $self->close(); + + return; + } + + $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack)); + $channels->{$channelName}->{'startIndex'}=$startIndex; + + $startIndex=undef; + $backtrack=undef; + } $channelName=$1; } elsif($formElement=~/^restartfrom=(\d*)$/) @@ -176,17 +202,23 @@ } } } - - delete($self->{'headerBuffer'}); - if(defined($startIndex) && defined($backtrack)) + if(defined($channelName)) { - $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'"); - $self->close(); + if(defined($startIndex) && defined($backtrack)) + { + $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'"); + $self->close(); + + return; + } - return; + $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack)); + $channels->{$channelName}->{'startIndex'}=$startIndex; } + delete($self->{'headerBuffer'}); + if(defined($subscriberID) && $persist) { $self->{'subscriberID'}=$subscriberID; @@ -194,13 +226,11 @@ $PersistentConnections{$subscriberID}=$self; } - if(defined($channelName)) + if(scalar(keys %{$channels})) { $self->emitOKHeader(); - $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack)); - - $self->setChannelName($channelName,$startIndex,$persist); + $self->setChannels($channels,$persist); $self->close(1) unless($persist); @@ -223,16 +253,21 @@ } } -sub setChannelName { +sub setChannels { my $self=shift; - my $channelName=shift; - my $startIndex=shift; + my $channels=shift; my $persist=shift; - my $channel=Meteor::Channel->channelWithName($channelName); - $self->{'channel'}=$channel if($persist); - - $channel->addSubscriber($self,$startIndex,$persist); + foreach my $channelName (keys %{$channels}) + { + my $startIndex=$channels->{$channelName}->{'startIndex'}; + + my $channel=Meteor::Channel->channelWithName($channelName); + + $self->{'channels'}->{$channelName}=$channel if($persist); + + $channel->addSubscriber($self,$startIndex,$persist); + } } sub emitOKHeader { @@ -245,6 +280,7 @@ my $self=shift; $self->emitHeader('404 Not Found'); + $::Statistics->{'errors_served'}++; # close up shop here! $self->close(); @@ -292,9 +328,14 @@ sub sendMessage { my $self=shift; my $msg=shift; + my $numMsgInThisBatch=shift; + + $numMsgInThisBatch=1 unless(defined($numMsgInThisBatch)); $self->write($msg); + $::Statistics->{'messages_served'}+=$numMsgInThisBatch; + my $msgCount=++$self->{'MessageCount'}; my $maxMsg=$::CONF{'MaxMessages'}; @@ -309,12 +350,30 @@ } } +sub closeChannel { + my $self=shift; + my $channelName=shift; + + return unless(exists($self->{'channels'}->{$channelName})); + + my $channel=$self->{'channels'}->{$channelName}; + $channel->removeSubscriber($self); + + delete($self->{'channels'}->{$channelName}); + + $self->close() if(scalar(keys %{$self->{'channels'}})==0); +} + sub close { my $self=shift; my $noShutdownMsg=shift; - $self->{'channel'}->removeSubscriber($self) if($self->{'channel'}); - delete($self->{'channel'}); + foreach my $channelName (keys %{$self->{'channels'}}) + { + my $channel=$self->{'channels'}->{$channelName}; + $channel->removeSubscriber($self); + } + delete($self->{'channels'}); if(exists($self->{'subscriberID'})) { @@ -334,6 +393,8 @@ } } + $::Statistics->{'current_subscribers'}--; + $self->SUPER::close(); }