/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 17 by knops.gerd, Tue May 1 17:10:10 2007 UTC revision 51 by andrew.betts, Wed Feb 27 14:05:59 2008 UTC
# Line 43  package Meteor::Subscriber; Line 43  package Meteor::Subscriber;
43          @Meteor::Subscriber::ISA=qw(Meteor::Connection);          @Meteor::Subscriber::ISA=qw(Meteor::Connection);
44                    
45          our %PersistentConnections=();          our %PersistentConnections=();
46            our $NumAcceptedConnections=0;
47        
48    
49  ###############################################################################  ###############################################################################
50  # Factory methods  # Factory methods
# Line 63  sub newFromServer { Line 65  sub newFromServer {
65                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
66          }          }
67                    
68            $::Statistics->{'current_subscribers'}++;
69            $::Statistics->{'subscriber_connections_accepted'}++;
70            
71          $self;          $self;
72  }  }
73    
# Line 75  sub deleteSubscriberWithID { Line 80  sub deleteSubscriberWithID {
80                    
81          if(exists($PersistentConnections{$id}))          if(exists($PersistentConnections{$id}))
82          {          {
83                  $PersistentConnections{$id}->close(1);                  $PersistentConnections{$id}->close(0,'newSubscriberWithSameID');
84          }          }
85  }  }
86    
87  sub pingPersistentConnections {  sub pingPersistentConnections {
88          my $class=shift;          my $class=shift;
89                    
         my $msg=$::CONF{'PingMessage'};  
90          my @cons=values %PersistentConnections;          my @cons=values %PersistentConnections;
91                    
92          map { $_->write($msg) } @cons;          map { $_->ping() } @cons;
93  }  }
94    
95  sub checkPersistentConnectionsForMaxTime {  sub checkPersistentConnectionsForMaxTime {
# Line 97  sub checkPersistentConnectionsForMaxTime Line 101  sub checkPersistentConnectionsForMaxTime
101          map { $_->checkForMaxTime($time) } @cons;          map { $_->checkForMaxTime($time) } @cons;
102  }  }
103    
104    sub numSubscribers {
105            
106            return scalar(keys %PersistentConnections);
107    }
108    
109  ###############################################################################  ###############################################################################
110  # Instance methods  # Instance methods
111  ###############################################################################  ###############################################################################
# Line 121  sub processLine { Line 130  sub processLine {
130                  # Analyze header, register with appropiate channel                  # Analyze header, register with appropiate channel
131                  # and send pending messages.                  # and send pending messages.
132                  #                  #
133                  # GET $::CONF{'SubscriberDynamicPageAddress'}?channel=ml123&restartfrom=1 HTTP/1.1                  # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
134                  #                  #
135                  # Find the 'GET' line                  # Find the 'GET' line
136                  #                  #
137                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\?(\S+)/)                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i)
138                  {                  {
139                          my @formData=split('&',$1);                          $self->{'subscriberID'}=$1;
140                          my $channelName=undef;                          $self->{'mode'}=$2;
141                          my $startIndex=undef;                          my $persist=$self->getConf('Persist');
142                          my $backtrack=undef;                          my $maxTime=$self->getConf('MaxTime');
143                          my $persist=1;                          $self->{'ConnectionTimeLimit'} = ($self->{'ConnectionStart'}+$maxTime) if ($maxTime>0);
144                          my $subscriberID=undef;                          
145                            my @channelData=split('/',$3);
146                          my $channels={};                          my $channels={};
147                          foreach my $formElement (@formData)                          my $channelName;
148                          {                          my $offset;
149                                  if($formElement=~/^channel=(.+)$/)                          foreach my $chandef (@channelData) {
150                                  {                                  if($chandef=~/^([a-z0-9_\-\%]+)(.(r|b|h)([0-9]*))?$/i) {
151                                          if(defined($channelName))                                          $channelName = $1;
152                                          {                                          $channels->{$channelName}->{'startIndex'} = undef;
153                                                  if(defined($startIndex) && defined($backtrack))                                          if ($3) {
154                                                  {                                             $offset = $4;
155                                                          $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");                                             if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; }
156                                                          $self->close();                                             if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; }
157                                                                                                       if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; }
                                                         return;  
                                                 }  
                                                   
                                                 $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack));  
                                                 $channels->{$channelName}->{'startIndex'}=$startIndex;  
                                                   
                                                 $startIndex=undef;  
                                                 $backtrack=undef;  
                                         }  
                                         $channelName=$1;  
                                 }  
                                 elsif($formElement=~/^restartfrom=(\d*)$/)  
                                 {  
                                         $startIndex=$1;  
                                         $startIndex='' unless(defined($startIndex));  
                                 }  
                                 elsif($formElement=~/^backtrack=(\d+)$/)  
                                 {  
                                         $backtrack=$1;  
                                         $backtrack=0 unless(defined($backtrack));  
                                 }  
                                 elsif($formElement=~/^persist=(?i)(yes|true|1|no|false|0)$/)  
                                 {  
                                         $persist=0 if($1=~/(no|false|0)/i);  
                                 }  
                                 elsif($formElement=~/^id=(.+)$/)  
                                 {  
                                         $subscriberID=$1;  
                                 }  
                                 elsif($formElement=~/^maxmessages=(\d+)$/i)  
                                 {  
                                         $self->{'MaxMessageCount'}=$1;  
                                 }  
                                 elsif($formElement=~/^template=(\d+)$/i)  
                                 {  
                                         $self->{'HeaderTemplateNumber'}=$1;  
                                 }  
                                 elsif($formElement=~/^maxtime=(\d+)$/i)  
                                 {  
                                         my $clientRequest=$1;  
                                         my $serverDefault=$::CONF{'MaxTime'};  
                                           
                                         if($serverDefault==0 || $serverDefault>$clientRequest)  
                                         {  
                                                 $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$clientRequest;  
158                                          }                                          }
159                                  }                                  }
160                          }                          }
161                                                    my $useragent = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-";
                         if(defined($channelName))  
                         {  
                                 if(defined($startIndex) && defined($backtrack))  
                                 {  
                                         $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");  
                                         $self->close();  
                                           
                                         return;  
                                 }  
                                   
                                 $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack));  
                                 $channels->{$channelName}->{'startIndex'}=$startIndex;  
                         }  
