/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 56 by andrew.betts, Thu Feb 28 13:33:06 2008 UTC revision 62 by andrew.betts, Thu Nov 27 00:33:21 2008 UTC
# Line 55  sub newFromServer { Line 55  sub newFromServer {
55          my $self=$class->SUPER::newFromServer(shift);          my $self=$class->SUPER::newFromServer(shift);
56                    
57          $self->{'headerBuffer'}='';          $self->{'headerBuffer'}='';
58          $self->{'MessageCount'}=0;          $self->{'messageCount'}=0;
         $self->{'MaxMessageCount'}=0;  
59                    
         $self->{'ConnectionStart'}=time;  
60          my $maxTime=$::CONF{'MaxTime'};          my $maxTime=$::CONF{'MaxTime'};
61          if($maxTime>0)          if($maxTime>0)
62          {          {
63                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;                  $self->{'connectionTimeLimit'}=$self->{'connectionStart'}+$maxTime;
64          }          }
65                    
66          $::Statistics->{'current_subscribers'}++;          $::Statistics->{'current_subscribers'}++;
# Line 84  sub deleteSubscriberWithID { Line 82  sub deleteSubscriberWithID {
82          }          }
83  }  }
84    
85    sub subscriberExists {
86            my $class=shift;
87            my $id=shift;
88    
89            return exists($PersistentConnections{$id});
90    }
91    
92  sub pingPersistentConnections {  sub pingPersistentConnections {
93          my $class=shift;          my $class=shift;
94                    
# Line 106  sub numSubscribers { Line 111  sub numSubscribers {
111          return scalar(keys %PersistentConnections);          return scalar(keys %PersistentConnections);
112  }  }
113    
114    sub listSubscribers {
115            my $class=shift;
116            my $list='';
117            foreach my $subscriber (keys %PersistentConnections)
118            {
119                    my $sub = $PersistentConnections{$subscriber};
120                    $list .= $subscriber.' '.$sub->{'ip'}.' '.$sub->{'connectionStart'}.' '.$sub->{'connectionTimeLimit'}.' '.($sub->{'connectionTimeLimit'}-time).' '.$sub->{'messageCount'}.' "'.$sub->{'useragent'}."\"$::CRLF";
121            }
122            $list;
123    }
124    
125  ###############################################################################  ###############################################################################
126  # Instance methods  # Instance methods
127  ###############################################################################  ###############################################################################
# Line 113  sub processLine { Line 129  sub processLine {
129          my $self=shift;          my $self=shift;
130          my $line=shift;          my $line=shift;
131                    
132          # Once the header was processed we ignore any input          # Once the header was processed we ignore any input - Meteor does not accept or process request bodies
133          return unless(exists($self->{'headerBuffer'}));          return unless(exists($self->{'headerBuffer'}));
134                    
135          if($line ne '')          if($line ne '')
# Line 140  sub processLine { Line 156  sub processLine {
156                          $self->{'mode'}=$2;                          $self->{'mode'}=$2;
157                          my $persist=$self->getConf('Persist');                          my $persist=$self->getConf('Persist');
158                          my $maxTime=$self->getConf('MaxTime');                          my $maxTime=$self->getConf('MaxTime');
159                          $self->{'ConnectionTimeLimit'} = ($self->{'ConnectionStart'}+$maxTime) if ($maxTime>0);                          $self->{'connectionTimeLimit'} = ($self->{'connectionStart'}+$maxTime) if ($maxTime>0);
160                                                    
161                          my @channelData=split('/',$3);                          my @channelData=split('/',$3);
162                          my $channels={};                          my $channels={};
# Line 158  sub processLine { Line 174  sub processLine {
174                                          }                                          }
175                                  }                                  }
176                          }                          }
177                          my $useragent = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-";                          $self->{'useragent'} = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-";
                           
                         delete($self->{'headerBuffer'});  
