/[meteor]/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

googlecode.com/svn/trunk/Meteor/Subscriber.pm revision 53 by andrew.betts, Wed Feb 27 21:58:56 2008 UTC trunk/Meteor/Subscriber.pm revision 70 by dpavlin, Sat Mar 28 03:45:31 2009 UTC
# Line 39  package Meteor::Subscriber; Line 39  package Meteor::Subscriber;
39          use Meteor::Connection;          use Meteor::Connection;
40          use Meteor::Channel;          use Meteor::Channel;
41          use Meteor::Document;          use Meteor::Document;
42            use Meteor::Koha;
43                    
44          @Meteor::Subscriber::ISA=qw(Meteor::Connection);          @Meteor::Subscriber::ISA=qw(Meteor::Connection);
45                    
# Line 55  sub newFromServer { Line 56  sub newFromServer {
56          my $self=$class->SUPER::newFromServer(shift);          my $self=$class->SUPER::newFromServer(shift);
57                    
58          $self->{'headerBuffer'}='';          $self->{'headerBuffer'}='';
59          $self->{'MessageCount'}=0;          $self->{'messageCount'}=0;
         $self->{'MaxMessageCount'}=0;  
           
         $self->{'ConnectionStart'}=time;  
         my $maxTime=$::CONF{'MaxTime'};  
         if($maxTime>0)  
         {  
                 $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;  
         }  
60                    
61          $::Statistics->{'current_subscribers'}++;          $::Statistics->{'current_subscribers'}++;
62          $::Statistics->{'subscriber_connections_accepted'}++;          $::Statistics->{'subscriber_connections_accepted'}++;
# Line 84  sub deleteSubscriberWithID { Line 77  sub deleteSubscriberWithID {
77          }          }
78  }  }
79    
80    sub subscriberExists {
81            my $class=shift;
82            my $id=shift;
83    
84            return exists($PersistentConnections{$id});
85    }
86    
87  sub pingPersistentConnections {  sub pingPersistentConnections {
88          my $class=shift;          my $class=shift;
89                    
# Line 106  sub numSubscribers { Line 106  sub numSubscribers {
106          return scalar(keys %PersistentConnections);          return scalar(keys %PersistentConnections);
107  }  }
108    
109    sub listSubscribers {
110            my $class=shift;
111            my $list='';
112            foreach my $subscriber (keys %PersistentConnections)
113            {
114                    my $sub = $PersistentConnections{$subscriber};
115                    $list .= $subscriber.' '.$sub->{'ip'}.' '.$sub->{'connectionStart'}.' '.$sub->{'connectionTimeLimit'}.' '.($sub->{'connectionTimeLimit'}-time).' '.$sub->{'messageCount'}.' "'.$sub->{'useragent'}."\"$::CRLF";
116            }
117            $list;
118    }
119    
120  ###############################################################################  ###############################################################################
121  # Instance methods  # Instance methods
122  ###############################################################################  ###############################################################################
# Line 113  sub processLine { Line 124  sub processLine {
124          my $self=shift;          my $self=shift;
125          my $line=shift;          my $line=shift;
126                    
127          # Once the header was processed we ignore any input          # Once the header was processed we ignore any input - Meteor does not accept or process request bodies
128          return unless(exists($self->{'headerBuffer'}));          return unless(exists($self->{'headerBuffer'}));
129                    
130          if($line ne '')          if($line ne '')
# Line 134  sub processLine { Line 145  sub processLine {
145                  #                  #
146                  # Find the 'GET' line                  # Find the 'GET' line
147                  #                  #
148                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i)                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/([a-z0-9_\-\%\.\/]+).*?/i)
149                  {                  {
150                          $self->{'subscriberID'}=$1;                          $self->{'subscriberID'}=$1;
151                          $self->{'mode'}=$2;                          $self->{'mode'}=$2;
152                          my $persist=$self->getConf('Persist');                          my $persist=$self->getConf('Persist');
153                          my $maxTime=$self->getConf('MaxTime');                          my $maxTime=$self->getConf('MaxTime');
154                          $self->{'ConnectionTimeLimit'} = ($self->{'ConnectionStart'}+$maxTime) if ($maxTime>0);                          $self->{'connectionTimeLimit'} = ($self->{'connectionStart'}+$maxTime) if ($maxTime>0);
155                                                    
156                          my @channelData=split('/',$3);                          my @channelData=split('/',$3);
157                          my $channels={};                          my $channels={};
# Line 158  sub processLine { Line 169  sub processLine {
169                                          }                                          }
170                                  }                                  }
171                          }                          }
172                          my $useragent = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-";                          $self->{'useragent'} = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-";
                           
                         delete($self->{'headerBuffer'});  
