/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 13 by knops.gerd, Mon Apr 30 18:16:17 2007 UTC revision 47 by knops.gerd, Mon Feb 4 21:06:42 2008 UTC
# Line 43  package Meteor::Subscriber; Line 43  package Meteor::Subscriber;
43          @Meteor::Subscriber::ISA=qw(Meteor::Connection);          @Meteor::Subscriber::ISA=qw(Meteor::Connection);
44                    
45          our %PersistentConnections=();          our %PersistentConnections=();
46            our $NumAcceptedConnections=0;
47        
48    
49  ###############################################################################  ###############################################################################
50  # Factory methods  # Factory methods
# Line 63  sub newFromServer { Line 65  sub newFromServer {
65                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
66          }          }
67                    
68            $::Statistics->{'current_subscribers'}++;
69            $::Statistics->{'subscriber_connections_accepted'}++;
70            
71          $self;          $self;
72  }  }
73    
# Line 75  sub deleteSubscriberWithID { Line 80  sub deleteSubscriberWithID {
80                    
81          if(exists($PersistentConnections{$id}))          if(exists($PersistentConnections{$id}))
82          {          {
83                  $PersistentConnections{$id}->close(1);                  $PersistentConnections{$id}->close();
84          }          }
85  }  }
86    
87  sub pingPersistentConnections {  sub pingPersistentConnections {
88          my $class=shift;          my $class=shift;
89                    
         my $msg=$::CONF{'PingMessage'};  
90          my @cons=values %PersistentConnections;          my @cons=values %PersistentConnections;
91                    
92          map { $_->write($msg) } @cons;          map { $_->ping() } @cons;
93  }  }
94    
95  sub checkPersistentConnectionsForMaxTime {  sub checkPersistentConnectionsForMaxTime {
# Line 97  sub checkPersistentConnectionsForMaxTime Line 101  sub checkPersistentConnectionsForMaxTime
101          map { $_->checkForMaxTime($time) } @cons;          map { $_->checkForMaxTime($time) } @cons;
102  }  }
103    
104    sub numSubscribers {
105            
106            return scalar(keys %PersistentConnections);
107    }
108    
109  ###############################################################################  ###############################################################################
110  # Instance methods  # Instance methods
111  ###############################################################################  ###############################################################################
# Line 121  sub processLine { Line 130  sub processLine {
130                  # Analyze header, register with appropiate channel                  # Analyze header, register with appropiate channel
131                  # and send pending messages.                  # and send pending messages.
132                  #                  #
133                  # GET $::CONF{'SubscriberDynamicPageAddress'}?channel=ml123&restartfrom=1 HTTP/1.1                  # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
134                  #                  #
135                  # Find the 'GET' line                  # Find the 'GET' line
136                  #                  #
137                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\?(\S+)/)                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i)
138                  {                  {
139                          my @formData=split('&',$1);                          my $subscriberID=$1;
140                          my $channelName=undef;                          $self->{'mode'}=$2;
141                          my $startIndex=undef;                          my $persist=$self->getConf('Persist');
142                          my $backtrack=undef;                          
143                          my $persist=1;                          if ($self->{'mode'} eq "xhrinteractive" || $self->{'mode'} eq "iframe" || $self->{'mode'} eq "serversent" || $self->{'mode'} eq "longpoll") {
144                          my $anyPersist=0;                                  $persist=1;
145                          my $subscriberID=undef;                                  $self->{'MaxMessageCount'}=1 unless(!($self->{'mode'} eq "longpoll"));
146                          my $channels={};                          }
147                          foreach my $formElement (@formData)                          if ($self->{'mode'} eq "iframe") {
148                          {                                  $self->{'HeaderTemplateNumber'}=1;
149                                  if($formElement=~/^channel=(.+)$/)                          } else {
150                                  {                                  $self->{'HeaderTemplateNumber'}=2;
                                         if(defined($channelName))  
                                         {  
                                                 if(defined($startIndex) && defined($backtrack))  
                                                 {  
                                                         $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");  
                                                         $self->close();  
                                                           
                                                         return;  
                                                 }  
                                                   
                                                 $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack));  
                                                 $channels->{$channelName}->{'startIndex'}=$startIndex;  
                                                 $channels->{$channelName}->{'persist'}=$persist;  
                                                 $anyPersist|=$persist;  
                                                   
                                                 $startIndex=undef;  
                                                 $backtrack=undef;  
                                                 $persist=1;  
                                         }  
                                         $channelName=$1;  
                                 }  
                                 elsif($formElement=~/^restartfrom=(\d*)$/)  
                                 {  
                                         $startIndex=$1;  
                                         $startIndex='' unless(defined($startIndex));  
                                 }  
                                 elsif($formElement=~/^backtrack=(\d+)$/)  
                                 {  
                                         $backtrack=$1;  
                                         $backtrack=0 unless(defined($backtrack));  
                                 }  
                                 elsif($formElement=~/^persist=(?i)(yes|true|1|no|false|0)$/)  
                                 {  
                                         $persist=0 if($1=~/(no|false|0)/i);  
                                 }  
                                 elsif($formElement=~/^id=(.+)$/)  
                                 {  
                                         $subscriberID=$1;  
                                 }  
                                 elsif($formElement=~/^maxmessages=(\d+)$/i)  
                                 {  
                                         $self->{'MaxMessageCount'}=$1;  
                                 }  
                                 elsif($formElement=~/^template=(\d+)$/i)  
                                 {  
                                         $self->{'HeaderTemplateNumber'}=$1;  
                                 }  
                                 elsif($formElement=~/^maxtime=(\d+)$/i)  
                                 {  
                                         my $clientRequest=$1;  
                                         my $serverDefault=$::CONF{'MaxTime'};  
                                           
                                         if($serverDefault==0 || $serverDefault>$clientRequest)  
                                         {  
                                                 $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$clientRequest;  
                                         }  
                                 }  
151                          }                          }
152                                                    
153                          if(defined($channelName))                          my $maxTime=$self->getConf('MaxTime');
154                            if($maxTime>0)
155                          {                          {
156                                  if(defined($startIndex) && defined($backtrack))                                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
157                                  {                          }
158                                          $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");                          
159                                          $self->close();                          my @channelData=split('/',$3);
160                                                                    my $channels={};
161                                          return;                          my $channelName;
162                            my $offset;
163                            foreach my $chandef (@channelData) {
164                                    if($chandef=~/^([a-z0-9]+)(.(r|b|h)([0-9]*))?$/i) {
165                                            $channelName = $1;
166                                            $channels->{$channelName}->{'startIndex'} = undef;
167                                            if ($3) {
168                                               $offset = $4;
169                                               if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; }
170                                               if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; }
171                                               if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; }
172                                            }
173                                  }                                  }
                                   
                                 $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack));  
                                 $channels->{$channelName}->{'startIndex'}=$startIndex;  
                                 $channels->{$channelName}->{'persist'}=$persist;  
                                 $anyPersist|=$persist;  
