/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 37 by knops.gerd, Fri Feb 1 21:22:03 2008 UTC revision 53 by andrew.betts, Wed Feb 27 21:58:56 2008 UTC
# Line 80  sub deleteSubscriberWithID { Line 80  sub deleteSubscriberWithID {
80                    
81          if(exists($PersistentConnections{$id}))          if(exists($PersistentConnections{$id}))
82          {          {
83                  $PersistentConnections{$id}->close(1);                  $PersistentConnections{$id}->close(0,'newSubscriberWithSameID');
84          }          }
85  }  }
86    
87  sub pingPersistentConnections {  sub pingPersistentConnections {
88          my $class=shift;          my $class=shift;
89                    
         my $msg=$::CONF{'PingMessage'};  
90          my @cons=values %PersistentConnections;          my @cons=values %PersistentConnections;
91                    
92          map { $_->write($msg.chr(0)) } @cons;          map { $_->ping() } @cons;
93  }  }
94    
95  sub checkPersistentConnectionsForMaxTime {  sub checkPersistentConnectionsForMaxTime {
# Line 137  sub processLine { Line 136  sub processLine {
136                  #                  #
137                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i)                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i)
138                  {                  {
139                          my $subscriberID=$1;                          $self->{'subscriberID'}=$1;
                         my $persist=0;  
140                          $self->{'mode'}=$2;                          $self->{'mode'}=$2;
141                          if ($self->{'mode'} eq "xhrinteractive" || $self->{'mode'} eq "iframe" || $self->{'mode'} eq "serversent" || $self->{'mode'} eq "longpoll") {                          my $persist=$self->getConf('Persist');
142                                  $persist=1;                          my $maxTime=$self->getConf('MaxTime');
143                                  $self->{'MaxMessageCount'}=1 unless(!($self->{'mode'} eq "longpoll"));                          $self->{'ConnectionTimeLimit'} = ($self->{'ConnectionStart'}+$maxTime) if ($maxTime>0);
144                          }                          
                         if ($self->{'mode'} eq "iframe") {  
                                 $self->{'HeaderTemplateNumber'}=1;  
                         } else {  
                                 $self->{'HeaderTemplateNumber'}=2;  
                         }  
145                          my @channelData=split('/',$3);                          my @channelData=split('/',$3);
146                          my $channels={};                          my $channels={};
147                          my $channelName;                          my $channelName;
148                          my $offset;                          my $offset;
149                          foreach my $chandef (@channelData) {                          foreach my $chandef (@channelData) {
150                                  if($chandef=~/^([a-z0-9]+)(.(r|b|h)([0-9]*))?$/i) {                                  if($chandef=~/^([a-z0-9_\-\%]+)(.(r|b|h)([0-9]*))?$/i) {
151                                          $channelName = $1;                                          $channelName = $1;
152                                          $channels->{$channelName}->{'startIndex'} = undef;                                          $channels->{$channelName}->{'startIndex'} = undef;
153                                          if ($3) {                                          if ($3) {
# Line 165  sub processLine { Line 158  sub processLine {
158                                          }                                          }
159                                  }                                  }
160                          }                          }
161                            my $useragent = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-";
162                                                    
163                          delete($self->{'headerBuffer'});                          delete($self->{'headerBuffer'});
164                                                    
165                          if($persist)                          if ($persist) {
166                          {                                  $self->deleteSubscriberWithID($self->{'subscriberID'});
167                                  $self->{'subscriberID'}=$subscriberID;                                  $PersistentConnections{$self->{'subscriberID'}}=$self;
                                 $self->deleteSubscriberWithID($subscriberID);  
                                 $PersistentConnections{$subscriberID}=$self;  
168                          }                          }
169                                                    
170                          if(scalar(keys %{$channels}))                          if(scalar(keys %{$channels})) {
171                          {  
172                                    $self->{'channelinfo'} = '';
173                                    my $citemplate = $self->getConf('ChannelInfoTemplate');
174                                    foreach $channelName (keys %{$channels}) {
175                                            my $channel=Meteor::Channel->channelWithName($channelName);
176                                            $self->{'channels'}->{$channelName}=$channel;
177                                            $self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate);
178                                            
179                                    }
180                                  $self->emitOKHeader();                                  $self->emitOKHeader();
181                                  $self->setChannels($channels,$persist);                                  foreach $channelName (keys %{$channels}) {
182                                  $self->close(1) unless($persist);                                          my $startIndex=$channels->{$channelName}->{'startIndex'};
183                                            $self->{'channels'}->{$channelName}->addSubscriber($self,$startIndex,$persist,$self->{'mode'},$useragent);
184                                    }
185                                    delete ($self->{'channels'}) unless($persist);
186                                    $self->close(1, 'responseComplete') unless($persist);
187                                  return;                                  return;
188                          }                          }
189                  }                  }
# Line 187  sub processLine { Line 191  sub processLine {
191                  {                  {
192                          $self->deleteSubscriberWithID($1);                          $self->deleteSubscriberWithID($1);
193                          $self->emitOKHeader();                          $self->emitOKHeader();
194                          $self->close(1);                          $self->close(1, 'disconnectRequested');
195                          return;                          return;
196                  }                  }
197                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
198                  {                  {
199                          Meteor::Document->serveFileToClient($1,$self);                          Meteor::Document->serveFileToClient($1,$self);
200                          $self->close(1);                          $self->close(1, 'responseComplete');
201                          return;                          return;
202                  }                  }
203                                    
# Line 204  sub processLine { Line 208  sub processLine {
208          }          }
209  }  }
210    
 sub setChannels {  
         my $self=shift;  
         my $channels=shift;  
         my $persist=shift;  
           
         foreach my $channelName (keys %{$channels})  
         {  
                 my $startIndex=$channels->{$channelName}->{'startIndex'};  
                   
                 my $channel=Meteor::Channel->channelWithName($channelName);  
                   
                 $self->{'channels'}->{$channelName}=$channel if($persist);  
                   
                 $channel->addSubscriber($self,$startIndex,$persist);  
         }  
 }  
   
