/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 50 by andrew.betts, Wed Feb 27 13:55:35 2008 UTC revision 64 by andrew.betts, Mon Jan 19 11:19:41 2009 UTC
# Line 55  sub newFromServer { Line 55  sub newFromServer {
55          my $self=$class->SUPER::newFromServer(shift);          my $self=$class->SUPER::newFromServer(shift);
56                    
57          $self->{'headerBuffer'}='';          $self->{'headerBuffer'}='';
58          $self->{'MessageCount'}=0;          $self->{'messageCount'}=0;
         $self->{'MaxMessageCount'}=0;  
           
         $self->{'ConnectionStart'}=time;  
         my $maxTime=$::CONF{'MaxTime'};  
         if($maxTime>0)  
         {  
                 $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;  
         }  
59                    
60          $::Statistics->{'current_subscribers'}++;          $::Statistics->{'current_subscribers'}++;
61          $::Statistics->{'subscriber_connections_accepted'}++;          $::Statistics->{'subscriber_connections_accepted'}++;
# Line 84  sub deleteSubscriberWithID { Line 76  sub deleteSubscriberWithID {
76          }          }
77  }  }
78    
79    sub subscriberExists {
80            my $class=shift;
81            my $id=shift;
82    
83            return exists($PersistentConnections{$id});
84    }
85    
86  sub pingPersistentConnections {  sub pingPersistentConnections {
87          my $class=shift;          my $class=shift;
88                    
# Line 106  sub numSubscribers { Line 105  sub numSubscribers {
105          return scalar(keys %PersistentConnections);          return scalar(keys %PersistentConnections);
106  }  }
107    
108    sub listSubscribers {
109            my $class=shift;
110            my $list='';
111            foreach my $subscriber (keys %PersistentConnections)
112            {
113                    my $sub = $PersistentConnections{$subscriber};
114                    $list .= $subscriber.' '.$sub->{'ip'}.' '.$sub->{'connectionStart'}.' '.$sub->{'connectionTimeLimit'}.' '.($sub->{'connectionTimeLimit'}-time).' '.$sub->{'messageCount'}.' "'.$sub->{'useragent'}."\"$::CRLF";
115            }
116            $list;
117    }
118    
119  ###############################################################################  ###############################################################################
120  # Instance methods  # Instance methods
121  ###############################################################################  ###############################################################################
# Line 113  sub processLine { Line 123  sub processLine {
123          my $self=shift;          my $self=shift;
124          my $line=shift;          my $line=shift;
125                    
126          # Once the header was processed we ignore any input          # Once the header was processed we ignore any input - Meteor does not accept or process request bodies
127          return unless(exists($self->{'headerBuffer'}));          return unless(exists($self->{'headerBuffer'}));
128                    
129          if($line ne '')          if($line ne '')
# Line 134  sub processLine { Line 144  sub processLine {
144                  #                  #
145                  # Find the 'GET' line                  # Find the 'GET' line
146                  #                  #
147                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i)                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/([a-z0-9_\-\%\.\/]+).*?/i)
148                  {                  {
149                          $self->{'subscriberID'}=$1;                          $self->{'subscriberID'}=$1;
150                          $self->{'mode'}=$2;                          $self->{'mode'}=$2;
151                          my $persist=$self->getConf('Persist');                          my $persist=$self->getConf('Persist');
152                          my $maxTime=$self->getConf('MaxTime');                          my $maxTime=$self->getConf('MaxTime');
153                          $self->{'ConnectionTimeLimit'} = ($self->{'ConnectionStart'}+$maxTime) if ($maxTime>0);                          $self->{'connectionTimeLimit'} = ($self->{'connectionStart'}+$maxTime) if ($maxTime>0);
154                                                    
155                          my @channelData=split('/',$3);                          my @channelData=split('/',$3);
156                          my $channels={};                          my $channels={};
157                          my $channelName;                          my $channelName;
158                          my $offset;                          my $offset;
159                          foreach my $chandef (@channelData) {                          foreach my $chandef (@channelData) {
160                                  if($chandef=~/^([a-z0-9]+)(.(r|b|h)([0-9]*))?$/i) {                                  if($chandef=~/^([a-z0-9_\-\%]+)(.(r|b|h)([0-9]*))?$/i) {
161                                          $channelName = $1;                                          $channelName = $1;
162                                          $channels->{$channelName}->{'startIndex'} = undef;                                          $channels->{$channelName}->{'startIndex'} = undef;
163                                          if ($3) {                                          if ($3) {
# Line 158  sub processLine { Line 168  sub processLine {
168                                          }                                          }
169                                  }                                  }
170                          }                          }
171                          my $useragent = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-";                          $self->{'useragent'} = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-";
                           
                         delete($self->{'headerBuffer'});  
172                                                    
173                          if ($persist) {                          if ($persist) {
174                                    # New persistent connection: kill any existing connection with same ID
175                                  $self->deleteSubscriberWithID($self->{'subscriberID'});                                  $self->deleteSubscriberWithID($self->{'subscriberID'});
176                                    # Add new persistent connection to collection
177                                  $PersistentConnections{$self->{'subscriberID'}}=$self;                                  $PersistentConnections{$self->{'subscriberID'}}=$self;
178                            } else {
179                                    $::Pollers->{$self->{'subscriberID'}} = time;
180                          }                          }
181                                                    
182                          if(scalar(keys %{$channels}))                          if(scalar(keys %{$channels})) {
183                          {  
184                                    $self->{'channelinfo'} = '';
185                                    my $citemplate = $self->getConf('ChannelInfoTemplate');
186                                    foreach $channelName (keys %{$channels}) {
187                                            my $channel=Meteor::Channel->channelWithName($channelName);
188                                            $self->{'channels'}->{$channelName}=$channel;
189                                            if (defined($self->{'channels'}->{$channelName}->{'startIndex'}) && $self->{'channels'}->{$channelName}->{'startIndex'} > 0) {
190                                                    $self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate);
191                                            }
192                                    }
193                                  $self->emitOKHeader();                                  $self->emitOKHeader();
194                                  $self->setChannels($channels,$persist,$self->{'mode'},$useragent);                                  foreach $channelName (keys %{$channels}) {
195                                  $self->close(1, 'responseComplete') unless($persist);                                          my $startIndex=$channels->{$channelName}->{'startIndex'};
196                                            $self->{'channels'}->{$channelName}->addSubscriber($self,$startIndex,$persist,$self->{'mode'},$self->{'useragent'});
197                                    }
198                                    if (!$persist) {
199                                            delete ($self->{'channels'});
200                                            $self->close(1, 'responseComplete');
201                                    }
202                                    delete($self->{'headerBuffer'});
203    
204                                    # If long polling, close connection immediately if any messages have been sent
205                                    if ($self->{'messageCount'} > 0 && $self->{'mode'} eq 'longpoll') {
206                                            $self->close(1, 'closedOnEvent');
207                                    }
208                                  return;                                  return;
209                          }                          }
210                  }                  }
# Line 185  sub processLine { Line 218  sub processLine {
218                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
219                  {                  {
220                          Meteor::Document->serveFileToClient($1,$self);                          Meteor::Document->serveFileToClient($1,$self);
221                          $self->close(1, 'responseComplete');                          $self->SUPER::close();
222                          return;                          return;
223                  }                  }
224                                    
# Line 196  sub processLine { Line 229  sub processLine {
229          }          }
230  }  }
231    
 sub setChannels {  
         my $self=shift;  
         my $channels=shift;  
         my $persist=shift;  
         my $mode=shift || '';  
         my $userAgent=shift || '';  
           
         foreach my $channelName (keys %{$channels})  
         {  
                 my $startIndex=$channels->{$channelName}->{'startIndex'};  
                   
                 my $channel=Meteor::Channel->channelWithName($channelName);  
                   
                 $self->{'channels'}->{$channelName}=$channel if($persist);  
                   
                 $channel->addSubscriber($self,$startIndex,$persist,$mode,$userAgent);  
         }  
 }  
   
