/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 9 by andrew.betts, Fri Dec 8 16:52:58 2006 UTC revision 62 by andrew.betts, Thu Nov 27 00:33:21 2008 UTC
# Line 1  Line 1 
1  #!/usr/bin/perl -w  #!/usr/bin/perl -w
2  ###############################################################################  ###############################################################################
3  #   Meteor  #   Meteor
4  #   An HTTP server for the 2.0 web  #   An HTTP server for the 2.0 web
5  #   Copyright (c) 2006 contributing authors  #   Copyright (c) 2006 contributing authors
6  #  #
7  #   Subscriber.pm  #   Subscriber.pm
8  #  #
9  #       Description:  #       Description:
10  #       A Meteor Subscriber  #       A Meteor Subscriber
11  #  #
12  ###############################################################################  ###############################################################################
13  #  #
14  #   This program is free software; you can redistribute it and/or modify it  #   This program is free software; you can redistribute it and/or modify it
15  #   under the terms of the GNU General Public License as published by the Free  #   under the terms of the GNU General Public License as published by the Free
16  #   Software Foundation; either version 2 of the License, or (at your option)  #   Software Foundation; either version 2 of the License, or (at your option)
17  #   any later version.  #   any later version.
18  #  #
19  #   This program is distributed in the hope that it will be useful, but WITHOUT  #   This program is distributed in the hope that it will be useful, but WITHOUT
20  #   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or  #   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  #   FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for  #   FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22  #   more details.  #   more details.
23  #  #
24  #   You should have received a copy of the GNU General Public License along  #   You should have received a copy of the GNU General Public License along
25  #   with this program; if not, write to the Free Software Foundation, Inc.,  #   with this program; if not, write to the Free Software Foundation, Inc.,
26  #   59 Temple Place, Suite 330, Boston, MA 02111-1307 USA  #   59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27  #  #
28  #   For more information visit www.meteorserver.org  #   For more information visit www.meteorserver.org
29  #  #
30  ###############################################################################  ###############################################################################
31    
32  package Meteor::Subscriber;  package Meteor::Subscriber;
33  ###############################################################################  ###############################################################################
34  # Configuration  # Configuration
35  ###############################################################################  ###############################################################################
36                    
37          use strict;          use strict;
38                    
39          use Meteor::Connection;          use Meteor::Connection;
40          use Meteor::Channel;          use Meteor::Channel;
41          use Meteor::Document;          use Meteor::Document;
42                    
43          @Meteor::Subscriber::ISA=qw(Meteor::Connection);          @Meteor::Subscriber::ISA=qw(Meteor::Connection);
44                    
45          our %PersistentConnections=();          our %PersistentConnections=();
46            our $NumAcceptedConnections=0;
47  ###############################################################################      
48  # Factory methods  
49  ###############################################################################  ###############################################################################
50  sub newFromServer {  # Factory methods
51          my $class=shift;  ###############################################################################
52            sub newFromServer {
53          my $self=$class->SUPER::newFromServer(shift);          my $class=shift;
54                    
55          $self->{'headerBuffer'}='';          my $self=$class->SUPER::newFromServer(shift);
56          $self->{'MessageCount'}=0;          
57          $self->{'MaxMessageCount'}=0;          $self->{'headerBuffer'}='';
58                    $self->{'messageCount'}=0;
59          $self->{'ConnectionStart'}=time;          
60          my $maxTime=$::CONF{'MaxTime'};          my $maxTime=$::CONF{'MaxTime'};
61          if($maxTime>0)          if($maxTime>0)
62          {          {
63                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;                  $self->{'connectionTimeLimit'}=$self->{'connectionStart'}+$maxTime;
64          }          }
65                    
66          $self;          $::Statistics->{'current_subscribers'}++;
67  }          $::Statistics->{'subscriber_connections_accepted'}++;
68            
69  ###############################################################################          $self;
70  # Class methods  }
71  ###############################################################################  
72  sub deleteSubscriberWithID {  ###############################################################################
73          my $class=shift;  # Class methods
74          my $id=shift;  ###############################################################################
75            sub deleteSubscriberWithID {
76          if(exists($PersistentConnections{$id}))          my $class=shift;
77          {          my $id=shift;
78                  $PersistentConnections{$id}->close(1);          
79          }          if(exists($PersistentConnections{$id}))
80  }          {
81                    $PersistentConnections{$id}->close(0,'newSubscriberWithSameID');
82  sub pingPersistentConnections {          }
83          my $class=shift;  }
84            
85          my $msg=$::CONF{'PingMessage'};  sub subscriberExists {
86          my @cons=values %PersistentConnections;          my $class=shift;
87                    my $id=shift;
88          map { $_->write($msg) } @cons;  
89  }          return exists($PersistentConnections{$id});
90    }
91  sub checkPersistentConnectionsForMaxTime {  
92          my $class=shift;  sub pingPersistentConnections {
93                    my $class=shift;
94          my $time=time;          
95          my @cons=values %PersistentConnections;          my @cons=values %PersistentConnections;
96                    
97          map { $_->checkForMaxTime($time) } @cons;          map { $_->ping() } @cons;
98  }  }
99    
100  ###############################################################################  sub checkPersistentConnectionsForMaxTime {
101  # Instance methods          my $class=shift;
102  ###############################################################################          
103  sub processLine {          my $time=time;
104          my $self=shift;          my @cons=values %PersistentConnections;
105          my $line=shift;          
106                    map { $_->checkForMaxTime($time) } @cons;
107          # Once the header was processed we ignore any input  }
108          return unless(exists($self->{'headerBuffer'}));  
109            sub numSubscribers {
110          if($line ne '')          
111          {          return scalar(keys %PersistentConnections);
112                  #  }
113                  # Accumulate header  
114                  #  sub listSubscribers {
115                  $self->{'headerBuffer'}.="$line\n";          my $class=shift;
116          }          my $list='';
117          else          foreach my $subscriber (keys %PersistentConnections)
118          {          {
119                  #                  my $sub = $PersistentConnections{$subscriber};
120                  # Empty line signals end of header.                  $list .= $subscriber.' '.$sub->{'ip'}.' '.$sub->{'connectionStart'}.' '.$sub->{'connectionTimeLimit'}.' '.($sub->{'connectionTimeLimit'}-time).' '.$sub->{'messageCount'}.' "'.$sub->{'useragent'}."\"$::CRLF";
121                  # Analyze header, register with appropiate channel          }
122                  # and send pending messages.          $list;
123                  #  }
124                  # GET $::CONF{'SubscriberDynamicPageAddress'}?channel=ml123&restartfrom=1 HTTP/1.1  
125                  #  ###############################################################################
126                  # Find the 'GET' line  # Instance methods
127                  #  ###############################################################################
128                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\?(\S+)/)  sub processLine {
129                  {          my $self=shift;
130                          my @formData=split('&',$1);          my $line=shift;
131                          my $channelName=undef;          
132                          my $startIndex=undef;          # Once the header was processed we ignore any input - Meteor does not accept or process request bodies
133                          my $backtrack=undef;          return unless(exists($self->{'headerBuffer'}));
134                          my $persist=1;          
135                          my $subscriberID=undef;          if($line ne '')
136                          foreach my $formElement (@formData)          {
137                          {                  #
138                                  if($formElement=~/^channel=(.+)$/)                  # Accumulate header
139                                  {                  #
140                                          $channelName=$1;                  $self->{'headerBuffer'}.="$line\n";
141                                  }          }
142                                  elsif($formElement=~/^restartfrom=(\d*)$/)          else
143                                  {          {
144                                          $startIndex=$1;                  #
145                                          $startIndex='' unless(defined($startIndex));                  # Empty line signals end of header.
146                                  }                  # Analyze header, register with appropiate channel
147                                  elsif($formElement=~/^backtrack=(\d+)$/)                  # and send pending messages.
148                                  {                  #
149                                          $backtrack=$1;                  # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
150                                          $backtrack=0 unless(defined($backtrack));                  #
151                                  }                  # Find the 'GET' line
152                                  elsif($formElement=~/^persist=(?i)(yes|true|1|no|false|0)$/)                  #
153                                  {                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/([a-z0-9_\-\%\.\/]+).*?/i)
154                                          $persist=0 if($1=~/(no|false|0)/i);                  {
155                                  }                          $self->{'subscriberID'}=$1;
156                                  elsif($formElement=~/^id=(.+)$/)                          $self->{'mode'}=$2;
157                                  {                          my $persist=$self->getConf('Persist');
158                                          $subscriberID=$1;                          my $maxTime=$self->getConf('MaxTime');
159                                  }                          $self->{'connectionTimeLimit'} = ($self->{'connectionStart'}+$maxTime) if ($maxTime>0);
160                                  elsif($formElement=~/^maxmessages=(\d+)$/i)                          
161                                  {                          my @channelData=split('/',$3);
162                                          $self->{'MaxMessageCount'}=$1;                          my $channels={};
163                                  }                          my $channelName;
164                                  elsif($formElement=~/^template=(\d+)$/i)                          my $offset;
165                                  {                          foreach my $chandef (@channelData) {
166                                          $self->{'HeaderTemplateNumber'}=$1;                                  if($chandef=~/^([a-z0-9_\-\%]+)(.(r|b|h)([0-9]*))?$/i) {
167                                  }                                          $channelName = $1;
168                                  elsif($formElement=~/^maxtime=(\d+)$/i)                                          $channels->{$channelName}->{'startIndex'} = undef;
169                                  {                                          if ($3) {
170                                          my $clientRequest=$1;                                             $offset = $4;
171                                          my $serverDefault=$::CONF{'MaxTime'};                                             if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; }
172                                                                                       if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; }
173                                          if($serverDefault==0 || $serverDefault>$clientRequest)                                             if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; }
174                                          {                                          }
175                                                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$clientRequest;                                  }
176                                          }                          }
177                                  }                          $self->{'useragent'} = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-";
178                          }                          
179                                                                            if ($persist) {
180                          delete($self->{'headerBuffer'});                                  # New persistent connection: kill any existing connection with same ID
181                                                            $self->deleteSubscriberWithID($self->{'subscriberID'});
182                          if(defined($startIndex) && defined($backtrack))                                  # Add new persistent connection to collection
183                          {                                  $PersistentConnections{$self->{'subscriberID'}}=$self;
184                                  $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");                          } else {
185                                  $self->close();                                  $::Pollers->{$self->{'subscriberID'}} = time;
186                                                            }
187                                  return;                          
188                          }                          if(scalar(keys %{$channels})) {
189                            
190                          if(defined($subscriberID) && $persist)                                  $self->{'channelinfo'} = '';
191                          {                                  my $citemplate = $self->getConf('ChannelInfoTemplate');
192                                  $self->{'subscriberID'}=$subscriberID;                                  foreach $channelName (keys %{$channels}) {
193                                  $self->deleteSubscriberWithID($subscriberID);                                          my $channel=Meteor::Channel->channelWithName($channelName);
194                                  $PersistentConnections{$subscriberID}=$self;                                          $self->{'channels'}->{$channelName}=$channel;
195                          }                                          if (defined($self->{'channels'}->{$channelName}->{'startIndex'}) && $self->{'channels'}->{$channelName}->{'startIndex'} > 0) {
196                                                                            $self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate);
197                          if(defined($channelName))                                          }
198                          {                                  }
199                                  $self->emitOKHeader();                                  $self->emitOKHeader();
200                                                                    foreach $channelName (keys %{$channels}) {
201                                  $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack));                                          my $startIndex=$channels->{$channelName}->{'startIndex'};
202                                                                            $self->{'channels'}->{$channelName}->addSubscriber($self,$startIndex,$persist,$self->{'mode'},$self->{'useragent'});
203                                  $self->setChannelName($channelName,$startIndex,$persist);                                  }
204                                                                    delete ($self->{'channels'}) unless($persist);
205                                  $self->close(1) unless($persist);                                  $self->close(1, 'responseComplete') unless($persist);
206                                                                    $self->close(1, 'closedOnEvent') unless($self->{'messageCount'} == 0);
207                                  return;                                  delete($self->{'headerBuffer'});
208                          }                                  return;
209                  }                          }
210                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)                  }
211                  {                  elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/)
212                          Meteor::Document->serveFileToClient($1,$self);                  {
213                                                    $self->deleteSubscriberWithID($1);
214                          $self->close(1);                          $self->emitOKHeader();
215                                                    $self->close(1, 'disconnectRequested');
216                          return;                          return;
217                  }                  }
218                                    elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
219                  #                  {
220                  # If we fall through we did not understand the request                          Meteor::Document->serveFileToClient($1,$self);
221                  #                          $self->close(1, 'responseComplete');
222                  $self->emitErrorHeader();                          return;
223          }                  }
224  }                  
225                    #
226  sub setChannelName {                  # If we fall through we did not understand the request
227          my $self=shift;                  #
228          my $channelName=shift;                  $self->emitErrorHeader();
229          my $startIndex=shift;          }
230          my $persist=shift;  }
231            
232          my $channel=Meteor::Channel->channelWithName($channelName);  sub emitOKHeader {
233          $self->{'channel'}=$channel if($persist);          my $self=shift;
234                    
235          $channel->addSubscriber($self,$startIndex,$persist);          $self->emitHeader('200 OK');
236  }  }
237    
238  sub emitOKHeader {  sub emitErrorHeader {
239          my $self=shift;          my $self=shift;
240                    
241          $self->emitHeader('200 OK');          $self->emitHeader('404 Not Found');
242  }          $::Statistics->{'errors_served'}++;
243            
244  sub emitErrorHeader {          # close up shop here!
245          my $self=shift;          $self->close(0, 'error');
246            }
247          $self->emitHeader('404 Not Found');  
248            sub emitHeader {
249          # close up shop here!          my $self=shift;
250          $self->close();          my $status=shift;
251  }          
252            my $header=$self->getConf('HeaderTemplate');
253  sub emitHeader {          
254          my $self=shift;          $header=~s/~([^~]*)~/
255          my $status=shift;                  if(!defined($1) || $1 eq '') {
256                                    '~';
257          my $header=undef;                  } elsif($1 eq 'server') {
258          if(exists($self->{'HeaderTemplateNumber'}))                          $::PGM;
259          {                  } elsif($1 eq 'status') {
260                  my $hn='HeaderTemplate'.$self->{'HeaderTemplateNumber'};                          $status;
261                                    } elsif($1 eq 'servertime') {
262                  $header=$::CONF{$hn};                          time;
263          }                  } elsif($1 eq 'channelinfo') {
264          $header=$::CONF{'HeaderTemplate'} unless(defined($header));                          $self->{'channelinfo'};
265                            } else {
266          $header=~s/~([^~]+)~/                          '';
267                  if(!defined($1) || $1 eq '')                  }
268                  {          /gex;
269                          '~';          
270                  }          $self->write($header);
271                  elsif($1 eq 'server')  }
272                  {  
273                          $::PGM;  sub sendMessages {
274                  }          my $self=shift;
275                  elsif($1 eq 'status')          
276                  {          my $numMessages=0;
277                          $status;          my $msgTemplate=$self->getConf('MessageTemplate');
278                  }          my $msgData='';
279                  elsif($1 eq 'servertime')          
280                  {          foreach my $message (@_)
281                          time;          {
282                  }                  $msgData.=$message->messageWithTemplate($msgTemplate);
283                  else                  $numMessages++;
284                  {          }
285                          '';          
286                  }          return if($numMessages<1);
287          /gex;          
288                    $self->write($msgData);
289          $self->write($header);          
290  }          $::Statistics->{'messages_served'}+=$numMessages;
291            
292  sub sendMessage {          my $msgCount=$self->{'messageCount'};
293          my $self=shift;          $msgCount+=$numMessages;
294          my $msg=shift;          $self->{'messageCount'}=$msgCount;
295                    
296          $self->write($msg);          # If long polling, close connection, as a message has now been sent.
297                    # Don't close if still processing the header (we may be sending a backlog from multiple channels)
298          my $msgCount=++$self->{'MessageCount'};          if($self->getConf('CloseOnEvent') && !exists($self->{'headerBuffer'})) {
299                            $self->close(1, 'closedOnEvent');
300          my $maxMsg=$::CONF{'MaxMessages'};          }
301          if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)  }
302          {  
303                  $self->close(1);  sub ping {
304          }          my $self=shift;
305                    my $msg=$self->getConf('PingMessage');
306          if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})          
307          {          $self->write($msg);
308                  $self->close(1);  }
309          }  
310  }  sub closeChannel {
311            my $self=shift;
312  sub close {          my $channelName=shift;
313          my $self=shift;          
314          my $noShutdownMsg=shift;          return unless(exists($self->{'channels'}->{$channelName}));
315                    
316          $self->{'channel'}->removeSubscriber($self) if($self->{'channel'});          my $channel=$self->{'channels'}->{$channelName};
317          delete($self->{'channel'});          $channel->removeSubscriber($self,'channelClose');
318                    
319          if(exists($self->{'subscriberID'}))          delete($self->{'channels'}->{$channelName});
320          {          
321                  delete($PersistentConnections{$self->{'subscriberID'}});          $self->close(0,'channelClose') if(scalar(keys %{$self->{'channels'}})==0);
322          }  }
323            
324          #  sub close {
325          # Send shutdown message unless remote closed or          my $self=shift;
326          # connection not yet established          my $noShutdownMsg=shift;
327          #          my $reason=shift;
328          unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))          
329          {          foreach my $channelName (keys %{$self->{'channels'}})
330                  my $msg=$::CONF{'SubscriberShutdownMsg'};          {
331                  if(defined($msg) && $msg ne '')                  my $channel=$self->{'channels'}->{$channelName};
332                  {                  $channel->removeSubscriber($self,$reason);
333                          $self->write($msg);          }
334                  }          delete($self->{'channels'});
335          }          
336                    # If this connection is in the PersistentConnections array, delete it, then anonymise
337          $self->SUPER::close();          # it so that if we have to wait for the write buffer to empty before close, it's only
338  }          # removed once.
339            if(exists($self->{'subscriberID'})) {
340  sub checkForMaxTime {                  delete($PersistentConnections{$self->{'subscriberID'}});
341          my $self=shift;                  delete($self->{'subscriberID'});
342          my $time=shift;          }
343                    
344          $self->close(1) if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);          # Send shutdown message unless remote closed or
345  }          # connection not yet established
346            unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
347  1;          {
348                    my $msg=$self->getConf('SubscriberShutdownMsg');
349                    if(defined($msg) && $msg ne '')
350                    {
351                            $self->write($msg);
352                    }
353            }
354    
355            my $fmsg=$self->getConf('FooterTemplate');
356            if(defined($fmsg) && $fmsg ne '')
357            {
358                    $self->write($fmsg);
359            }
360    
361            $self->SUPER::close();
362    }
363    
364    sub didClose {
365            
366            $::Statistics->{'current_subscribers'}--;
367    }
368    
369    sub checkForMaxTime {
370            my $self=shift;
371            my $time=shift;
372            
373            $self->close(1,'maxTime') if(exists($self->{'connectionTimeLimit'}) && $self->{'connectionTimeLimit'}<$time);
374    }
375    
376    sub getConf {
377            my $self=shift;
378            my $key=shift;
379            
380            if(exists($self->{'mode'}) && $self->{'mode'} ne '')
381            {
382                    my $k=$key.$self->{'mode'};
383                    
384                    if(exists($::CONF{$k})) {
385                            return $::CONF{$k};
386                    }
387            }
388            
389            $::CONF{$key};
390    }
391    
392    1;
393  ############################################################################EOF  ############################################################################EOF

Legend:
Removed from v.9  
changed lines
  Added in v.62

  ViewVC Help
Powered by ViewVC 1.1.26