173                                                    
174                          if ($persist) {                          if ($persist) {
175                                    # New persistent connection: kill any existing connection with same ID
176                                  $self->deleteSubscriberWithID($self->{'subscriberID'});                                  $self->deleteSubscriberWithID($self->{'subscriberID'});
177                                    # Add new persistent connection to collection
178                                  $PersistentConnections{$self->{'subscriberID'}}=$self;                                  $PersistentConnections{$self->{'subscriberID'}}=$self;
179                            } else {
180                                    $::Pollers->{$self->{'subscriberID'}} = time;
181                          }                          }
182                                                    
183                          if(scalar(keys %{$channels})) {                          if(scalar(keys %{$channels})) {
# Line 174  sub processLine { Line 187  sub processLine {
187                                  foreach $channelName (keys %{$channels}) {                                  foreach $channelName (keys %{$channels}) {
188                                          my $channel=Meteor::Channel->channelWithName($channelName);                                          my $channel=Meteor::Channel->channelWithName($channelName);
189                                          $self->{'channels'}->{$channelName}=$channel;                                          $self->{'channels'}->{$channelName}=$channel;
190                                          $self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate);                                          if (defined($self->{'channels'}->{$channelName}->{'startIndex'}) && $self->{'channels'}->{$channelName}->{'startIndex'} > 0) {
191                                                                                            $self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate);
192                                            }
193                                  }                                  }
194                                  $self->emitOKHeader();                                  $self->emitOKHeader();
195                                  foreach $channelName (keys %{$channels}) {                                  foreach $channelName (keys %{$channels}) {
196                                          my $startIndex=$channels->{$channelName}->{'startIndex'};                                          my $startIndex=$channels->{$channelName}->{'startIndex'};
197                                          $self->{'channels'}->{$channelName}->addSubscriber($self,$startIndex,$persist,$self->{'mode'},$useragent);                                          $self->{'channels'}->{$channelName}->addSubscriber($self,$startIndex,$persist,$self->{'mode'},$self->{'useragent'});
198                                    }
199                                    if (!$persist) {
200                                            delete ($self->{'channels'});
201                                            $self->close(1, 'responseComplete');
202                                    }
203                                    delete($self->{'headerBuffer'});
204    
205                                    # If long polling, close connection immediately if any messages have been sent
206                                    if ($self->{'messageCount'} > 0 && $self->{'mode'} eq 'longpoll') {
207                                            $self->close(1, 'closedOnEvent');
208                                  }                                  }
                                 delete ($self->{'channels'}) unless($persist);  
                                 $self->close(1, 'responseComplete') unless($persist);  
209                                  return;                                  return;
210                          }                          }
211                  }                  }
# Line 194  sub processLine { Line 216  sub processLine {
216                          $self->close(1, 'disconnectRequested');                          $self->close(1, 'disconnectRequested');
217                          return;                          return;
218                  }                  }
219                    elsif($self->{'headerBuffer'}=~m{GET\s+/koha/(\d+)})
220                    {
221                            Meteor::Koha->item($self,$1);
222                            $self->SUPER::close();
223                            return;
224                    }
225                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
226                  {                  {
227                          Meteor::Document->serveFileToClient($1,$self);                          Meteor::Document->serveFileToClient($1,$self);
228                          $self->close(1, 'responseComplete');                          $self->SUPER::close();
229                          return;                          return;
230                  }                  }
231                                    
# Line 268  sub sendMessages { Line 296  sub sendMessages {
296                    
297          $::Statistics->{'messages_served'}+=$numMessages;          $::Statistics->{'messages_served'}+=$numMessages;
298                    
299          my $msgCount=$self->{'MessageCount'};          my $msgCount=$self->{'messageCount'};
300          $msgCount+=$numMessages;          $msgCount+=$numMessages;
301          $self->{'MessageCount'}=$msgCount;          $self->{'messageCount'}=$msgCount;
302                    
303          my $maxMsg=$self->getConf('MaxMessages');          # If long polling, close connection, as a message has now been sent.
304          if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)          # Don't close if still processing the header (we may be sending a backlog from multiple channels)
305          {          if($self->getConf('CloseOnEvent') && !exists($self->{'headerBuffer'})) {
306                  $self->close(1, 'maxMessageCountReached');                  $self->close(1, 'closedOnEvent');
         }  
           
         if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})  
         {  
                 $self->close(1, 'maxMessageCountReached');  
307          }          }
           
308  }  }
309    
310  sub ping {  sub ping {
# Line 336  sub close { Line 358  sub close {
358                          $self->write($msg);                          $self->write($msg);
359                  }                  }
360          }          }
361            
362            my $fmsg=$self->getConf('FooterTemplate');
363            if(defined($fmsg) && $fmsg ne '')
364            {
365                    $self->write($fmsg);
366            }
367    
368          $self->SUPER::close();          $self->SUPER::close();
369  }  }
370    
# Line 349  sub checkForMaxTime { Line 377  sub checkForMaxTime {
377          my $self=shift;          my $self=shift;
378          my $time=shift;          my $time=shift;
379                    
380          $self->close(1,'maxTime') if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);          $self->close(1,'maxTime') if(exists($self->{'connectionTimeLimit'}) && $self->{'connectionTimeLimit'}<$time);
381  }  }
382    
383  sub getConf {  sub getConf {
384          my $self=shift;          my $self=shift;
385          my $key=shift;          my $key=shift;
386                    
387          if(exists($self->{'mode'}) && $self->{'mode'} ne '')          if(exists($self->{'mode'}) && $self->{'mode'} ne '') {
388          {                  my $k=$key.$self->{'mode'};            
                 my $k=$key.$self->{'mode'};  
                   
389                  if(exists($::CONF{$k})) {                  if(exists($::CONF{$k})) {
390                          return $::CONF{$k};                          return $::CONF{$k};
391                  }                  }
# Line 369  sub getConf { Line 395  sub getConf {
395  }  }
396    
397  1;  1;
 ############################################################################EOF  
398    ############################################################################EOF

Legend:
Removed from v.53  
changed lines
  Added in v.70

  ViewVC Help
Powered by ViewVC 1.1.26