178                                                    
179                          if ($persist) {                          if ($persist) {
180                                    # New persistent connection: kill any existing connection with same ID
181                                  $self->deleteSubscriberWithID($self->{'subscriberID'});                                  $self->deleteSubscriberWithID($self->{'subscriberID'});
182                                    # Add new persistent connection to collection
183                                  $PersistentConnections{$self->{'subscriberID'}}=$self;                                  $PersistentConnections{$self->{'subscriberID'}}=$self;
184                            } else {
185                                    $::Pollers->{$self->{'subscriberID'}} = time;
186                          }                          }
187                                                    
188                          if(scalar(keys %{$channels})) {                          if(scalar(keys %{$channels})) {
# Line 174  sub processLine { Line 192  sub processLine {
192                                  foreach $channelName (keys %{$channels}) {                                  foreach $channelName (keys %{$channels}) {
193                                          my $channel=Meteor::Channel->channelWithName($channelName);                                          my $channel=Meteor::Channel->channelWithName($channelName);
194                                          $self->{'channels'}->{$channelName}=$channel;                                          $self->{'channels'}->{$channelName}=$channel;
195                                          $self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate);                                          if (defined($self->{'channels'}->{$channelName}->{'startIndex'}) && $self->{'channels'}->{$channelName}->{'startIndex'} > 0) {
196                                                                                            $self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate);
197                                            }
198                                  }                                  }
199                                  $self->emitOKHeader();                                  $self->emitOKHeader();
200                                  foreach $channelName (keys %{$channels}) {                                  foreach $channelName (keys %{$channels}) {
201                                          my $startIndex=$channels->{$channelName}->{'startIndex'};                                          my $startIndex=$channels->{$channelName}->{'startIndex'};
202                                          $self->{'channels'}->{$channelName}->addSubscriber($self,$startIndex,$persist,$self->{'mode'},$useragent);                                          $self->{'channels'}->{$channelName}->addSubscriber($self,$startIndex,$persist,$self->{'mode'},$self->{'useragent'});
203                                  }                                  }
204                                  delete ($self->{'channels'}) unless($persist);                                  delete ($self->{'channels'}) unless($persist);
205                                  $self->close(1, 'responseComplete') unless($persist);                                  $self->close(1, 'responseComplete') unless($persist);
206                                    $self->close(1, 'closedOnEvent') unless($self->{'messageCount'} == 0);
207                                    delete($self->{'headerBuffer'});
208                                  return;                                  return;
209                          }                          }
210                  }                  }
# Line 268  sub sendMessages { Line 289  sub sendMessages {
289                    
290          $::Statistics->{'messages_served'}+=$numMessages;          $::Statistics->{'messages_served'}+=$numMessages;
291                    
292          my $msgCount=$self->{'MessageCount'};          my $msgCount=$self->{'messageCount'};
293          $msgCount+=$numMessages;          $msgCount+=$numMessages;
294          $self->{'MessageCount'}=$msgCount;          $self->{'messageCount'}=$msgCount;
           
         my $maxMsg=$self->getConf('MaxMessages');  
         if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)  
         {  
                 $self->close(1, 'maxMessageCountReached');  
         }  
295                    
296          if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})          # If long polling, close connection, as a message has now been sent.
297          {          # Don't close if still processing the header (we may be sending a backlog from multiple channels)
298                  $self->close(1, 'maxMessageCountReached');          if($self->getConf('CloseOnEvent') && !exists($self->{'headerBuffer'})) {
299                    $self->close(1, 'closedOnEvent');
300          }          }
           
301  }  }
302    
303  sub ping {  sub ping {
# Line 336  sub close { Line 351  sub close {
351                          $self->write($msg);                          $self->write($msg);
352                  }                  }
353          }          }
354            
355            my $fmsg=$self->getConf('FooterTemplate');
356            if(defined($fmsg) && $fmsg ne '')
357            {
358                    $self->write($fmsg);
359            }
360    
361          $self->SUPER::close();          $self->SUPER::close();
362  }  }
363    
# Line 349  sub checkForMaxTime { Line 370  sub checkForMaxTime {
370          my $self=shift;          my $self=shift;
371          my $time=shift;          my $time=shift;
372                    
373          $self->close(1,'maxTime') if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);          $self->close(1,'maxTime') if(exists($self->{'connectionTimeLimit'}) && $self->{'connectionTimeLimit'}<$time);
374  }  }
375    
376  sub getConf {  sub getConf {

Legend:
Removed from v.56  
changed lines
  Added in v.62

  ViewVC Help
Powered by ViewVC 1.1.26