/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 35 by andrew.betts, Fri Jan 25 17:12:02 2008 UTC revision 62 by andrew.betts, Thu Nov 27 00:33:21 2008 UTC
# Line 44  package Meteor::Subscriber; Line 44  package Meteor::Subscriber;
44                    
45          our %PersistentConnections=();          our %PersistentConnections=();
46          our $NumAcceptedConnections=0;          our $NumAcceptedConnections=0;
47        
48    
49  ###############################################################################  ###############################################################################
50  # Factory methods  # Factory methods
# Line 54  sub newFromServer { Line 55  sub newFromServer {
55          my $self=$class->SUPER::newFromServer(shift);          my $self=$class->SUPER::newFromServer(shift);
56                    
57          $self->{'headerBuffer'}='';          $self->{'headerBuffer'}='';
58          $self->{'MessageCount'}=0;          $self->{'messageCount'}=0;
         $self->{'MaxMessageCount'}=0;  
59                    
         $self->{'ConnectionStart'}=time;  
60          my $maxTime=$::CONF{'MaxTime'};          my $maxTime=$::CONF{'MaxTime'};
61          if($maxTime>0)          if($maxTime>0)
62          {          {
63                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;                  $self->{'connectionTimeLimit'}=$self->{'connectionStart'}+$maxTime;
64          }          }
65                    
66          $::Statistics->{'current_subscribers'}++;          $::Statistics->{'current_subscribers'}++;
# Line 79  sub deleteSubscriberWithID { Line 78  sub deleteSubscriberWithID {
78                    
79          if(exists($PersistentConnections{$id}))          if(exists($PersistentConnections{$id}))
80          {          {
81                  $PersistentConnections{$id}->close(1);                  $PersistentConnections{$id}->close(0,'newSubscriberWithSameID');
82          }          }
83  }  }
84    
85    sub subscriberExists {
86            my $class=shift;
87            my $id=shift;
88    
89            return exists($PersistentConnections{$id});
90    }
91    
92  sub pingPersistentConnections {  sub pingPersistentConnections {
93          my $class=shift;          my $class=shift;
94                    
         my $msg=$::CONF{'PingMessage'};  
95          my @cons=values %PersistentConnections;          my @cons=values %PersistentConnections;
96                    
97          map { $_->write($msg.chr(0)) } @cons;          map { $_->ping() } @cons;
98  }  }
99    
100  sub checkPersistentConnectionsForMaxTime {  sub checkPersistentConnectionsForMaxTime {
# Line 106  sub numSubscribers { Line 111  sub numSubscribers {
111          return scalar(keys %PersistentConnections);          return scalar(keys %PersistentConnections);
112  }  }
113    
114    sub listSubscribers {
115            my $class=shift;
116            my $list='';
117            foreach my $subscriber (keys %PersistentConnections)
118            {
119                    my $sub = $PersistentConnections{$subscriber};
120                    $list .= $subscriber.' '.$sub->{'ip'}.' '.$sub->{'connectionStart'}.' '.$sub->{'connectionTimeLimit'}.' '.($sub->{'connectionTimeLimit'}-time).' '.$sub->{'messageCount'}.' "'.$sub->{'useragent'}."\"$::CRLF";
121            }
122            $list;
123    }
124    
125  ###############################################################################  ###############################################################################
126  # Instance methods  # Instance methods
127  ###############################################################################  ###############################################################################
# Line 113  sub processLine { Line 129  sub processLine {
129          my $self=shift;          my $self=shift;
130          my $line=shift;          my $line=shift;
131                    
132          # Once the header was processed we ignore any input          # Once the header was processed we ignore any input - Meteor does not accept or process request bodies
133          return unless(exists($self->{'headerBuffer'}));          return unless(exists($self->{'headerBuffer'}));
134                    
135          if($line ne '')          if($line ne '')
# Line 134  sub processLine { Line 150  sub processLine {
150                  #                  #
151                  # Find the 'GET' line                  # Find the 'GET' line
152                  #                  #
153                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i)                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/([a-z0-9_\-\%\.\/]+).*?/i)
154                  {                  {
155                          my $subscriberID=$1;                          $self->{'subscriberID'}=$1;
                         my $persist=0;  
156                          $self->{'mode'}=$2;                          $self->{'mode'}=$2;
157                          if ($self->{'mode'} eq "xhrinteractive" || $self->{'mode'} eq "iframe" || $self->{'mode'} eq "serversent" || $self->{'mode'} eq "longpoll") {                          my $persist=$self->getConf('Persist');
158                                  $persist=1;                          my $maxTime=$self->getConf('MaxTime');
159                                  $self->{'MaxMessageCount'}=1 unless(!($self->{'mode'} eq "longpoll"));                          $self->{'connectionTimeLimit'} = ($self->{'connectionStart'}+$maxTime) if ($maxTime>0);
160                          }                          
                         if ($self->{'mode'} eq "iframe") {  
                                 $self->{'HeaderTemplateNumber'}=1;  
                         } else {  
                                 $self->{'HeaderTemplateNumber'}=2;  
                         }  
161                          my @channelData=split('/',$3);                          my @channelData=split('/',$3);
162                          my $channels={};                          my $channels={};
163                          my $channelName;                          my $channelName;
164                          my $offset;                          my $offset;
165                          foreach my $chandef (@channelData) {                          foreach my $chandef (@channelData) {
166                                  if($chandef=~/^([a-z0-9]+)(.(r|b|h)([0-9]*))?$/i) {                                  if($chandef=~/^([a-z0-9_\-\%]+)(.(r|b|h)([0-9]*))?$/i) {
167                                          $channelName = $1;                                          $channelName = $1;
168                                          $channels->{$channelName}->{'startIndex'} = undef;                                          $channels->{$channelName}->{'startIndex'} = undef;
169                                          if ($3) {                                          if ($3) {
# Line 164  sub processLine { Line 174  sub processLine {
174                                          }                                          }
175                                  }                                  }
176                          }                          }
177                            $self->{'useragent'} = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-";
178                                                    
179                          delete($self->{'headerBuffer'});                          if ($persist) {
180                                                            # New persistent connection: kill any existing connection with same ID
181                          if($persist)                                  $self->deleteSubscriberWithID($self->{'subscriberID'});
182                          {                                  # Add new persistent connection to collection
183                                  $self->{'subscriberID'}=$subscriberID;                                  $PersistentConnections{$self->{'subscriberID'}}=$self;
184                                  $self->deleteSubscriberWithID($subscriberID);                          } else {
185                                  $PersistentConnections{$subscriberID}=$self;                                  $::Pollers->{$self->{'subscriberID'}} = time;
186                          }                          }
187                                                    
188                          if(scalar(keys %{$channels}))                          if(scalar(keys %{$channels})) {
189                          {  
190                                    $self->{'channelinfo'} = '';
191                                    my $citemplate = $self->getConf('ChannelInfoTemplate');
192                                    foreach $channelName (keys %{$channels}) {
193                                            my $channel=Meteor::Channel->channelWithName($channelName);
194                                            $self->{'channels'}->{$channelName}=$channel;
195                                            if (defined($self->{'channels'}->{$channelName}->{'startIndex'}) && $self->{'channels'}->{$channelName}->{'startIndex'} > 0) {
196                                                    $self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate);
197                                            }
198                                    }
199                                  $self->emitOKHeader();                                  $self->emitOKHeader();
200                                  $self->setChannels($channels,$persist);                                  foreach $channelName (keys %{$channels}) {
201                                  $self->close(1) unless($persist);                                          my $startIndex=$channels->{$channelName}->{'startIndex'};
202                                            $self->{'channels'}->{$channelName}->addSubscriber($self,$startIndex,$persist,$self->{'mode'},$self->{'useragent'});
203                                    }
204                                    delete ($self->{'channels'}) unless($persist);
205                                    $self->close(1, 'responseComplete') unless($persist);
206                                    $self->close(1, 'closedOnEvent') unless($self->{'messageCount'} == 0);
207                                    delete($self->{'headerBuffer'});
208                                  return;                                  return;
209                          }                          }
210                  }                  }
# Line 186  sub processLine { Line 212  sub processLine {
212                  {                  {
213                          $self->deleteSubscriberWithID($1);                          $self->deleteSubscriberWithID($1);
214                          $self->emitOKHeader();                          $self->emitOKHeader();
215                          $self->close(1);                          $self->close(1, 'disconnectRequested');
216                          return;                          return;
217                  }                  }
218                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
219                  {                  {
220                          Meteor::Document->serveFileToClient($1,$self);                          Meteor::Document->serveFileToClient($1,$self);
221                          $self->close(1);                          $self->close(1, 'responseComplete');
222                          return;                          return;
223                  }                  }
224                                    
# Line 203  sub processLine { Line 229  sub processLine {
229          }          }
230  }  }
231    
 sub setChannels {  
         my $self=shift;  
         my $channels=shift;  
         my $persist=shift;  
           
         foreach my $channelName (keys %{$channels})  
         {  
                 my $startIndex=$channels->{$channelName}->{'startIndex'};  
                   
                 my $channel=Meteor::Channel->channelWithName($channelName);  
                   
                 $self->{'channels'}->{$channelName}=$channel if($persist);  
                   
                 $channel->addSubscriber($self,$startIndex,$persist);  
         }  
 }  
   
