/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 11 by knops.gerd, Thu Dec 14 16:29:42 2006 UTC revision 49 by knops.gerd, Mon Feb 4 22:30:35 2008 UTC
# Line 43  package Meteor::Subscriber; Line 43  package Meteor::Subscriber;
43          @Meteor::Subscriber::ISA=qw(Meteor::Connection);          @Meteor::Subscriber::ISA=qw(Meteor::Connection);
44                    
45          our %PersistentConnections=();          our %PersistentConnections=();
46            our $NumAcceptedConnections=0;
47        
48    
49  ###############################################################################  ###############################################################################
50  # Factory methods  # Factory methods
# Line 63  sub newFromServer { Line 65  sub newFromServer {
65                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
66          }          }
67                    
68            $::Statistics->{'current_subscribers'}++;
69            $::Statistics->{'subscriber_connections_accepted'}++;
70            
71          $self;          $self;
72  }  }
73    
# Line 75  sub deleteSubscriberWithID { Line 80  sub deleteSubscriberWithID {
80                    
81          if(exists($PersistentConnections{$id}))          if(exists($PersistentConnections{$id}))
82          {          {
83                  $PersistentConnections{$id}->close(1);                  $PersistentConnections{$id}->close();
84          }          }
85  }  }
86    
87  sub pingPersistentConnections {  sub pingPersistentConnections {
88          my $class=shift;          my $class=shift;
89                    
         my $msg=$::CONF{'PingMessage'};  
90          my @cons=values %PersistentConnections;          my @cons=values %PersistentConnections;
91                    
92          map { $_->write($msg) } @cons;          map { $_->ping() } @cons;
93  }  }
94    
95  sub checkPersistentConnectionsForMaxTime {  sub checkPersistentConnectionsForMaxTime {
# Line 97  sub checkPersistentConnectionsForMaxTime Line 101  sub checkPersistentConnectionsForMaxTime
101          map { $_->checkForMaxTime($time) } @cons;          map { $_->checkForMaxTime($time) } @cons;
102  }  }
103    
104    sub numSubscribers {
105            
106            return scalar(keys %PersistentConnections);
107    }
108    
109  ###############################################################################  ###############################################################################
110  # Instance methods  # Instance methods
111  ###############################################################################  ###############################################################################
# Line 121  sub processLine { Line 130  sub processLine {
130                  # Analyze header, register with appropiate channel                  # Analyze header, register with appropiate channel
131                  # and send pending messages.                  # and send pending messages.
132                  #                  #
133                  # GET $::CONF{'SubscriberDynamicPageAddress'}?channel=ml123&restartfrom=1 HTTP/1.1                  # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
134                  #                  #
135                  # Find the 'GET' line                  # Find the 'GET' line
136                  #                  #
137                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\?(\S+)/)                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i)
138                  {                  {
139                          my @formData=split('&',$1);                          my $subscriberID=$1;
140                          my $channelName=undef;                          $self->{'mode'}=$2;
141                          my $startIndex=undef;                          my $persist=$self->getConf('Persist');
142                          my $backtrack=undef;                          
143                          my $persist=1;                          my $maxTime=$self->getConf('MaxTime');
144                          my $subscriberID=undef;                          if($maxTime>0)
                         foreach my $formElement (@formData)  
145                          {                          {
146                                  if($formElement=~/^channel=(.+)$/)                                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
147                                  {                          }
148                                          $channelName=$1;                          
149                                  }                          my @channelData=split('/',$3);
150                                  elsif($formElement=~/^restartfrom=(\d*)$/)                          my $channels={};
151                                  {                          my $channelName;
152                                          $startIndex=$1;                          my $offset;
153                                          $startIndex='' unless(defined($startIndex));                          foreach my $chandef (@channelData) {
154                                  }                                  if($chandef=~/^([a-z0-9]+)(.(r|b|h)([0-9]*))?$/i) {
155                                  elsif($formElement=~/^backtrack=(\d+)$/)                                          $channelName = $1;
156                                  {                                          $channels->{$channelName}->{'startIndex'} = undef;
157                                          $backtrack=$1;                                          if ($3) {
158                                          $backtrack=0 unless(defined($backtrack));                                             $offset = $4;
159                                  }                                             if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; }
160                                  elsif($formElement=~/^persist=(?i)(yes|true|1|no|false|0)$/)                                             if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; }
161                                  {                                             if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; }
                                         $persist=0 if($1=~/(no|false|0)/i);  
                                 }  
                                 elsif($formElement=~/^id=(.+)$/)  
                                 {  
                                         $subscriberID=$1;  
                                 }  
                                 elsif($formElement=~/^maxmessages=(\d+)$/i)  
                                 {  
                                         $self->{'MaxMessageCount'}=$1;  
                                 }  
                                 elsif($formElement=~/^template=(\d+)$/i)  
                                 {  
                                         $self->{'HeaderTemplateNumber'}=$1;  
                                 }  
                                 elsif($formElement=~/^maxtime=(\d+)$/i)  
                                 {  
                                         my $clientRequest=$1;  
                                         my $serverDefault=$::CONF{'MaxTime'};  
                                           
                                         if($serverDefault==0 || $serverDefault>$clientRequest)  
                                         {  
                                                 $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$clientRequest;  
162                                          }                                          }
163                                  }                                  }
164                          }                          }
                                                   
                         delete($self->{'headerBuffer'});  
