/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 13 by knops.gerd, Mon Apr 30 18:16:17 2007 UTC revision 37 by knops.gerd, Fri Feb 1 21:22:03 2008 UTC
# Line 43  package Meteor::Subscriber; Line 43  package Meteor::Subscriber;
43          @Meteor::Subscriber::ISA=qw(Meteor::Connection);          @Meteor::Subscriber::ISA=qw(Meteor::Connection);
44                    
45          our %PersistentConnections=();          our %PersistentConnections=();
46            our $NumAcceptedConnections=0;
47        
48    
49  ###############################################################################  ###############################################################################
50  # Factory methods  # Factory methods
# Line 63  sub newFromServer { Line 65  sub newFromServer {
65                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
66          }          }
67                    
68            $::Statistics->{'current_subscribers'}++;
69            $::Statistics->{'subscriber_connections_accepted'}++;
70            
71          $self;          $self;
72  }  }
73    
# Line 85  sub pingPersistentConnections { Line 90  sub pingPersistentConnections {
90          my $msg=$::CONF{'PingMessage'};          my $msg=$::CONF{'PingMessage'};
91          my @cons=values %PersistentConnections;          my @cons=values %PersistentConnections;
92                    
93          map { $_->write($msg) } @cons;          map { $_->write($msg.chr(0)) } @cons;
94  }  }
95    
96  sub checkPersistentConnectionsForMaxTime {  sub checkPersistentConnectionsForMaxTime {
# Line 97  sub checkPersistentConnectionsForMaxTime Line 102  sub checkPersistentConnectionsForMaxTime
102          map { $_->checkForMaxTime($time) } @cons;          map { $_->checkForMaxTime($time) } @cons;
103  }  }
104    
105    sub numSubscribers {
106            
107            return scalar(keys %PersistentConnections);
108    }
109    
110  ###############################################################################  ###############################################################################
111  # Instance methods  # Instance methods
112  ###############################################################################  ###############################################################################
# Line 121  sub processLine { Line 131  sub processLine {
131                  # Analyze header, register with appropiate channel                  # Analyze header, register with appropiate channel
132                  # and send pending messages.                  # and send pending messages.
133                  #                  #
134                  # GET $::CONF{'SubscriberDynamicPageAddress'}?channel=ml123&restartfrom=1 HTTP/1.1                  # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
135                  #                  #
136                  # Find the 'GET' line                  # Find the 'GET' line
137                  #                  #
138                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\?(\S+)/)                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i)
139                  {                  {
140                          my @formData=split('&',$1);                          my $subscriberID=$1;
141                          my $channelName=undef;                          my $persist=0;
142                          my $startIndex=undef;                          $self->{'mode'}=$2;
143                          my $backtrack=undef;                          if ($self->{'mode'} eq "xhrinteractive" || $self->{'mode'} eq "iframe" || $self->{'mode'} eq "serversent" || $self->{'mode'} eq "longpoll") {
144                          my $persist=1;                                  $persist=1;
145                          my $anyPersist=0;                                  $self->{'MaxMessageCount'}=1 unless(!($self->{'mode'} eq "longpoll"));
146                          my $subscriberID=undef;                          }
147                            if ($self->{'mode'} eq "iframe") {
148                                    $self->{'HeaderTemplateNumber'}=1;
149                            } else {
150                                    $self->{'HeaderTemplateNumber'}=2;
151                            }
152                            my @channelData=split('/',$3);
153                          my $channels={};                          my $channels={};
154                          foreach my $formElement (@formData)                          my $channelName;
155                          {                          my $offset;
156                                  if($formElement=~/^channel=(.+)$/)                          foreach my $chandef (@channelData) {
157                                  {                                  if($chandef=~/^([a-z0-9]+)(.(r|b|h)([0-9]*))?$/i) {
158                                          if(defined($channelName))                                          $channelName = $1;
159                                          {                                          $channels->{$channelName}->{'startIndex'} = undef;
160                                                  if(defined($startIndex) && defined($backtrack))                                          if ($3) {
161                                                  {                                             $offset = $4;
162                                                          $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");                                             if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; }
163                                                          $self->close();                                             if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; }
164                                                                                                       if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; }
                                                         return;  
                                                 }  
                                                   
                                                 $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack));  
                                                 $channels->{$channelName}->{'startIndex'}=$startIndex;  
                                                 $channels->{$channelName}->{'persist'}=$persist;  
                                                 $anyPersist|=$persist;  
                                                   
                                                 $startIndex=undef;  
                                                 $backtrack=undef;  
                                                 $persist=1;  
                                         }  
                                         $channelName=$1;  
                                 }  
                                 elsif($formElement=~/^restartfrom=(\d*)$/)  
                                 {  
                                         $startIndex=$1;  
                                         $startIndex='' unless(defined($startIndex));  
                                 }  
                                 elsif($formElement=~/^backtrack=(\d+)$/)  
                                 {  
                                         $backtrack=$1;  
                                         $backtrack=0 unless(defined($backtrack));  
                                 }  
                                 elsif($formElement=~/^persist=(?i)(yes|true|1|no|false|0)$/)  
                                 {  
                                         $persist=0 if($1=~/(no|false|0)/i);  
                                 }  
                                 elsif($formElement=~/^id=(.+)$/)  
                                 {  
                                         $subscriberID=$1;  
                                 }  
                                 elsif($formElement=~/^maxmessages=(\d+)$/i)  
                                 {  
                                         $self->{'MaxMessageCount'}=$1;  
                                 }  
                                 elsif($formElement=~/^template=(\d+)$/i)  
                                 {  
                                         $self->{'HeaderTemplateNumber'}=$1;  
                                 }  
                                 elsif($formElement=~/^maxtime=(\d+)$/i)  
                                 {  
                                         my $clientRequest=$1;  
                                         my $serverDefault=$::CONF{'MaxTime'};  
                                           
                                         if($serverDefault==0 || $serverDefault>$clientRequest)  
                                         {  
                                                 $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$clientRequest;  
165                                          }                                          }
166                                  }                                  }
167                          }                          }
168                                                    
                         if(defined($channelName))  
                         {  
                                 if(defined($startIndex) && defined($backtrack))  
                                 {  
                                         $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");  
                                         $self->close();  
                                           
                                         return;  
                                 }  
                                   
                                 $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack));  
                                 $channels->{$channelName}->{'startIndex'}=$startIndex;  
                                 $channels->{$channelName}->{'persist'}=$persist;  
                                 $anyPersist|=$persist;  
                         }  
                           
