/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 3 by andrew.betts, Mon Nov 20 17:59:30 2006 UTC revision 35 by andrew.betts, Fri Jan 25 17:12:02 2008 UTC
# Line 43  package Meteor::Subscriber; Line 43  package Meteor::Subscriber;
43          @Meteor::Subscriber::ISA=qw(Meteor::Connection);          @Meteor::Subscriber::ISA=qw(Meteor::Connection);
44                    
45          our %PersistentConnections=();          our %PersistentConnections=();
46            our $NumAcceptedConnections=0;
47    
48  ###############################################################################  ###############################################################################
49  # Factory methods  # Factory methods
# Line 63  sub newFromServer { Line 64  sub newFromServer {
64                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
65          }          }
66                    
67            $::Statistics->{'current_subscribers'}++;
68            $::Statistics->{'subscriber_connections_accepted'}++;
69            
70          $self;          $self;
71  }  }
72    
# Line 85  sub pingPersistentConnections { Line 89  sub pingPersistentConnections {
89          my $msg=$::CONF{'PingMessage'};          my $msg=$::CONF{'PingMessage'};
90          my @cons=values %PersistentConnections;          my @cons=values %PersistentConnections;
91                    
92          map { $_->write($msg) } @cons;          map { $_->write($msg.chr(0)) } @cons;
93  }  }
94    
95  sub checkPersistentConnectionsForMaxTime {  sub checkPersistentConnectionsForMaxTime {
# Line 97  sub checkPersistentConnectionsForMaxTime Line 101  sub checkPersistentConnectionsForMaxTime
101          map { $_->checkForMaxTime($time) } @cons;          map { $_->checkForMaxTime($time) } @cons;
102  }  }
103    
104    sub numSubscribers {
105            
106            return scalar(keys %PersistentConnections);
107    }
108    
109  ###############################################################################  ###############################################################################
110  # Instance methods  # Instance methods
111  ###############################################################################  ###############################################################################
# Line 121  sub processLine { Line 130  sub processLine {
130                  # Analyze header, register with appropiate channel                  # Analyze header, register with appropiate channel
131                  # and send pending messages.                  # and send pending messages.
132                  #                  #
133                  # GET $::CONF{'SubscriberDynamicPageAddress'}?channel=ml123&restartfrom=1 HTTP/1.1                  # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
134                  #                  #
135                  # Find the 'GET' line                  # Find the 'GET' line
136                  #                  #
137                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\?(\S+)/)                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i)
138                  {                  {
139                          my @formData=split('&',$1);                          my $subscriberID=$1;
140                          my $channelName=undef;                          my $persist=0;
141                          my $startIndex=undef;                          $self->{'mode'}=$2;
142                          my $backtrack=undef;                          if ($self->{'mode'} eq "xhrinteractive" || $self->{'mode'} eq "iframe" || $self->{'mode'} eq "serversent" || $self->{'mode'} eq "longpoll") {
143                          my $persist=1;                                  $persist=1;
144                          my $subscriberID=undef;                                  $self->{'MaxMessageCount'}=1 unless(!($self->{'mode'} eq "longpoll"));
145                          foreach my $formElement (@formData)                          }
146                          {                          if ($self->{'mode'} eq "iframe") {
147                                  if($formElement=~/^channel=(.+)$/)                                  $self->{'HeaderTemplateNumber'}=1;
148                                  {                          } else {
149                                          $channelName=$1;                                  $self->{'HeaderTemplateNumber'}=2;
150                                  }                          }
151                                  elsif($formElement=~/^restartfrom=(\d*)$/)                          my @channelData=split('/',$3);
152                                  {                          my $channels={};
153                                          $startIndex=$1;                          my $channelName;
154                                          $startIndex='' unless(defined($startIndex));                          my $offset;
155                                  }                          foreach my $chandef (@channelData) {
156                                  elsif($formElement=~/^backtrack=(\d+)$/)                                  if($chandef=~/^([a-z0-9]+)(.(r|b|h)([0-9]*))?$/i) {
157                                  {                                          $channelName = $1;
158                                          $backtrack=$1;                                          $channels->{$channelName}->{'startIndex'} = undef;
159                                          $backtrack=0 unless(defined($backtrack));                                          if ($3) {
160                                  }                                             $offset = $4;
161                                  elsif($formElement=~/^persist=(?i)(yes|true|1|no|false|0)$/)                                             if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; }
162                                  {                                             if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; }
163                                          $persist=0 if($1=~/(no|false|0)/i);                                             if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; }
                                 }  
                                 elsif($formElement=~/^id=(.+)$/)  
                                 {  
                                         $subscriberID=$1;  
                                 }  
                                 elsif($formElement=~/^maxmessages=(\d+)$/i)  
                                 {  
                                         $self->{'MaxMessageCount'}=$1;  
                                 }  
                                 elsif($formElement=~/^template=(\d+)$/i)  
                                 {  
                                         $self->{'HeaderTemplateNumber'}=$1;  
                                 }  
                                 elsif($formElement=~/^maxtime=(\d+)$/i)  
                                 {  
                                         my $clientRequest=$1;  
                                         my $serverDefault=$::CONF{'MaxTime'};  
                                           
                                         if($serverDefault==0 || $serverDefault>$clientRequest)  
                                         {  
                                                 $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$clientRequest;  
164                                          }                                          }
165                                  }                                  }
166                          }                          }
                                                   
                         delete($self->{'headerBuffer'});  
