/[meteor]/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

googlecode.com/svn/trunk/Meteor/Subscriber.pm revision 11 by knops.gerd, Thu Dec 14 16:29:42 2006 UTC trunk/Meteor/Subscriber.pm revision 70 by dpavlin, Sat Mar 28 03:45:31 2009 UTC
# Line 39  package Meteor::Subscriber; Line 39  package Meteor::Subscriber;
39          use Meteor::Connection;          use Meteor::Connection;
40          use Meteor::Channel;          use Meteor::Channel;
41          use Meteor::Document;          use Meteor::Document;
42            use Meteor::Koha;
43                    
44          @Meteor::Subscriber::ISA=qw(Meteor::Connection);          @Meteor::Subscriber::ISA=qw(Meteor::Connection);
45                    
46          our %PersistentConnections=();          our %PersistentConnections=();
47            our $NumAcceptedConnections=0;
48        
49    
50  ###############################################################################  ###############################################################################
51  # Factory methods  # Factory methods
# Line 53  sub newFromServer { Line 56  sub newFromServer {
56          my $self=$class->SUPER::newFromServer(shift);          my $self=$class->SUPER::newFromServer(shift);
57                    
58          $self->{'headerBuffer'}='';          $self->{'headerBuffer'}='';
59          $self->{'MessageCount'}=0;          $self->{'messageCount'}=0;
         $self->{'MaxMessageCount'}=0;  
60                    
61          $self->{'ConnectionStart'}=time;          $::Statistics->{'current_subscribers'}++;
62          my $maxTime=$::CONF{'MaxTime'};          $::Statistics->{'subscriber_connections_accepted'}++;
         if($maxTime>0)  
         {  
                 $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;  
         }  