169                          delete($self->{'headerBuffer'});                          delete($self->{'headerBuffer'});
170                                                    
171                          if(defined($subscriberID) && $anyPersist)                          if($persist)
172                          {                          {
173                                  $self->{'subscriberID'}=$subscriberID;                                  $self->{'subscriberID'}=$subscriberID;
174                                  $self->deleteSubscriberWithID($subscriberID);                                  $self->deleteSubscriberWithID($subscriberID);
# Line 226  sub processLine { Line 178  sub processLine {
178                          if(scalar(keys %{$channels}))                          if(scalar(keys %{$channels}))
179                          {                          {
180                                  $self->emitOKHeader();                                  $self->emitOKHeader();
181                                                                    $self->setChannels($channels,$persist);
182                                  $self->setChannels($channels);                                  $self->close(1) unless($persist);
                                   
                                 $self->close(1) unless($anyPersist);  
                                   
183                                  return;                                  return;
184                          }                          }
185                  }                  }
186                    elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/)
187                    {
188                            $self->deleteSubscriberWithID($1);
189                            $self->emitOKHeader();
190                            $self->close(1);
191                            return;
192                    }
193                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
194                  {                  {
195                          Meteor::Document->serveFileToClient($1,$self);                          Meteor::Document->serveFileToClient($1,$self);
                           
196                          $self->close(1);                          $self->close(1);
                           
197                          return;                          return;
198                  }                  }
199                                    
# Line 253  sub processLine { Line 207  sub processLine {
207  sub setChannels {  sub setChannels {
208          my $self=shift;          my $self=shift;
209          my $channels=shift;          my $channels=shift;
210            my $persist=shift;
211                    
212          foreach my $channelName (keys %{$channels})          foreach my $channelName (keys %{$channels})
213          {          {
                 my $persist=$channels->{$channelName}->{'persist'};  
214                  my $startIndex=$channels->{$channelName}->{'startIndex'};                  my $startIndex=$channels->{$channelName}->{'startIndex'};
215                                    
216                  my $channel=Meteor::Channel->channelWithName($channelName);                  my $channel=Meteor::Channel->channelWithName($channelName);
# Line 277  sub emitErrorHeader { Line 231  sub emitErrorHeader {
231          my $self=shift;          my $self=shift;
232                    
233          $self->emitHeader('404 Not Found');          $self->emitHeader('404 Not Found');
234            $::Statistics->{'errors_served'}++;
235                    
236          # close up shop here!          # close up shop here!
237          $self->close();          $self->close();
# Line 318  sub emitHeader { Line 273  sub emitHeader {
273                  }                  }
274          /gex;          /gex;
275                    
276          $self->write($header);          $self->write($header.chr(0));
277  }  }
278    
279  sub sendMessage {  sub sendMessage {
280          my $self=shift;          my $self=shift;
281          my $msg=shift;          my $msg=shift;
282            my $numMsgInThisBatch=shift;
283            
284            $numMsgInThisBatch=1 unless(defined($numMsgInThisBatch));
285            
286            $self->write($msg.chr(0));
287                    
288          $self->write($msg);          $::Statistics->{'messages_served'}+=$numMsgInThisBatch;
289                    
290          my $msgCount=++$self->{'MessageCount'};          my $msgCount=++$self->{'MessageCount'};
291                    
# Line 387  sub close { Line 347  sub close {
347          $self->SUPER::close();          $self->SUPER::close();
348  }  }
349    
350    sub didClose {
351            
352            $::Statistics->{'current_subscribers'}--;
353    }
354    
355  sub checkForMaxTime {  sub checkForMaxTime {
356          my $self=shift;          my $self=shift;
357          my $time=shift;          my $time=shift;

Legend:
Removed from v.13  
changed lines
  Added in v.37

  ViewVC Help
Powered by ViewVC 1.1.26