167                                                    
168                          if(defined($startIndex) && defined($backtrack))                          delete($self->{'headerBuffer'});
                         {  
                                 $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");  
                                 $self->close();  
                                   
                                 return;  
                         }  
169                                                    
170                          if(defined($subscriberID) && $persist)                          if($persist)
171                          {                          {
172                                  $self->{'subscriberID'}=$subscriberID;                                  $self->{'subscriberID'}=$subscriberID;
173                                  $self->deleteSubscriberWithID($subscriberID);                                  $self->deleteSubscriberWithID($subscriberID);
174                                  $PersistentConnections{$subscriberID}=$self;                                  $PersistentConnections{$subscriberID}=$self;
175                          }                          }
176                                                    
177                          if(defined($channelName))                          if(scalar(keys %{$channels}))
178                          {                          {
179                                  $self->emitOKHeader();                                  $self->emitOKHeader();
180                                                                    $self->setChannels($channels,$persist);
                                 $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack));  
                                   
                                 $self->setChannelName($channelName,$startIndex,$persist);  
                                   
181                                  $self->close(1) unless($persist);                                  $self->close(1) unless($persist);
                                   
182                                  return;                                  return;
183                          }                          }
184                  }                  }
185                    elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/)
186                    {
187                            $self->deleteSubscriberWithID($1);
188                            $self->emitOKHeader();
189                            $self->close(1);
190                            return;
191                    }
192                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
193                  {                  {
194                          Meteor::Document->serveFileToClient($1,$self);                          Meteor::Document->serveFileToClient($1,$self);
                           
195                          $self->close(1);                          $self->close(1);
                           
196                          return;                          return;
197                  }                  }
198                                    
# Line 223  sub processLine { Line 203  sub processLine {
203          }          }
204  }  }
205    
206  sub setChannelName {  sub setChannels {
207          my $self=shift;          my $self=shift;
208          my $channelName=shift;          my $channels=shift;
         my $startIndex=shift;  
209          my $persist=shift;          my $persist=shift;
210                    
211          my $channel=Meteor::Channel->channelWithName($channelName);          foreach my $channelName (keys %{$channels})
212          $self->{'channel'}=$channel if($persist);          {
213                            my $startIndex=$channels->{$channelName}->{'startIndex'};
214          $channel->addSubscriber($self,$startIndex,$persist);                  
215                    my $channel=Meteor::Channel->channelWithName($channelName);
216                    
217                    $self->{'channels'}->{$channelName}=$channel if($persist);
218                    
219                    $channel->addSubscriber($self,$startIndex,$persist);
220            }
221  }  }
222    
223  sub emitOKHeader {  sub emitOKHeader {
# Line 245  sub emitErrorHeader { Line 230  sub emitErrorHeader {
230          my $self=shift;          my $self=shift;
231                    
232          $self->emitHeader('404 Not Found');          $self->emitHeader('404 Not Found');
233            $::Statistics->{'errors_served'}++;
234                    
235          # close up shop here!          # close up shop here!
236          $self->close();          $self->close();
# Line 286  sub emitHeader { Line 272  sub emitHeader {
272                  }                  }
273          /gex;          /gex;
274                    
275          $self->write($header);          $self->write($header.chr(0));
276  }  }
277    
278  sub sendMessage {  sub sendMessage {
279          my $self=shift;          my $self=shift;
280          my $msg=shift;          my $msg=shift;
281            my $numMsgInThisBatch=shift;
282            
283            $numMsgInThisBatch=1 unless(defined($numMsgInThisBatch));
284            
285            $self->write($msg.chr(0));
286                    
287          $self->write($msg);          $::Statistics->{'messages_served'}+=$numMsgInThisBatch;
288                    
289          my $msgCount=++$self->{'MessageCount'};          my $msgCount=++$self->{'MessageCount'};
290                    
# Line 309  sub sendMessage { Line 300  sub sendMessage {
300          }          }
301  }  }
302    
303    sub closeChannel {
304            my $self=shift;
305            my $channelName=shift;
306            
307            return unless(exists($self->{'channels'}->{$channelName}));
308            
309            my $channel=$self->{'channels'}->{$channelName};
310            $channel->removeSubscriber($self);
311            
312            delete($self->{'channels'}->{$channelName});
313            
314            $self->close() if(scalar(keys %{$self->{'channels'}})==0);
315    }
316    
317  sub close {  sub close {
318          my $self=shift;          my $self=shift;
319          my $noShutdownMsg=shift;          my $noShutdownMsg=shift;
320                    
321          $self->{'channel'}->removeSubscriber($self) if($self->{'channel'});          foreach my $channelName (keys %{$self->{'channels'}})
322          delete($self->{'channel'});          {
323                    my $channel=$self->{'channels'}->{$channelName};
324                    $channel->removeSubscriber($self);
325            }
326            delete($self->{'channels'});
327                    
328          if(exists($self->{'subscriberID'}))          if(exists($self->{'subscriberID'}))
329          {          {
# Line 334  sub close { Line 343  sub close {
343                  }                  }
344          }          }
345                    
346            $::Statistics->{'current_subscribers'}--;
347            
348          $self->SUPER::close();          $self->SUPER::close();
349  }  }
350    

Legend:
Removed from v.3  
changed lines
  Added in v.35

  ViewVC Help
Powered by ViewVC 1.1.26