63                    
64          $self;          $self;
65  }  }
# Line 75  sub deleteSubscriberWithID { Line 73  sub deleteSubscriberWithID {
73                    
74          if(exists($PersistentConnections{$id}))          if(exists($PersistentConnections{$id}))
75          {          {
76                  $PersistentConnections{$id}->close(1);                  $PersistentConnections{$id}->close(0,'newSubscriberWithSameID');
77          }          }
78  }  }
79    
80    sub subscriberExists {
81            my $class=shift;
82            my $id=shift;
83    
84            return exists($PersistentConnections{$id});
85    }
86    
87  sub pingPersistentConnections {  sub pingPersistentConnections {
88          my $class=shift;          my $class=shift;
89                    
         my $msg=$::CONF{'PingMessage'};  
90          my @cons=values %PersistentConnections;          my @cons=values %PersistentConnections;
91                    
92          map { $_->write($msg) } @cons;          map { $_->ping() } @cons;
93  }  }
94    
95  sub checkPersistentConnectionsForMaxTime {  sub checkPersistentConnectionsForMaxTime {
# Line 97  sub checkPersistentConnectionsForMaxTime Line 101  sub checkPersistentConnectionsForMaxTime
101          map { $_->checkForMaxTime($time) } @cons;          map { $_->checkForMaxTime($time) } @cons;
102  }  }
103    
104    sub numSubscribers {
105            
106            return scalar(keys %PersistentConnections);
107    }
108    
109    sub listSubscribers {
110            my $class=shift;
111            my $list='';
112            foreach my $subscriber (keys %PersistentConnections)
113            {
114                    my $sub = $PersistentConnections{$subscriber};
115                    $list .= $subscriber.' '.$sub->{'ip'}.' '.$sub->{'connectionStart'}.' '.$sub->{'connectionTimeLimit'}.' '.($sub->{'connectionTimeLimit'}-time).' '.$sub->{'messageCount'}.' "'.$sub->{'useragent'}."\"$::CRLF";
116            }
117            $list;
118    }
119    
120  ###############################################################################  ###############################################################################
121  # Instance methods  # Instance methods
122  ###############################################################################  ###############################################################################
# Line 104  sub processLine { Line 124  sub processLine {
124          my $self=shift;          my $self=shift;
125          my $line=shift;          my $line=shift;
126                    
127          # Once the header was processed we ignore any input          # Once the header was processed we ignore any input - Meteor does not accept or process request bodies
128          return unless(exists($self->{'headerBuffer'}));          return unless(exists($self->{'headerBuffer'}));
129                    
130          if($line ne '')          if($line ne '')
# Line 121  sub processLine { Line 141  sub processLine {
141                  # Analyze header, register with appropiate channel                  # Analyze header, register with appropiate channel
142                  # and send pending messages.                  # and send pending messages.
143                  #                  #
144                  # GET $::CONF{'SubscriberDynamicPageAddress'}?channel=ml123&restartfrom=1 HTTP/1.1                  # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
145                  #                  #
146                  # Find the 'GET' line                  # Find the 'GET' line
147                  #                  #
148                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\?(\S+)/)                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/([a-z0-9_\-\%\.\/]+).*?/i)
149                  {                  {
150                          my @formData=split('&',$1);                          $self->{'subscriberID'}=$1;
151                          my $channelName=undef;                          $self->{'mode'}=$2;
152                          my $startIndex=undef;                          my $persist=$self->getConf('Persist');
153                          my $backtrack=undef;                          my $maxTime=$self->getConf('MaxTime');
154                          my $persist=1;                          $self->{'connectionTimeLimit'} = ($self->{'connectionStart'}+$maxTime) if ($maxTime>0);
155                          my $subscriberID=undef;                          
156                          foreach my $formElement (@formData)                          my @channelData=split('/',$3);
157                          {                          my $channels={};
158                                  if($formElement=~/^channel=(.+)$/)                          my $channelName;
159                                  {                          my $offset;
160                                          $channelName=$1;                          foreach my $chandef (@channelData) {
161                                  }                                  if($chandef=~/^([a-z0-9_\-\%]+)(.(r|b|h)([0-9]*))?$/i) {
162                                  elsif($formElement=~/^restartfrom=(\d*)$/)                                          $channelName = $1;
163                                  {                                          $channels->{$channelName}->{'startIndex'} = undef;
164                                          $startIndex=$1;                                          if ($3) {
165                                          $startIndex='' unless(defined($startIndex));                                             $offset = $4;
166                                  }                                             if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; }
167                                  elsif($formElement=~/^backtrack=(\d+)$/)                                             if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; }
168                                  {                                             if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; }
                                         $backtrack=$1;  
                                         $backtrack=0 unless(defined($backtrack));  
                                 }  
                                 elsif($formElement=~/^persist=(?i)(yes|true|1|no|false|0)$/)  
                                 {  
                                         $persist=0 if($1=~/(no|false|0)/i);  
                                 }  
                                 elsif($formElement=~/^id=(.+)$/)  
                                 {  
                                         $subscriberID=$1;  
                                 }  
                                 elsif($formElement=~/^maxmessages=(\d+)$/i)  
                                 {  
                                         $self->{'MaxMessageCount'}=$1;  
                                 }  
                                 elsif($formElement=~/^template=(\d+)$/i)  
                                 {  
                                         $self->{'HeaderTemplateNumber'}=$1;  
                                 }  
                                 elsif($formElement=~/^maxtime=(\d+)$/i)  
                                 {  
                                         my $clientRequest=$1;  
                                         my $serverDefault=$::CONF{'MaxTime'};  
                                           
                                         if($serverDefault==0 || $serverDefault>$clientRequest)  
                                         {  
                                                 $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$clientRequest;  
169                                          }                                          }
170                                  }                                  }
171                          }                          }
172                                                                            $self->{'useragent'} = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-";
                         delete($self->{'headerBuffer'});  
