/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 3 by andrew.betts, Mon Nov 20 17:59:30 2006 UTC revision 25 by knops.gerd, Sun May 20 19:40:53 2007 UTC
# Line 43  package Meteor::Subscriber; Line 43  package Meteor::Subscriber;
43          @Meteor::Subscriber::ISA=qw(Meteor::Connection);          @Meteor::Subscriber::ISA=qw(Meteor::Connection);
44                    
45          our %PersistentConnections=();          our %PersistentConnections=();
46            our $NumAcceptedConnections=0;
47    
48  ###############################################################################  ###############################################################################
49  # Factory methods  # Factory methods
# Line 63  sub newFromServer { Line 64  sub newFromServer {
64                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
65          }          }
66                    
67            $::Statistics->{'current_subscribers'}++;
68            $::Statistics->{'subscriber_connections_accepted'}++;
69            
70          $self;          $self;
71  }  }
72    
# Line 97  sub checkPersistentConnectionsForMaxTime Line 101  sub checkPersistentConnectionsForMaxTime
101          map { $_->checkForMaxTime($time) } @cons;          map { $_->checkForMaxTime($time) } @cons;
102  }  }
103    
104    sub numSubscribers {
105            
106            return scalar(keys %PersistentConnections);
107    }
108    
109  ###############################################################################  ###############################################################################
110  # Instance methods  # Instance methods
111  ###############################################################################  ###############################################################################
# Line 133  sub processLine { Line 142  sub processLine {
142                          my $backtrack=undef;                          my $backtrack=undef;
143                          my $persist=1;                          my $persist=1;
144                          my $subscriberID=undef;                          my $subscriberID=undef;
145                            my $channels={};
146                          foreach my $formElement (@formData)                          foreach my $formElement (@formData)
147                          {                          {
148                                  if($formElement=~/^channel=(.+)$/)                                  if($formElement=~/^channel=(.+)$/)
149                                  {                                  {
150                                            if(defined($channelName))
151                                            {
152                                                    if(defined($startIndex) && defined($backtrack))
153                                                    {
154                                                            $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");
155                                                            $self->close();
156                                                            
157                                                            return;
158                                                    }
159                                                    
160                                                    $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack));
161                                                    $channels->{$channelName}->{'startIndex'}=$startIndex;
162                                                    
163                                                    $startIndex=undef;
164                                                    $backtrack=undef;
165                                            }
166                                          $channelName=$1;                                          $channelName=$1;
167                                  }                                  }
168                                  elsif($formElement=~/^restartfrom=(\d*)$/)                                  elsif($formElement=~/^restartfrom=(\d*)$/)
# Line 176  sub processLine { Line 202  sub processLine {
202                                          }                                          }
203                                  }                                  }
204                          }                          }
                                                   
                         delete($self->{'headerBuffer'});  
205                                                    
206                          if(defined($startIndex) && defined($backtrack))                          if(defined($channelName))
207                          {                          {
208                                  $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");                                  if(defined($startIndex) && defined($backtrack))
209                                  $self->close();                                  {
210                                            $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");
211                                            $self->close();
212                                            
213                                            return;
214                                    }
215                                                                    
216                                  return;                                  $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack));
217                                    $channels->{$channelName}->{'startIndex'}=$startIndex;
218                          }                          }
219                                                    
220                            delete($self->{'headerBuffer'});
221                            
222                          if(defined($subscriberID) && $persist)                          if(defined($subscriberID) && $persist)
223                          {                          {
224                                  $self->{'subscriberID'}=$subscriberID;                                  $self->{'subscriberID'}=$subscriberID;
# Line 194  sub processLine { Line 226  sub processLine {
226                                  $PersistentConnections{$subscriberID}=$self;                                  $PersistentConnections{$subscriberID}=$self;
227                          }                          }
228                                                    
229                          if(defined($channelName))                          if(scalar(keys %{$channels}))
230                          {                          {
231                                  $self->emitOKHeader();                                  $self->emitOKHeader();
232                                                                    
233                                  $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack));                                  $self->setChannels($channels,$persist);
                                   
                                 $self->setChannelName($channelName,$startIndex,$persist);  