232  sub emitOKHeader {  sub emitOKHeader {
233          my $self=shift;          my $self=shift;
234                    
# Line 238  sub emitHeader { Line 252  sub emitHeader {
252          my $header=$self->getConf('HeaderTemplate');          my $header=$self->getConf('HeaderTemplate');
253                    
254          $header=~s/~([^~]*)~/          $header=~s/~([^~]*)~/
255                  if(!defined($1) || $1 eq '')                  if(!defined($1) || $1 eq '') {
                 {  
256                          '~';                          '~';
257                  }                  } elsif($1 eq 'server') {
                 elsif($1 eq 'server')  
                 {  
258                          $::PGM;                          $::PGM;
259                  }                  } elsif($1 eq 'status') {
                 elsif($1 eq 'status')  
                 {  
260                          $status;                          $status;
261                  }                  } elsif($1 eq 'servertime') {
                 elsif($1 eq 'servertime')  
                 {  
262                          time;                          time;
263                  }                  } elsif($1 eq 'channelinfo') {
264                  elsif($1 eq 'channelinfo')                          $self->{'channelinfo'};
265                  {                  } else {
                         Meteor::Channel->listChannelsUsingTemplate($self->getConf('ChannelInfoTemplate'));  
                 }  
                 else  
                 {  
266                          '';                          '';
267                  }                  }
268          /gex;          /gex;
# Line 286  sub sendMessages { Line 289  sub sendMessages {
289                    
290          $::Statistics->{'messages_served'}+=$numMessages;          $::Statistics->{'messages_served'}+=$numMessages;
291                    
292          my $msgCount=$self->{'MessageCount'};          my $msgCount=$self->{'messageCount'};
293          $msgCount+=$numMessages;          $msgCount+=$numMessages;
294          $self->{'MessageCount'}=$msgCount;          $self->{'messageCount'}=$msgCount;
295                    
296          my $maxMsg=$self->getConf('MaxMessages');          # If long polling, close connection, as a message has now been sent.
297          if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)          # Don't close if still processing the header (we may be sending a backlog from multiple channels)
298          {          if($self->getConf('CloseOnEvent') && !exists($self->{'headerBuffer'})) {
299                  $self->close(1, 'maxMessageCountReached');                  $self->close(1, 'closedOnEvent');
300          }          }
           
         if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})  
         {  
                 $self->close(1, 'maxMessageCountReached');  
         }  
           