173                                                    
174                          if(defined($startIndex) && defined($backtrack))                          if ($persist) {
175                          {                                  # New persistent connection: kill any existing connection with same ID
176                                  $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");                                  $self->deleteSubscriberWithID($self->{'subscriberID'});
177                                  $self->close();                                  # Add new persistent connection to collection
178                                                                    $PersistentConnections{$self->{'subscriberID'}}=$self;
179                                  return;                          } else {
180                                    $::Pollers->{$self->{'subscriberID'}} = time;
181                          }                          }
182                                                    
183                          if(defined($subscriberID) && $persist)                          if(scalar(keys %{$channels})) {
184                          {  
185                                  $self->{'subscriberID'}=$subscriberID;                                  $self->{'channelinfo'} = '';
186                                  $self->deleteSubscriberWithID($subscriberID);                                  my $citemplate = $self->getConf('ChannelInfoTemplate');
187                                  $PersistentConnections{$subscriberID}=$self;                                  foreach $channelName (keys %{$channels}) {
188                          }                                          my $channel=Meteor::Channel->channelWithName($channelName);
189                                                                    $self->{'channels'}->{$channelName}=$channel;
190                          if(defined($channelName))                                          if (defined($self->{'channels'}->{$channelName}->{'startIndex'}) && $self->{'channels'}->{$channelName}->{'startIndex'} > 0) {
191                          {                                                  $self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate);
192                                            }
193                                    }
194                                  $self->emitOKHeader();                                  $self->emitOKHeader();
195                                                                    foreach $channelName (keys %{$channels}) {
196                                  $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack));                                          my $startIndex=$channels->{$channelName}->{'startIndex'};
197                                                                            $self->{'channels'}->{$channelName}->addSubscriber($self,$startIndex,$persist,$self->{'mode'},$self->{'useragent'});
198                                  $self->setChannelName($channelName,$startIndex,$persist);                                  }
199                                                                    if (!$persist) {
200                                  $self->close(1) unless($persist);                                          delete ($self->{'channels'});
201                                                                            $self->close(1, 'responseComplete');
202                                    }
203                                    delete($self->{'headerBuffer'});
204    
205                                    # If long polling, close connection immediately if any messages have been sent
206                                    if ($self->{'messageCount'} > 0 && $self->{'mode'} eq 'longpoll') {
207                                            $self->close(1, 'closedOnEvent');
208                                    }
209                                  return;                                  return;
210                          }                          }
211                  }                  }
212                    elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/)
213                    {
214                            $self->deleteSubscriberWithID($1);
215                            $self->emitOKHeader();
216                            $self->close(1, 'disconnectRequested');
217                            return;
218                    }
219                    elsif($self->{'headerBuffer'}=~m{GET\s+/koha/(\d+)})
220                    {
221                            Meteor::Koha->item($self,$1);
222                            $self->SUPER::close();
223                            return;
224                    }
225                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
226                  {                  {
227                          Meteor::Document->serveFileToClient($1,$self);                          Meteor::Document->serveFileToClient($1,$self);
228                                                    $self->SUPER::close();
                         $self->close(1);  
                           
229                          return;                          return;
230                  }                  }
231                                    
# Line 223  sub processLine { Line 236  sub processLine {
236          }          }
237  }  }
238    
 sub setChannelName {  
         my $self=shift;  
         my $channelName=shift;  
         my $startIndex=shift;  
         my $persist=shift;  
           
         my $channel=Meteor::Channel->channelWithName($channelName);  
         $self->{'channel'}=$channel if($persist);  
           
         $channel->addSubscriber($self,$startIndex,$persist);  
 }  
   