232  sub emitOKHeader {  sub emitOKHeader {
233          my $self=shift;          my $self=shift;
234                    
# Line 233  sub emitErrorHeader { Line 242  sub emitErrorHeader {
242          $::Statistics->{'errors_served'}++;          $::Statistics->{'errors_served'}++;
243                    
244          # close up shop here!          # close up shop here!
245          $self->close();          $self->close(0, 'error');
246  }  }
247    
248  sub emitHeader {  sub emitHeader {
249          my $self=shift;          my $self=shift;
250          my $status=shift;          my $status=shift;
251                    
252          my $header=undef;          my $header=$self->getConf('HeaderTemplate');
         if(exists($self->{'HeaderTemplateNumber'}))  
         {  
                 my $hn='HeaderTemplate'.$self->{'HeaderTemplateNumber'};  
                   
                 $header=$::CONF{$hn};  
         }  
         $header=$::CONF{'HeaderTemplate'} unless(defined($header));  
253                    
254          $header=~s/~([^~]+)~/          $header=~s/~([^~]*)~/
255                  if(!defined($1) || $1 eq '')                  if(!defined($1) || $1 eq '') {
                 {  
256                          '~';                          '~';
257                  }                  } elsif($1 eq 'server') {
                 elsif($1 eq 'server')  
                 {  
258                          $::PGM;                          $::PGM;
259                  }                  } elsif($1 eq 'status') {
                 elsif($1 eq 'status')  
                 {  
260                          $status;                          $status;
261                  }                  } elsif($1 eq 'servertime') {
                 elsif($1 eq 'servertime')  
                 {  
262                          time;                          time;
263                  }                  } elsif($1 eq 'channelinfo') {
264                  else                          $self->{'channelinfo'};
265                  {                  } else {
266                          '';                          '';
267                  }                  }
268          /gex;          /gex;
269                    
270          $self->write($header.chr(0));          $self->write($header);
271  }  }
272    
273  sub sendMessage {  sub sendMessages {
274          my $self=shift;          my $self=shift;
         my $msg=shift;  
         my $numMsgInThisBatch=shift;  
275                    
276          $numMsgInThisBatch=1 unless(defined($numMsgInThisBatch));          my $numMessages=0;
277            my $msgTemplate=$self->getConf('MessageTemplate');
278            my $msgData='';
279                    
280          $self->write($msg.chr(0));          foreach my $message (@_)
281            {
282                    $msgData.=$message->messageWithTemplate($msgTemplate);
283                    $numMessages++;
284            }
285                    
286          $::Statistics->{'messages_served'}+=$numMsgInThisBatch;          return if($numMessages<1);
287                    
288          my $msgCount=++$self->{'MessageCount'};          $self->write($msgData);
289                    
290          my $maxMsg=$::CONF{'MaxMessages'};          $::Statistics->{'messages_served'}+=$numMessages;
         if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)  
         {  
                 $self->close(1);  
         }  
291                    
292          if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})          my $msgCount=$self->{'messageCount'};
293          {          $msgCount+=$numMessages;
294                  $self->close(1);          $self->{'messageCount'}=$msgCount;
295            
296            # If long polling, close connection, as a message has now been sent.
297            # Don't close if still processing the header (we may be sending a backlog from multiple channels)
298            if($self->getConf('CloseOnEvent') && !exists($self->{'headerBuffer'})) {
299                    $self->close(1, 'closedOnEvent');
300          }          }
301  }  }
302    
303    sub ping {
304            my $self=shift;
305            my $msg=$self->getConf('PingMessage');
306            
307            $self->write($msg);
308    }
309    
310  sub closeChannel {  sub closeChannel {
311          my $self=shift;          my $self=shift;
312          my $channelName=shift;          my $channelName=shift;
# Line 307  sub closeChannel { Line 314  sub closeChannel {
314          return unless(exists($self->{'channels'}->{$channelName}));          return unless(exists($self->{'channels'}->{$channelName}));
315                    
316          my $channel=$self->{'channels'}->{$channelName};          my $channel=$self->{'channels'}->{$channelName};
317          $channel->removeSubscriber($self);          $channel->removeSubscriber($self,'channelClose');
318                    
319          delete($self->{'channels'}->{$channelName});          delete($self->{'channels'}->{$channelName});
320                    
321          $self->close() if(scalar(keys %{$self->{'channels'}})==0);          $self->close(0,'channelClose') if(scalar(keys %{$self->{'channels'}})==0);
322  }  }
323    
324  sub close {  sub close {
325          my $self=shift;          my $self=shift;
326          my $noShutdownMsg=shift;          my $noShutdownMsg=shift;
327            my $reason=shift;
328                    
329          foreach my $channelName (keys %{$self->{'channels'}})          foreach my $channelName (keys %{$self->{'channels'}})
330          {          {
331                  my $channel=$self->{'channels'}->{$channelName};                  my $channel=$self->{'channels'}->{$channelName};
332                  $channel->removeSubscriber($self);                  $channel->removeSubscriber($self,$reason);
333          }          }
334          delete($self->{'channels'});          delete($self->{'channels'});
335                    
336          if(exists($self->{'subscriberID'}))          # If this connection is in the PersistentConnections array, delete it, then anonymise
337          {          # it so that if we have to wait for the write buffer to empty before close, it's only
338            # removed once.
339            if(exists($self->{'subscriberID'})) {
340                  delete($PersistentConnections{$self->{'subscriberID'}});                  delete($PersistentConnections{$self->{'subscriberID'}});
341                    delete($self->{'subscriberID'});
342          }          }
343                    
         #  
344          # Send shutdown message unless remote closed or          # Send shutdown message unless remote closed or
345          # connection not yet established          # connection not yet established
         #  
346          unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))          unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
347          {          {
348                  my $msg=$::CONF{'SubscriberShutdownMsg'};                  my $msg=$self->getConf('SubscriberShutdownMsg');
349                  if(defined($msg) && $msg ne '')                  if(defined($msg) && $msg ne '')
350                  {                  {
351                          $self->write($msg);                          $self->write($msg);
352                  }                  }
353          }          }
354    
355            my $fmsg=$self->getConf('FooterTemplate');
356            if(defined($fmsg) && $fmsg ne '')
357            {
358                    $self->write($fmsg);
359            }
360    
361            $self->SUPER::close();
362    }
363    
364    sub didClose {
365                    
366          $::Statistics->{'current_subscribers'}--;          $::Statistics->{'current_subscribers'}--;
           
         $self->SUPER::close();  
367  }  }
368    
369  sub checkForMaxTime {  sub checkForMaxTime {
370          my $self=shift;          my $self=shift;
371          my $time=shift;          my $time=shift;
372                    
373          $self->close(1) if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);          $self->close(1,'maxTime') if(exists($self->{'connectionTimeLimit'}) && $self->{'connectionTimeLimit'}<$time);
374    }
375    
376    sub getConf {
377            my $self=shift;
378            my $key=shift;
379            
380            if(exists($self->{'mode'}) && $self->{'mode'} ne '')
381            {
382                    my $k=$key.$self->{'mode'};
383                    
384                    if(exists($::CONF{$k})) {
385                            return $::CONF{$k};
386                    }
387            }
388            
389            $::CONF{$key};
390  }  }
391    
392  1;  1;

Legend:
Removed from v.35  
changed lines
  Added in v.62

  ViewVC Help
Powered by ViewVC 1.1.26