165                                                    
166                          if(defined($startIndex) && defined($backtrack))                          delete($self->{'headerBuffer'});
                         {  
                                 $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");  
                                 $self->close();  
                                   
                                 return;  
                         }  
167                                                    
168                          if(defined($subscriberID) && $persist)                          if($persist)
169                          {                          {
170                                  $self->{'subscriberID'}=$subscriberID;                                  $self->{'subscriberID'}=$subscriberID;
171                                  $self->deleteSubscriberWithID($subscriberID);                                  $self->deleteSubscriberWithID($subscriberID);
172                                  $PersistentConnections{$subscriberID}=$self;                                  $PersistentConnections{$subscriberID}=$self;
173                          }                          }
174                                                    
175                          if(defined($channelName))                          if(scalar(keys %{$channels}))
176                          {                          {
177                                  $self->emitOKHeader();                                  $self->emitOKHeader();
178                                                                    $self->setChannels($channels,$persist,$self->{'mode'},'');
                                 $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack));  
                                   
                                 $self->setChannelName($channelName,$startIndex,$persist);  
                                   
179                                  $self->close(1) unless($persist);                                  $self->close(1) unless($persist);
                                   
180                                  return;                                  return;
181                          }                          }
182                  }                  }
183                    elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/)
184                    {
185                            $self->deleteSubscriberWithID($1);
186                            $self->emitOKHeader();
187                            $self->close(1);
188                            return;
189                    }
190                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
191                  {                  {
192                          Meteor::Document->serveFileToClient($1,$self);                          Meteor::Document->serveFileToClient($1,$self);
                           
193                          $self->close(1);                          $self->close(1);
                           
194                          return;                          return;
195                  }                  }
196                                    
# Line 223  sub processLine { Line 201  sub processLine {
201          }          }
202  }  }
203    
204  sub setChannelName {  sub setChannels {
205          my $self=shift;          my $self=shift;
206          my $channelName=shift;          my $channels=shift;
         my $startIndex=shift;  
207          my $persist=shift;          my $persist=shift;
208            my $mode=shift || '';
209            my $userAgent=shift || '';
210                    
211          my $channel=Meteor::Channel->channelWithName($channelName);          foreach my $channelName (keys %{$channels})
212          $self->{'channel'}=$channel if($persist);          {
213                            my $startIndex=$channels->{$channelName}->{'startIndex'};
214          $channel->addSubscriber($self,$startIndex,$persist);                  
215                    my $channel=Meteor::Channel->channelWithName($channelName);
216                    
217                    $self->{'channels'}->{$channelName}=$channel if($persist);
218                    
219                    $channel->addSubscriber($self,$startIndex,$persist,$mode,$userAgent);
220            }
221  }  }
222    
223  sub emitOKHeader {  sub emitOKHeader {
# Line 245  sub emitErrorHeader { Line 230  sub emitErrorHeader {
230          my $self=shift;          my $self=shift;
231                    
232          $self->emitHeader('404 Not Found');          $self->emitHeader('404 Not Found');
233            $::Statistics->{'errors_served'}++;
234                    
235          # close up shop here!          # close up shop here!
236          $self->close();          $self->close();
# Line 254  sub emitHeader { Line 240  sub emitHeader {
240          my $self=shift;          my $self=shift;
241          my $status=shift;          my $status=shift;
242                    
243          my $header=undef;          my $header=$self->getConf('HeaderTemplate');
         if(exists($self->{'HeaderTemplateNumber'}))  
         {  
                 my $hn='HeaderTemplate'.$self->{'HeaderTemplateNumber'};  
                   
                 $header=$::CONF{$hn};  
         }  
         $header=$::CONF{'HeaderTemplate'} unless(defined($header));  
244                    
245          $header=~s/~([^~]+)~/          $header=~s/~([^~]*)~/
246                  if(!defined($1) || $1 eq '')                  if(!defined($1) || $1 eq '')
247                  {                  {
248                          '~';                          '~';
# Line 280  sub emitHeader { Line 259  sub emitHeader {
259                  {                  {
260                          time;                          time;
261                  }                  }
262                    elsif($1 eq 'channelinfo')
263                    {
264                            Meteor::Channel->listChannelsUsingTemplate($self->getConf('ChannelInfoTemplate'));
265                    }
266                  else                  else
267                  {                  {
268                          '';                          '';
# Line 289  sub emitHeader { Line 272  sub emitHeader {
272          $self->write($header);          $self->write($header);
273  }  }
274    
275  sub sendMessage {  sub sendMessages {
276          my $self=shift;          my $self=shift;
         my $msg=shift;  
277                    
278          $self->write($msg);          my $numMessages=0;
279            my $msgTemplate=$self->getConf('Messagetemplate');
280            my $msgData='';
281            
282            foreach my $message (@_)
283            {
284                    $msgData.=$message->messageWithTemplate($msgTemplate);
285                    $numMessages++;
286            }
287                    
288          my $msgCount=++$self->{'MessageCount'};          return if($numMessages<1);
289                    
290          my $maxMsg=$::CONF{'MaxMessages'};          $self->write($msgData);
291            
292            $::Statistics->{'messages_served'}+=$numMessages;
293            
294            my $msgCount=$self->{'MessageCount'};
295            $msgCount+=$numMessages;
296            $self->{'MessageCount'}=$msgCount;
297            
298            my $maxMsg=$self->getConf('MaxMessages');
299          if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)          if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)
300          {          {
301                  $self->close(1);                  $self->close(1);
# Line 307  sub sendMessage { Line 305  sub sendMessage {
305          {          {
306                  $self->close(1);                  $self->close(1);
307          }          }
308            
309    }
310    
311    sub ping {
312            my $self=shift;
313            my $msg=$self->getConf('PingMessage');
314            
315            $self->write($msg);
316    }
317    
318    sub closeChannel {
319            my $self=shift;
320            my $channelName=shift;
321            
322            return unless(exists($self->{'channels'}->{$channelName}));
323            
324            my $channel=$self->{'channels'}->{$channelName};
325            $channel->removeSubscriber($self,'channelClose');
326            
327            delete($self->{'channels'}->{$channelName});
328            
329            $self->close(0,'channelsClosed') if(scalar(keys %{$self->{'channels'}})==0);
330  }  }
331    
332  sub close {  sub close {
333          my $self=shift;          my $self=shift;
334          my $noShutdownMsg=shift;          my $noShutdownMsg=shift;
335                    
336          $self->{'channel'}->removeSubscriber($self) if($self->{'channel'});          foreach my $channelName (keys %{$self->{'channels'}})
337          delete($self->{'channel'});          {
338                    my $channel=$self->{'channels'}->{$channelName};
339                    $channel->removeSubscriber($self,'subscriberClose');
340            }
341            delete($self->{'channels'});
342                    
343          if(exists($self->{'subscriberID'}))          if(exists($self->{'subscriberID'}))
344          {          {
# Line 327  sub close { Line 351  sub close {
351          #          #
352          unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))          unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
353          {          {
354                  my $msg=$::CONF{'SubscriberShutdownMsg'};                  my $msg=$self->getConf('SubscriberShutdownMsg');
355                  if(defined($msg) && $msg ne '')                  if(defined($msg) && $msg ne '')
356                  {                  {
357                          $self->write($msg);                          $self->write($msg);
# Line 337  sub close { Line 361  sub close {
361          $self->SUPER::close();          $self->SUPER::close();
362  }  }
363    
364    sub didClose {
365            
366            $::Statistics->{'current_subscribers'}--;
367    }
368    
369  sub checkForMaxTime {  sub checkForMaxTime {
370          my $self=shift;          my $self=shift;
371          my $time=shift;          my $time=shift;
# Line 344  sub checkForMaxTime { Line 373  sub checkForMaxTime {
373          $self->close(1) if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);          $self->close(1) if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);
374  }  }
375    
376    sub getConf {
377            my $self=shift;
378            my $key=shift;
379            
380            if(exists($self->{'mode'}) && $self->{'mode'} ne '')
381            {
382                    my $k=$key.'.'.$self->{'mode'};
383                    
384                    return $::CONF{$k} if(exists($::CONF{$k}));
385            }
386            
387            $::CONF{$key};
388    }
389    
390  1;  1;
391  ############################################################################EOF  ############################################################################EOF

Legend:
Removed from v.11  
changed lines
  Added in v.49

  ViewVC Help
Powered by ViewVC 1.1.26