239  sub emitOKHeader {  sub emitOKHeader {
240          my $self=shift;          my $self=shift;
241                    
# Line 245  sub emitErrorHeader { Line 246  sub emitErrorHeader {
246          my $self=shift;          my $self=shift;
247                    
248          $self->emitHeader('404 Not Found');          $self->emitHeader('404 Not Found');
249            $::Statistics->{'errors_served'}++;
250                    
251          # close up shop here!          # close up shop here!
252          $self->close();          $self->close(0, 'error');
253  }  }
254    
255  sub emitHeader {  sub emitHeader {
256          my $self=shift;          my $self=shift;
257          my $status=shift;          my $status=shift;
258                    
259          my $header=undef;          my $header=$self->getConf('HeaderTemplate');
         if(exists($self->{'HeaderTemplateNumber'}))  
         {  
                 my $hn='HeaderTemplate'.$self->{'HeaderTemplateNumber'};  
                   
                 $header=$::CONF{$hn};  
         }  
         $header=$::CONF{'HeaderTemplate'} unless(defined($header));  
260                    
261          $header=~s/~([^~]+)~/          $header=~s/~([^~]*)~/
262                  if(!defined($1) || $1 eq '')                  if(!defined($1) || $1 eq '') {
                 {  
263                          '~';                          '~';
264                  }                  } elsif($1 eq 'server') {
                 elsif($1 eq 'server')  
                 {  
265                          $::PGM;                          $::PGM;
266                  }                  } elsif($1 eq 'status') {
                 elsif($1 eq 'status')  
                 {  
267                          $status;                          $status;
268                  }                  } elsif($1 eq 'servertime') {
                 elsif($1 eq 'servertime')  
                 {  
269                          time;                          time;
270                  }                  } elsif($1 eq 'channelinfo') {
271                  else                          $self->{'channelinfo'};
272                  {                  } else {
273                          '';                          '';
274                  }                  }
275          /gex;          /gex;
# Line 289  sub emitHeader { Line 277  sub emitHeader {
277          $self->write($header);          $self->write($header);
278  }  }
279    
280  sub sendMessage {  sub sendMessages {
281          my $self=shift;          my $self=shift;
         my $msg=shift;  
           
         $self->write($msg);  
282                    
283          my $msgCount=++$self->{'MessageCount'};          my $numMessages=0;
284            my $msgTemplate=$self->getConf('MessageTemplate');
285            my $msgData='';
286                    
287          my $maxMsg=$::CONF{'MaxMessages'};          foreach my $message (@_)
         if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)  
288          {          {
289                  $self->close(1);                  $msgData.=$message->messageWithTemplate($msgTemplate);
290                    $numMessages++;
291          }          }
292                    
293          if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})          return if($numMessages<1);
294          {          
295                  $self->close(1);          $self->write($msgData);
296            
297            $::Statistics->{'messages_served'}+=$numMessages;
298            
299            my $msgCount=$self->{'messageCount'};
300            $msgCount+=$numMessages;
301            $self->{'messageCount'}=$msgCount;
302            
303            # If long polling, close connection, as a message has now been sent.
304            # Don't close if still processing the header (we may be sending a backlog from multiple channels)
305            if($self->getConf('CloseOnEvent') && !exists($self->{'headerBuffer'})) {
306                    $self->close(1, 'closedOnEvent');
307          }          }
308  }  }
309    
310    sub ping {
311            my $self=shift;
312            my $msg=$self->getConf('PingMessage');
313            
314            $self->write($msg);
315    }
316    
317    sub closeChannel {
318            my $self=shift;
319            my $channelName=shift;
320            
321            return unless(exists($self->{'channels'}->{$channelName}));
322            
323            my $channel=$self->{'channels'}->{$channelName};
324            $channel->removeSubscriber($self,'channelClose');
325            
326            delete($self->{'channels'}->{$channelName});
327            
328            $self->close(0,'channelClose') if(scalar(keys %{$self->{'channels'}})==0);
329    }
330    
331  sub close {  sub close {
332          my $self=shift;          my $self=shift;
333          my $noShutdownMsg=shift;          my $noShutdownMsg=shift;
334            my $reason=shift;
335                    
336          $self->{'channel'}->removeSubscriber($self) if($self->{'channel'});          foreach my $channelName (keys %{$self->{'channels'}})
         delete($self->{'channel'});  
           
         if(exists($self->{'subscriberID'}))  
337          {          {
338                    my $channel=$self->{'channels'}->{$channelName};
339                    $channel->removeSubscriber($self,$reason);
340            }
341            delete($self->{'channels'});
342            
343            # If this connection is in the PersistentConnections array, delete it, then anonymise
344            # it so that if we have to wait for the write buffer to empty before close, it's only
345            # removed once.
346            if(exists($self->{'subscriberID'})) {
347                  delete($PersistentConnections{$self->{'subscriberID'}});                  delete($PersistentConnections{$self->{'subscriberID'}});
348                    delete($self->{'subscriberID'});
349          }          }
350                    
         #  
351          # Send shutdown message unless remote closed or          # Send shutdown message unless remote closed or
352          # connection not yet established          # connection not yet established
         #  
353          unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))          unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
354          {          {
355                  my $msg=$::CONF{'SubscriberShutdownMsg'};                  my $msg=$self->getConf('SubscriberShutdownMsg');
356                  if(defined($msg) && $msg ne '')                  if(defined($msg) && $msg ne '')
357                  {                  {
358                          $self->write($msg);                          $self->write($msg);
359                  }                  }
360          }          }
361            
362            my $fmsg=$self->getConf('FooterTemplate');
363            if(defined($fmsg) && $fmsg ne '')
364            {
365                    $self->write($fmsg);
366            }
367    
368          $self->SUPER::close();          $self->SUPER::close();
369  }  }
370    
371    sub didClose {
372            
373            $::Statistics->{'current_subscribers'}--;
374    }
375    
376  sub checkForMaxTime {  sub checkForMaxTime {
377          my $self=shift;          my $self=shift;
378          my $time=shift;          my $time=shift;
379                    
380          $self->close(1) if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);          $self->close(1,'maxTime') if(exists($self->{'connectionTimeLimit'}) && $self->{'connectionTimeLimit'}<$time);
381    }
382    
383    sub getConf {
384            my $self=shift;
385            my $key=shift;
386            
387            if(exists($self->{'mode'}) && $self->{'mode'} ne '') {
388                    my $k=$key.$self->{'mode'};            
389                    if(exists($::CONF{$k})) {
390                            return $::CONF{$k};
391                    }
392            }
393            
394            $::CONF{$key};
395  }  }
396    
397  1;  1;
 ############################################################################EOF  
398    ############################################################################EOF

Legend:
Removed from v.11  
changed lines
  Added in v.70

  ViewVC Help
Powered by ViewVC 1.1.26