162                                                    
163                          delete($self->{'headerBuffer'});                          delete($self->{'headerBuffer'});
164                                                    
165                          if(defined($subscriberID) && $persist)                          if ($persist) {
166                          {                                  $self->deleteSubscriberWithID($self->{'subscriberID'});
167                                  $self->{'subscriberID'}=$subscriberID;                                  $PersistentConnections{$self->{'subscriberID'}}=$self;
                                 $self->deleteSubscriberWithID($subscriberID);  
                                 $PersistentConnections{$subscriberID}=$self;  
168                          }                          }
169                                                    
170                          if(scalar(keys %{$channels}))                          if(scalar(keys %{$channels}))
171                          {                          {
172                                  $self->emitOKHeader();                                  $self->emitOKHeader();
173                                                                    $self->setChannels($channels,$persist,$self->{'mode'},$useragent);
174                                  $self->setChannels($channels,$persist);                                  $self->close(1, 'responseComplete') unless($persist);
                                   
                                 $self->close(1) unless($persist);  
                                   
175                                  return;                                  return;
176                          }                          }
177                  }                  }
178                    elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/)
179                    {
180                            $self->deleteSubscriberWithID($1);
181                            $self->emitOKHeader();
182                            $self->close(1, 'disconnectRequested');
183                            return;
184                    }
185                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
186                  {                  {
187                          Meteor::Document->serveFileToClient($1,$self);                          Meteor::Document->serveFileToClient($1,$self);
188                                                    $self->close(1, 'responseComplete');
                         $self->close(1);  
                           
189                          return;                          return;
190                  }                  }
191                                    
# Line 248  sub setChannels { Line 200  sub setChannels {
200          my $self=shift;          my $self=shift;
201          my $channels=shift;          my $channels=shift;
202          my $persist=shift;          my $persist=shift;
203            my $mode=shift || '';
204            my $userAgent=shift || '';
205                    
206          foreach my $channelName (keys %{$channels})          foreach my $channelName (keys %{$channels})
207          {          {
# Line 257  sub setChannels { Line 211  sub setChannels {
211                                    
212                  $self->{'channels'}->{$channelName}=$channel if($persist);                  $self->{'channels'}->{$channelName}=$channel if($persist);
213                                    
214                  $channel->addSubscriber($self,$startIndex,$persist);                  $channel->addSubscriber($self,$startIndex,$persist,$mode,$userAgent);
215          }          }
216  }  }
217    
# Line 271  sub emitErrorHeader { Line 225  sub emitErrorHeader {
225          my $self=shift;          my $self=shift;
226                    
227          $self->emitHeader('404 Not Found');          $self->emitHeader('404 Not Found');
228            $::Statistics->{'errors_served'}++;
229                    
230          # close up shop here!          # close up shop here!
231          $self->close();          $self->close(0, 'error');
232  }  }
233    
234  sub emitHeader {  sub emitHeader {
235          my $self=shift;          my $self=shift;
236          my $status=shift;          my $status=shift;
237                    
238          my $header=undef;          my $header=$self->getConf('HeaderTemplate');
         if(exists($self->{'HeaderTemplateNumber'}))  
         {  
                 my $hn='HeaderTemplate'.$self->{'HeaderTemplateNumber'};  
                   
                 $header=$::CONF{$hn};  
         }  
         $header=$::CONF{'HeaderTemplate'} unless(defined($header));  
239                    
240          $header=~s/~([^~]+)~/          $header=~s/~([^~]*)~/
241                  if(!defined($1) || $1 eq '')                  if(!defined($1) || $1 eq '')
242                  {                  {
243                          '~';                          '~';
# Line 306  sub emitHeader { Line 254  sub emitHeader {
254                  {                  {
255                          time;                          time;
256                  }                  }
257                    elsif($1 eq 'channelinfo')
258                    {
259                            Meteor::Channel->listChannelsUsingTemplate($self->getConf('ChannelInfoTemplate'));
260                    }
261                  else                  else
262                  {                  {
263                          '';                          '';
# Line 315  sub emitHeader { Line 267  sub emitHeader {
267          $self->write($header);          $self->write($header);
268  }  }
269    
270  sub sendMessage {  sub sendMessages {
271          my $self=shift;          my $self=shift;
         my $msg=shift;  
272                    
273          $self->write($msg);          my $numMessages=0;
274            my $msgTemplate=$self->getConf('MessageTemplate');
275            my $msgData='';
276            
277            foreach my $message (@_)
278            {
279                    $msgData.=$message->messageWithTemplate($msgTemplate);
280                    $numMessages++;
281            }
282            
283            return if($numMessages<1);
284            
285            $self->write($msgData);
286                    
287          my $msgCount=++$self->{'MessageCount'};          $::Statistics->{'messages_served'}+=$numMessages;
288                    
289          my $maxMsg=$::CONF{'MaxMessages'};          my $msgCount=$self->{'MessageCount'};
290            $msgCount+=$numMessages;
291            $self->{'MessageCount'}=$msgCount;
292            
293            my $maxMsg=$self->getConf('MaxMessages');
294          if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)          if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)
295          {          {
296                  $self->close(1);                  $self->close(1, 'maxMessageCountReached');
297          }          }
298                    
299          if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})          if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})
300          {          {
301                  $self->close(1);                  $self->close(1, 'maxMessageCountReached');
302          }          }
303            
304    }
305    
306    sub ping {
307            my $self=shift;
308            my $msg=$self->getConf('PingMessage');
309            
310            $self->write($msg);
311  }  }
312    
313  sub closeChannel {  sub closeChannel {
# Line 342  sub closeChannel { Line 317  sub closeChannel {
317          return unless(exists($self->{'channels'}->{$channelName}));          return unless(exists($self->{'channels'}->{$channelName}));
318                    
319          my $channel=$self->{'channels'}->{$channelName};          my $channel=$self->{'channels'}->{$channelName};
320          $channel->removeSubscriber($self);          $channel->removeSubscriber($self,'channelClose');
321                    
322          delete($self->{'channels'}->{$channelName});          delete($self->{'channels'}->{$channelName});
323                    
324          $self->close() if(scalar(keys %{$self->{'channels'}})==0);          $self->close(0,'channelClose') if(scalar(keys %{$self->{'channels'}})==0);
325  }  }
326    
327  sub close {  sub close {
328          my $self=shift;          my $self=shift;
329          my $noShutdownMsg=shift;          my $noShutdownMsg=shift;
330            my $reason=shift;
331                    
332          foreach my $channelName (keys %{$self->{'channels'}})          foreach my $channelName (keys %{$self->{'channels'}})
333          {          {
334                  my $channel=$self->{'channels'}->{$channelName};                  my $channel=$self->{'channels'}->{$channelName};
335                  $channel->removeSubscriber($self);                  $channel->removeSubscriber($self,$reason);
336          }          }
337          delete($self->{'channels'});          delete($self->{'channels'});
338                    
339          if(exists($self->{'subscriberID'}))          # If this connection is in the PersistentConnections array, delete it, then anonymise
340          {          # it so that if we have to wait for the write buffer to empty before close, it's only
341            # removed once.
342            if(exists($self->{'subscriberID'})) {
343                  delete($PersistentConnections{$self->{'subscriberID'}});                  delete($PersistentConnections{$self->{'subscriberID'}});
344                    delete($self->{'subscriberID'});
345          }          }
346                    
         #  
347          # Send shutdown message unless remote closed or          # Send shutdown message unless remote closed or
348          # connection not yet established          # connection not yet established
         #  
349          unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))          unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
350          {          {
351                  my $msg=$::CONF{'SubscriberShutdownMsg'};                  my $msg=$self->getConf('SubscriberShutdownMsg');
352                  if(defined($msg) && $msg ne '')                  if(defined($msg) && $msg ne '')
353                  {                  {
354                          $self->write($msg);                          $self->write($msg);
# Line 381  sub close { Line 358  sub close {
358          $self->SUPER::close();          $self->SUPER::close();
359  }  }
360    
361    sub didClose {
362            
363            $::Statistics->{'current_subscribers'}--;
364    }
365    
366  sub checkForMaxTime {  sub checkForMaxTime {
367          my $self=shift;          my $self=shift;
368          my $time=shift;          my $time=shift;
369                    
370          $self->close(1) if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);          $self->close(1,'maxTime') if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);
371    }
372    
373    sub getConf {
374            my $self=shift;
375            my $key=shift;
376            
377            if(exists($self->{'mode'}) && $self->{'mode'} ne '')
378            {
379                    my $k=$key.$self->{'mode'};
380                    
381                    if(exists($::CONF{$k})) {
382                            return $::CONF{$k};
383                    }
384            }
385            
386            $::CONF{$key};
387  }  }
388    
389  1;  1;

Legend:
Removed from v.17  
changed lines
  Added in v.51

  ViewVC Help
Powered by ViewVC 1.1.26