211  sub emitOKHeader {  sub emitOKHeader {
212          my $self=shift;          my $self=shift;
213                    
# Line 234  sub emitErrorHeader { Line 221  sub emitErrorHeader {
221          $::Statistics->{'errors_served'}++;          $::Statistics->{'errors_served'}++;
222                    
223          # close up shop here!          # close up shop here!
224          $self->close();          $self->close(0, 'error');
225  }  }
226    
227  sub emitHeader {  sub emitHeader {
228          my $self=shift;          my $self=shift;
229          my $status=shift;          my $status=shift;
230                    
231          my $header=undef;          my $header=$self->getConf('HeaderTemplate');
         if(exists($self->{'HeaderTemplateNumber'}))  
         {  
                 my $hn='HeaderTemplate'.$self->{'HeaderTemplateNumber'};  
                   
                 $header=$::CONF{$hn};  
         }  
         $header=$::CONF{'HeaderTemplate'} unless(defined($header));  
232                    
233          $header=~s/~([^~]+)~/          $header=~s/~([^~]*)~/
234                  if(!defined($1) || $1 eq '')                  if(!defined($1) || $1 eq '') {
                 {  
235                          '~';                          '~';
236                  }                  } elsif($1 eq 'server') {
                 elsif($1 eq 'server')  
                 {  
237                          $::PGM;                          $::PGM;
238                  }                  } elsif($1 eq 'status') {
                 elsif($1 eq 'status')  
                 {  
239                          $status;                          $status;
240                  }                  } elsif($1 eq 'servertime') {
                 elsif($1 eq 'servertime')  
                 {  
241                          time;                          time;
242                  }                  } elsif($1 eq 'channelinfo') {
243                  else                          $self->{'channelinfo'};
244                  {                  } else {
245                          '';                          '';
246                  }                  }
247          /gex;          /gex;
248                    
249          $self->write($header.chr(0));          $self->write($header);
250  }  }
251    
252  sub sendMessage {  sub sendMessages {
253          my $self=shift;          my $self=shift;
         my $msg=shift;  
         my $numMsgInThisBatch=shift;  
254                    
255          $numMsgInThisBatch=1 unless(defined($numMsgInThisBatch));          my $numMessages=0;
256            my $msgTemplate=$self->getConf('MessageTemplate');
257            my $msgData='';
258            
259            foreach my $message (@_)
260            {
261                    $msgData.=$message->messageWithTemplate($msgTemplate);
262                    $numMessages++;
263            }
264            
265            return if($numMessages<1);
266                    
267          $self->write($msg.chr(0));          $self->write($msgData);
268                    
269          $::Statistics->{'messages_served'}+=$numMsgInThisBatch;          $::Statistics->{'messages_served'}+=$numMessages;
270                    
271          my $msgCount=++$self->{'MessageCount'};          my $msgCount=$self->{'MessageCount'};
272            $msgCount+=$numMessages;
273            $self->{'MessageCount'}=$msgCount;
274                    
275          my $maxMsg=$::CONF{'MaxMessages'};          my $maxMsg=$self->getConf('MaxMessages');
276          if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)          if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)
277          {          {
278                  $self->close(1);                  $self->close(1, 'maxMessageCountReached');
279          }          }
280                    
281          if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})          if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})
282          {          {
283                  $self->close(1);                  $self->close(1, 'maxMessageCountReached');
284          }          }
285            
286    }
287    
288    sub ping {
289            my $self=shift;
290            my $msg=$self->getConf('PingMessage');
291            
292            $self->write($msg);
293  }  }
294    
295  sub closeChannel {  sub closeChannel {
# Line 308  sub closeChannel { Line 299  sub closeChannel {
299          return unless(exists($self->{'channels'}->{$channelName}));          return unless(exists($self->{'channels'}->{$channelName}));
300                    
301          my $channel=$self->{'channels'}->{$channelName};          my $channel=$self->{'channels'}->{$channelName};
302          $channel->removeSubscriber($self);          $channel->removeSubscriber($self,'channelClose');
303                    
304          delete($self->{'channels'}->{$channelName});          delete($self->{'channels'}->{$channelName});
305                    
306          $self->close() if(scalar(keys %{$self->{'channels'}})==0);          $self->close(0,'channelClose') if(scalar(keys %{$self->{'channels'}})==0);
307  }  }
308    
309  sub close {  sub close {
310          my $self=shift;          my $self=shift;
311          my $noShutdownMsg=shift;          my $noShutdownMsg=shift;
312            my $reason=shift;
313                    
314          foreach my $channelName (keys %{$self->{'channels'}})          foreach my $channelName (keys %{$self->{'channels'}})
315          {          {
316                  my $channel=$self->{'channels'}->{$channelName};                  my $channel=$self->{'channels'}->{$channelName};
317                  $channel->removeSubscriber($self);                  $channel->removeSubscriber($self,$reason);
318          }          }
319          delete($self->{'channels'});          delete($self->{'channels'});
320                    
321          if(exists($self->{'subscriberID'}))          # If this connection is in the PersistentConnections array, delete it, then anonymise
322          {          # it so that if we have to wait for the write buffer to empty before close, it's only
323            # removed once.
324            if(exists($self->{'subscriberID'})) {
325                  delete($PersistentConnections{$self->{'subscriberID'}});                  delete($PersistentConnections{$self->{'subscriberID'}});
326                    delete($self->{'subscriberID'});
327          }          }
328                    
         #  
329          # Send shutdown message unless remote closed or          # Send shutdown message unless remote closed or
330          # connection not yet established          # connection not yet established
         #  
331          unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))          unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
332          {          {
333                  my $msg=$::CONF{'SubscriberShutdownMsg'};                  my $msg=$self->getConf('SubscriberShutdownMsg');
334                  if(defined($msg) && $msg ne '')                  if(defined($msg) && $msg ne '')
335                  {                  {
336                          $self->write($msg);                          $self->write($msg);
# Line 356  sub checkForMaxTime { Line 349  sub checkForMaxTime {
349          my $self=shift;          my $self=shift;
350          my $time=shift;          my $time=shift;
351                    
352          $self->close(1) if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);          $self->close(1,'maxTime') if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);
353    }
354    
355    sub getConf {
356            my $self=shift;
357            my $key=shift;
358            
359            if(exists($self->{'mode'}) && $self->{'mode'} ne '')
360            {
361                    my $k=$key.$self->{'mode'};
362                    
363                    if(exists($::CONF{$k})) {
364                            return $::CONF{$k};
365                    }
366            }
367            
368            $::CONF{$key};
369  }  }
370    
371  1;  1;

Legend:
Removed from v.37  
changed lines
  Added in v.53

  ViewVC Help
Powered by ViewVC 1.1.26