234                                                                    
235                                  $self->close(1) unless($persist);                                  $self->close(1) unless($persist);
236                                                                    
# Line 223  sub processLine { Line 253  sub processLine {
253          }          }
254  }  }
255    
256  sub setChannelName {  sub setChannels {
257          my $self=shift;          my $self=shift;
258          my $channelName=shift;          my $channels=shift;
         my $startIndex=shift;  
259          my $persist=shift;          my $persist=shift;
260                    
261          my $channel=Meteor::Channel->channelWithName($channelName);          foreach my $channelName (keys %{$channels})
262          $self->{'channel'}=$channel if($persist);          {
263                            my $startIndex=$channels->{$channelName}->{'startIndex'};
264          $channel->addSubscriber($self,$startIndex,$persist);                  
265                    my $channel=Meteor::Channel->channelWithName($channelName);
266                    
267                    $self->{'channels'}->{$channelName}=$channel if($persist);
268                    
269                    $channel->addSubscriber($self,$startIndex,$persist);
270            }
271  }  }
272    
273  sub emitOKHeader {  sub emitOKHeader {
# Line 245  sub emitErrorHeader { Line 280  sub emitErrorHeader {
280          my $self=shift;          my $self=shift;
281                    
282          $self->emitHeader('404 Not Found');          $self->emitHeader('404 Not Found');
283            $::Statistics->{'errors_served'}++;
284                    
285          # close up shop here!          # close up shop here!
286          $self->close();          $self->close();
# Line 292  sub emitHeader { Line 328  sub emitHeader {
328  sub sendMessage {  sub sendMessage {
329          my $self=shift;          my $self=shift;
330          my $msg=shift;          my $msg=shift;
331            my $numMsgInThisBatch=shift;
332            
333            $numMsgInThisBatch=1 unless(defined($numMsgInThisBatch));
334                    
335          $self->write($msg);          $self->write($msg);
336                    
337            $::Statistics->{'messages_served'}+=$numMsgInThisBatch;
338            
339          my $msgCount=++$self->{'MessageCount'};          my $msgCount=++$self->{'MessageCount'};
340                    
341          my $maxMsg=$::CONF{'MaxMessages'};          my $maxMsg=$::CONF{'MaxMessages'};
# Line 309  sub sendMessage { Line 350  sub sendMessage {
350          }          }
351  }  }
352    
353    sub closeChannel {
354            my $self=shift;
355            my $channelName=shift;
356            
357            return unless(exists($self->{'channels'}->{$channelName}));
358            
359            my $channel=$self->{'channels'}->{$channelName};
360            $channel->removeSubscriber($self);
361            
362            delete($self->{'channels'}->{$channelName});
363            
364            $self->close() if(scalar(keys %{$self->{'channels'}})==0);
365    }
366    
367  sub close {  sub close {
368          my $self=shift;          my $self=shift;
369          my $noShutdownMsg=shift;          my $noShutdownMsg=shift;
370                    
371          $self->{'channel'}->removeSubscriber($self) if($self->{'channel'});          foreach my $channelName (keys %{$self->{'channels'}})
372          delete($self->{'channel'});          {
373                    my $channel=$self->{'channels'}->{$channelName};
374                    $channel->removeSubscriber($self);
375            }
376            delete($self->{'channels'});
377                    
378          if(exists($self->{'subscriberID'}))          if(exists($self->{'subscriberID'}))
379          {          {
# Line 334  sub close { Line 393  sub close {
393                  }                  }
394          }          }
395                    
396            $::Statistics->{'current_subscribers'}--;
397            
398          $self->SUPER::close();          $self->SUPER::close();
399  }  }
400    

Legend:
Removed from v.3  
changed lines
  Added in v.25

  ViewVC Help
Powered by ViewVC 1.1.26