301  }  }
302    
303  sub ping {  sub ping {
# Line 354  sub close { Line 351  sub close {
351                          $self->write($msg);                          $self->write($msg);
352                  }                  }
353          }          }
354            
355            my $fmsg=$self->getConf('FooterTemplate');
356            if(defined($fmsg) && $fmsg ne '')
357            {
358                    $self->write($fmsg);
359            }
360    
361          $self->SUPER::close();          $self->SUPER::close();
362  }  }
363    
# Line 367  sub checkForMaxTime { Line 370  sub checkForMaxTime {
370          my $self=shift;          my $self=shift;
371          my $time=shift;          my $time=shift;
372                    
373          $self->close(1,'maxTime') if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);          $self->close(1,'maxTime') if(exists($self->{'connectionTimeLimit'}) && $self->{'connectionTimeLimit'}<$time);
374  }  }
375    
376  sub getConf {  sub getConf {
377          my $self=shift;          my $self=shift;
378          my $key=shift;          my $key=shift;
379                    
380          if(exists($self->{'mode'}) && $self->{'mode'} ne '')          if(exists($self->{'mode'}) && $self->{'mode'} ne '') {
381          {                  my $k=$key.$self->{'mode'};            
                 my $k=$key.$self->{'mode'};  
                   
382                  if(exists($::CONF{$k})) {                  if(exists($::CONF{$k})) {
383                          return $::CONF{$k};                          return $::CONF{$k};
384                  }                  }

Legend:
Removed from v.50  
changed lines
  Added in v.64

  ViewVC Help
Powered by ViewVC 1.1.26