174                          }                          }
175                                                    
176                          delete($self->{'headerBuffer'});                          delete($self->{'headerBuffer'});
177                                                    
178                          if(defined($subscriberID) && $anyPersist)                          if($persist)
179                          {                          {
180                                  $self->{'subscriberID'}=$subscriberID;                                  $self->{'subscriberID'}=$subscriberID;
181                                  $self->deleteSubscriberWithID($subscriberID);                                  $self->deleteSubscriberWithID($subscriberID);
# Line 226  sub processLine { Line 185  sub processLine {
185                          if(scalar(keys %{$channels}))                          if(scalar(keys %{$channels}))
186                          {                          {
187                                  $self->emitOKHeader();                                  $self->emitOKHeader();
188                                                                    $self->setChannels($channels,$persist,$self->{'mode'},'');
189                                  $self->setChannels($channels);                                  $self->close(1) unless($persist);
                                   
                                 $self->close(1) unless($anyPersist);  
                                   
190                                  return;                                  return;
191                          }                          }
192                  }                  }
193                    elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/)
194                    {
195                            $self->deleteSubscriberWithID($1);
196                            $self->emitOKHeader();
197                            $self->close(1);
198                            return;
199                    }
200                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
201                  {                  {
202                          Meteor::Document->serveFileToClient($1,$self);                          Meteor::Document->serveFileToClient($1,$self);
                           
203                          $self->close(1);                          $self->close(1);
                           
204                          return;                          return;
205                  }                  }
206                                    
# Line 253  sub processLine { Line 214  sub processLine {
214  sub setChannels {  sub setChannels {
215          my $self=shift;          my $self=shift;
216          my $channels=shift;          my $channels=shift;
217            my $persist=shift;
218            my $mode=shift || '';
219            my $userAgent=shift || '';
220                    
221          foreach my $channelName (keys %{$channels})          foreach my $channelName (keys %{$channels})
222          {          {
                 my $persist=$channels->{$channelName}->{'persist'};  
223                  my $startIndex=$channels->{$channelName}->{'startIndex'};                  my $startIndex=$channels->{$channelName}->{'startIndex'};
224                                    
225                  my $channel=Meteor::Channel->channelWithName($channelName);                  my $channel=Meteor::Channel->channelWithName($channelName);
226                                    
227                  $self->{'channels'}->{$channelName}=$channel if($persist);                  $self->{'channels'}->{$channelName}=$channel if($persist);
228                                    
229                  $channel->addSubscriber($self,$startIndex,$persist);                  $channel->addSubscriber($self,$startIndex,$persist,$mode,$userAgent);
230          }          }
231  }  }
232    
# Line 277  sub emitErrorHeader { Line 240  sub emitErrorHeader {
240          my $self=shift;          my $self=shift;
241                    
242          $self->emitHeader('404 Not Found');          $self->emitHeader('404 Not Found');
243            $::Statistics->{'errors_served'}++;
244                    
245          # close up shop here!          # close up shop here!
246          $self->close();          $self->close();
# Line 291  sub emitHeader { Line 255  sub emitHeader {
255          {          {
256                  my $hn='HeaderTemplate'.$self->{'HeaderTemplateNumber'};                  my $hn='HeaderTemplate'.$self->{'HeaderTemplateNumber'};
257                                    
258                  $header=$::CONF{$hn};                  $header=$self->getConf($hn);
259          }          }
260          $header=$::CONF{'HeaderTemplate'} unless(defined($header));          $header=$self->getConf('HeaderTemplate') unless(defined($header));
261                    
262          $header=~s/~([^~]+)~/          $header=~s/~([^~]*)~/
263                  if(!defined($1) || $1 eq '')                  if(!defined($1) || $1 eq '')
264                  {                  {
265                          '~';                          '~';
# Line 312  sub emitHeader { Line 276  sub emitHeader {
276                  {                  {
277                          time;                          time;
278                  }                  }
279                    elsif($1 eq 'channelinfo')
280                    {
281                            Meteor::Channel->listChannelsUsingTemplate($self->getConf('ChannelInfoTemplate'));
282                    }
283                  else                  else
284                  {                  {
285                          '';                          '';
# Line 321  sub emitHeader { Line 289  sub emitHeader {
289          $self->write($header);          $self->write($header);
290  }  }
291    
292  sub sendMessage {  sub sendMessages {
293          my $self=shift;          my $self=shift;
         my $msg=shift;  
294                    
295          $self->write($msg);          my $numMessages=0;
296            my $msgTemplate=$self->getConf('Messagetemplate');
297            my $msgData='';
298            
299            foreach my $message (@_)
300            {
301                    $msgData.=$message->messageWithTemplate($msgTemplate);
302                    $numMessages++;
303            }
304                    
305          my $msgCount=++$self->{'MessageCount'};          return if($numMessages<1);
306                    
307          my $maxMsg=$::CONF{'MaxMessages'};          $self->write($msgData);
308            
309            $::Statistics->{'messages_served'}+=$numMessages;
310            
311            my $msgCount=$self->{'MessageCount'};
312            $msgCount+=$numMessages;
313            $self->{'MessageCount'}=$msgCount;
314            
315            my $maxMsg=$self->getConf('MaxMessages');
316          if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)          if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)
317          {          {
318                  $self->close(1);                  $self->close(1);
# Line 339  sub sendMessage { Line 322  sub sendMessage {
322          {          {
323                  $self->close(1);                  $self->close(1);
324          }          }
325            
326    }
327    
328    sub ping {
329            my $self=shift;
330            my $msg=$self->getConf('PingMessage');
331            
332            $self->write($msg);
333  }  }
334    
335  sub closeChannel {  sub closeChannel {
# Line 348  sub closeChannel { Line 339  sub closeChannel {
339          return unless(exists($self->{'channels'}->{$channelName}));          return unless(exists($self->{'channels'}->{$channelName}));
340                    
341          my $channel=$self->{'channels'}->{$channelName};          my $channel=$self->{'channels'}->{$channelName};
342          $channel->removeSubscriber($self);          $channel->removeSubscriber($self,'channelClose');
343                    
344          delete($self->{'channels'}->{$channelName});          delete($self->{'channels'}->{$channelName});
345                    
346          $self->close() if(scalar(keys %{$self->{'channels'}})==0);          $self->close(0,'channelsClosed') if(scalar(keys %{$self->{'channels'}})==0);
347  }  }
348    
349  sub close {  sub close {
# Line 362  sub close { Line 353  sub close {
353          foreach my $channelName (keys %{$self->{'channels'}})          foreach my $channelName (keys %{$self->{'channels'}})
354          {          {
355                  my $channel=$self->{'channels'}->{$channelName};                  my $channel=$self->{'channels'}->{$channelName};
356                  $channel->removeSubscriber($self);                  $channel->removeSubscriber($self,'subscriberClose');
357          }          }
358          delete($self->{'channels'});          delete($self->{'channels'});
359                    
# Line 377  sub close { Line 368  sub close {
368          #          #
369          unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))          unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
370          {          {
371                  my $msg=$::CONF{'SubscriberShutdownMsg'};                  my $msg=$self->getConf('SubscriberShutdownMsg');
372                  if(defined($msg) && $msg ne '')                  if(defined($msg) && $msg ne '')
373                  {                  {
374                          $self->write($msg);                          $self->write($msg);
# Line 387  sub close { Line 378  sub close {
378          $self->SUPER::close();          $self->SUPER::close();
379  }  }
380    
381    sub didClose {
382            
383            $::Statistics->{'current_subscribers'}--;
384    }
385    
386  sub checkForMaxTime {  sub checkForMaxTime {
387          my $self=shift;          my $self=shift;
388          my $time=shift;          my $time=shift;
# Line 394  sub checkForMaxTime { Line 390  sub checkForMaxTime {
390          $self->close(1) if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);          $self->close(1) if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);
391  }  }
392    
393    sub getConf {
394            my $self=shift;
395            my $key=shift;
396            
397            if(exists($self->{'mode'}) && $self->{'mode'} ne '')
398            {
399                    my $k=$key.'.'.$self->{'mode'};
400                    
401                    return $::CONF{$k} if(exists($::CONF{$k}));
402            }
403            
404            $::CONF{$key};
405    }
406    
407  1;  1;
408  ############################################################################EOF  ############################################################################EOF

Legend:
Removed from v.13  
changed lines
  Added in v.47

  ViewVC Help
Powered by ViewVC 1.1.26