/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 9 by andrew.betts, Fri Dec 8 16:52:58 2006 UTC revision 37 by knops.gerd, Fri Feb 1 21:22:03 2008 UTC
# Line 1  Line 1 
1  #!/usr/bin/perl -w  #!/usr/bin/perl -w
2  ###############################################################################  ###############################################################################
3  #   Meteor  #   Meteor
4  #   An HTTP server for the 2.0 web  #   An HTTP server for the 2.0 web
5  #   Copyright (c) 2006 contributing authors  #   Copyright (c) 2006 contributing authors
6  #  #
7  #   Subscriber.pm  #   Subscriber.pm
8  #  #
9  #       Description:  #       Description:
10  #       A Meteor Subscriber  #       A Meteor Subscriber
11  #  #
12  ###############################################################################  ###############################################################################
13  #  #
14  #   This program is free software; you can redistribute it and/or modify it  #   This program is free software; you can redistribute it and/or modify it
15  #   under the terms of the GNU General Public License as published by the Free  #   under the terms of the GNU General Public License as published by the Free
16  #   Software Foundation; either version 2 of the License, or (at your option)  #   Software Foundation; either version 2 of the License, or (at your option)
17  #   any later version.  #   any later version.
18  #  #
19  #   This program is distributed in the hope that it will be useful, but WITHOUT  #   This program is distributed in the hope that it will be useful, but WITHOUT
20  #   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or  #   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  #   FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for  #   FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22  #   more details.  #   more details.
23  #  #
24  #   You should have received a copy of the GNU General Public License along  #   You should have received a copy of the GNU General Public License along
25  #   with this program; if not, write to the Free Software Foundation, Inc.,  #   with this program; if not, write to the Free Software Foundation, Inc.,
26  #   59 Temple Place, Suite 330, Boston, MA 02111-1307 USA  #   59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27  #  #
28  #   For more information visit www.meteorserver.org  #   For more information visit www.meteorserver.org
29  #  #
30  ###############################################################################  ###############################################################################
31    
32  package Meteor::Subscriber;  package Meteor::Subscriber;
33  ###############################################################################  ###############################################################################
34  # Configuration  # Configuration
35  ###############################################################################  ###############################################################################
36                    
37          use strict;          use strict;
38                    
39          use Meteor::Connection;          use Meteor::Connection;
40          use Meteor::Channel;          use Meteor::Channel;
41          use Meteor::Document;          use Meteor::Document;
42                    
43          @Meteor::Subscriber::ISA=qw(Meteor::Connection);          @Meteor::Subscriber::ISA=qw(Meteor::Connection);
44                    
45          our %PersistentConnections=();          our %PersistentConnections=();
46            our $NumAcceptedConnections=0;
47  ###############################################################################      
48  # Factory methods  
49  ###############################################################################  ###############################################################################
50  sub newFromServer {  # Factory methods
51          my $class=shift;  ###############################################################################
52            sub newFromServer {
53          my $self=$class->SUPER::newFromServer(shift);          my $class=shift;
54                    
55          $self->{'headerBuffer'}='';          my $self=$class->SUPER::newFromServer(shift);
56          $self->{'MessageCount'}=0;          
57          $self->{'MaxMessageCount'}=0;          $self->{'headerBuffer'}='';
58                    $self->{'MessageCount'}=0;
59          $self->{'ConnectionStart'}=time;          $self->{'MaxMessageCount'}=0;
60          my $maxTime=$::CONF{'MaxTime'};          
61          if($maxTime>0)          $self->{'ConnectionStart'}=time;
62          {          my $maxTime=$::CONF{'MaxTime'};
63                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;          if($maxTime>0)
64          }          {
65                            $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
66          $self;          }
67  }          
68            $::Statistics->{'current_subscribers'}++;
69  ###############################################################################          $::Statistics->{'subscriber_connections_accepted'}++;
70  # Class methods          
71  ###############################################################################          $self;
72  sub deleteSubscriberWithID {  }
73          my $class=shift;  
74          my $id=shift;  ###############################################################################
75            # Class methods
76          if(exists($PersistentConnections{$id}))  ###############################################################################
77          {  sub deleteSubscriberWithID {
78                  $PersistentConnections{$id}->close(1);          my $class=shift;
79          }          my $id=shift;
80  }          
81            if(exists($PersistentConnections{$id}))
82  sub pingPersistentConnections {          {
83          my $class=shift;                  $PersistentConnections{$id}->close(1);
84                    }
85          my $msg=$::CONF{'PingMessage'};  }
86          my @cons=values %PersistentConnections;  
87            sub pingPersistentConnections {
88          map { $_->write($msg) } @cons;          my $class=shift;
89  }          
90            my $msg=$::CONF{'PingMessage'};
91  sub checkPersistentConnectionsForMaxTime {          my @cons=values %PersistentConnections;
92          my $class=shift;          
93                    map { $_->write($msg.chr(0)) } @cons;
94          my $time=time;  }
95          my @cons=values %PersistentConnections;  
96            sub checkPersistentConnectionsForMaxTime {
97          map { $_->checkForMaxTime($time) } @cons;          my $class=shift;
98  }          
99            my $time=time;
100  ###############################################################################          my @cons=values %PersistentConnections;
101  # Instance methods          
102  ###############################################################################          map { $_->checkForMaxTime($time) } @cons;
103  sub processLine {  }
104          my $self=shift;  
105          my $line=shift;  sub numSubscribers {
106                    
107          # Once the header was processed we ignore any input          return scalar(keys %PersistentConnections);
108          return unless(exists($self->{'headerBuffer'}));  }
109            
110          if($line ne '')  ###############################################################################
111          {  # Instance methods
112                  #  ###############################################################################
113                  # Accumulate header  sub processLine {
114                  #          my $self=shift;
115                  $self->{'headerBuffer'}.="$line\n";          my $line=shift;
116          }          
117          else          # Once the header was processed we ignore any input
118          {          return unless(exists($self->{'headerBuffer'}));
119                  #          
120                  # Empty line signals end of header.          if($line ne '')
121                  # Analyze header, register with appropiate channel          {
122                  # and send pending messages.                  #
123                  #                  # Accumulate header
124                  # GET $::CONF{'SubscriberDynamicPageAddress'}?channel=ml123&restartfrom=1 HTTP/1.1                  #
125                  #                  $self->{'headerBuffer'}.="$line\n";
126                  # Find the 'GET' line          }
127                  #          else
128                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\?(\S+)/)          {
129                  {                  #
130                          my @formData=split('&',$1);                  # Empty line signals end of header.
131                          my $channelName=undef;                  # Analyze header, register with appropiate channel
132                          my $startIndex=undef;                  # and send pending messages.
133                          my $backtrack=undef;                  #
134                          my $persist=1;                  # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
135                          my $subscriberID=undef;                  #
136                          foreach my $formElement (@formData)                  # Find the 'GET' line
137                          {                  #
138                                  if($formElement=~/^channel=(.+)$/)                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i)
139                                  {                  {
140                                          $channelName=$1;                          my $subscriberID=$1;
141                                  }                          my $persist=0;
142                                  elsif($formElement=~/^restartfrom=(\d*)$/)                          $self->{'mode'}=$2;
143                                  {                          if ($self->{'mode'} eq "xhrinteractive" || $self->{'mode'} eq "iframe" || $self->{'mode'} eq "serversent" || $self->{'mode'} eq "longpoll") {
144                                          $startIndex=$1;                                  $persist=1;
145                                          $startIndex='' unless(defined($startIndex));                                  $self->{'MaxMessageCount'}=1 unless(!($self->{'mode'} eq "longpoll"));
146                                  }                          }
147                                  elsif($formElement=~/^backtrack=(\d+)$/)                          if ($self->{'mode'} eq "iframe") {
148                                  {                                  $self->{'HeaderTemplateNumber'}=1;
149                                          $backtrack=$1;                          } else {
150                                          $backtrack=0 unless(defined($backtrack));                                  $self->{'HeaderTemplateNumber'}=2;
151                                  }                          }
152                                  elsif($formElement=~/^persist=(?i)(yes|true|1|no|false|0)$/)                          my @channelData=split('/',$3);
153                                  {                          my $channels={};
154                                          $persist=0 if($1=~/(no|false|0)/i);                          my $channelName;
155                                  }                          my $offset;
156                                  elsif($formElement=~/^id=(.+)$/)                          foreach my $chandef (@channelData) {
157                                  {                                  if($chandef=~/^([a-z0-9]+)(.(r|b|h)([0-9]*))?$/i) {
158                                          $subscriberID=$1;                                          $channelName = $1;
159                                  }                                          $channels->{$channelName}->{'startIndex'} = undef;
160                                  elsif($formElement=~/^maxmessages=(\d+)$/i)                                          if ($3) {
161                                  {                                             $offset = $4;
162                                          $self->{'MaxMessageCount'}=$1;                                             if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; }
163                                  }                                             if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; }
164                                  elsif($formElement=~/^template=(\d+)$/i)                                             if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; }
165                                  {                                          }
166                                          $self->{'HeaderTemplateNumber'}=$1;                                  }
167                                  }                          }
168                                  elsif($formElement=~/^maxtime=(\d+)$/i)                          
169                                  {                          delete($self->{'headerBuffer'});
170                                          my $clientRequest=$1;                          
171                                          my $serverDefault=$::CONF{'MaxTime'};                          if($persist)
172                                                                    {
173                                          if($serverDefault==0 || $serverDefault>$clientRequest)                                  $self->{'subscriberID'}=$subscriberID;
174                                          {                                  $self->deleteSubscriberWithID($subscriberID);
175                                                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$clientRequest;                                  $PersistentConnections{$subscriberID}=$self;
176                                          }                          }
177                                  }                          
178                          }                          if(scalar(keys %{$channels}))
179                                                                            {
180                          delete($self->{'headerBuffer'});                                  $self->emitOKHeader();
181                                                            $self->setChannels($channels,$persist);
182                          if(defined($startIndex) && defined($backtrack))                                  $self->close(1) unless($persist);
183                          {                                  return;
184                                  $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");                          }
185                                  $self->close();                  }
186                                                    elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/)
187                                  return;                  {
188                          }                          $self->deleteSubscriberWithID($1);
189                                                    $self->emitOKHeader();
190                          if(defined($subscriberID) && $persist)                          $self->close(1);
191                          {                          return;
192                                  $self->{'subscriberID'}=$subscriberID;                  }
193                                  $self->deleteSubscriberWithID($subscriberID);                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
194                                  $PersistentConnections{$subscriberID}=$self;                  {
195                          }                          Meteor::Document->serveFileToClient($1,$self);
196                                                    $self->close(1);
197                          if(defined($channelName))                          return;
198                          {                  }
199                                  $self->emitOKHeader();                  
200                                                    #
201                                  $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack));                  # If we fall through we did not understand the request
202                                                    #
203                                  $self->setChannelName($channelName,$startIndex,$persist);                  $self->emitErrorHeader();
204                                            }
205                                  $self->close(1) unless($persist);  }
206                                    
207                                  return;  sub setChannels {
208                          }          my $self=shift;
209                  }          my $channels=shift;
210                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)          my $persist=shift;
211                  {          
212                          Meteor::Document->serveFileToClient($1,$self);          foreach my $channelName (keys %{$channels})
213                                    {
214                          $self->close(1);                  my $startIndex=$channels->{$channelName}->{'startIndex'};
215                                            
216                          return;                  my $channel=Meteor::Channel->channelWithName($channelName);
217                  }                  
218                                    $self->{'channels'}->{$channelName}=$channel if($persist);
219                  #                  
220                  # If we fall through we did not understand the request                  $channel->addSubscriber($self,$startIndex,$persist);
221                  #          }
222                  $self->emitErrorHeader();  }
223          }  
224  }  sub emitOKHeader {
225            my $self=shift;
226  sub setChannelName {          
227          my $self=shift;          $self->emitHeader('200 OK');
228          my $channelName=shift;  }
229          my $startIndex=shift;  
230          my $persist=shift;  sub emitErrorHeader {
231                    my $self=shift;
232          my $channel=Meteor::Channel->channelWithName($channelName);          
233          $self->{'channel'}=$channel if($persist);          $self->emitHeader('404 Not Found');
234                    $::Statistics->{'errors_served'}++;
235          $channel->addSubscriber($self,$startIndex,$persist);          
236  }          # close up shop here!
237            $self->close();
238  sub emitOKHeader {  }
239          my $self=shift;  
240            sub emitHeader {
241          $self->emitHeader('200 OK');          my $self=shift;
242  }          my $status=shift;
243            
244  sub emitErrorHeader {          my $header=undef;
245          my $self=shift;          if(exists($self->{'HeaderTemplateNumber'}))
246                    {
247          $self->emitHeader('404 Not Found');                  my $hn='HeaderTemplate'.$self->{'HeaderTemplateNumber'};
248                            
249          # close up shop here!                  $header=$::CONF{$hn};
250          $self->close();          }
251  }          $header=$::CONF{'HeaderTemplate'} unless(defined($header));
252            
253  sub emitHeader {          $header=~s/~([^~]+)~/
254          my $self=shift;                  if(!defined($1) || $1 eq '')
255          my $status=shift;                  {
256                                    '~';
257          my $header=undef;                  }
258          if(exists($self->{'HeaderTemplateNumber'}))                  elsif($1 eq 'server')
259          {                  {
260                  my $hn='HeaderTemplate'.$self->{'HeaderTemplateNumber'};                          $::PGM;
261                                    }
262                  $header=$::CONF{$hn};                  elsif($1 eq 'status')
263          }                  {
264          $header=$::CONF{'HeaderTemplate'} unless(defined($header));                          $status;
265                            }
266          $header=~s/~([^~]+)~/                  elsif($1 eq 'servertime')
267                  if(!defined($1) || $1 eq '')                  {
268                  {                          time;
269                          '~';                  }
270                  }                  else
271                  elsif($1 eq 'server')                  {
272                  {                          '';
273                          $::PGM;                  }
274                  }          /gex;
275                  elsif($1 eq 'status')          
276                  {          $self->write($header.chr(0));
277                          $status;  }
278                  }  
279                  elsif($1 eq 'servertime')  sub sendMessage {
280                  {          my $self=shift;
281                          time;          my $msg=shift;
282                  }          my $numMsgInThisBatch=shift;
283                  else          
284                  {          $numMsgInThisBatch=1 unless(defined($numMsgInThisBatch));
285                          '';          
286                  }          $self->write($msg.chr(0));
287          /gex;          
288                    $::Statistics->{'messages_served'}+=$numMsgInThisBatch;
289          $self->write($header);          
290  }          my $msgCount=++$self->{'MessageCount'};
291            
292  sub sendMessage {          my $maxMsg=$::CONF{'MaxMessages'};
293          my $self=shift;          if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)
294          my $msg=shift;          {
295                            $self->close(1);
296          $self->write($msg);          }
297                    
298          my $msgCount=++$self->{'MessageCount'};          if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})
299                    {
300          my $maxMsg=$::CONF{'MaxMessages'};                  $self->close(1);
301          if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)          }
302          {  }
303                  $self->close(1);  
304          }  sub closeChannel {
305                    my $self=shift;
306          if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})          my $channelName=shift;
307          {          
308                  $self->close(1);          return unless(exists($self->{'channels'}->{$channelName}));
309          }          
310  }          my $channel=$self->{'channels'}->{$channelName};
311            $channel->removeSubscriber($self);
312  sub close {          
313          my $self=shift;          delete($self->{'channels'}->{$channelName});
314          my $noShutdownMsg=shift;          
315                    $self->close() if(scalar(keys %{$self->{'channels'}})==0);
316          $self->{'channel'}->removeSubscriber($self) if($self->{'channel'});  }
317          delete($self->{'channel'});  
318            sub close {
319          if(exists($self->{'subscriberID'}))          my $self=shift;
320          {          my $noShutdownMsg=shift;
321                  delete($PersistentConnections{$self->{'subscriberID'}});          
322          }          foreach my $channelName (keys %{$self->{'channels'}})
323                    {
324          #                  my $channel=$self->{'channels'}->{$channelName};
325          # Send shutdown message unless remote closed or                  $channel->removeSubscriber($self);
326          # connection not yet established          }
327          #          delete($self->{'channels'});
328          unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))          
329          {          if(exists($self->{'subscriberID'}))
330                  my $msg=$::CONF{'SubscriberShutdownMsg'};          {
331                  if(defined($msg) && $msg ne '')                  delete($PersistentConnections{$self->{'subscriberID'}});
332                  {          }
333                          $self->write($msg);          
334                  }          #
335          }          # Send shutdown message unless remote closed or
336                    # connection not yet established
337          $self->SUPER::close();          #
338  }          unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
339            {
340  sub checkForMaxTime {                  my $msg=$::CONF{'SubscriberShutdownMsg'};
341          my $self=shift;                  if(defined($msg) && $msg ne '')
342          my $time=shift;                  {
343                                    $self->write($msg);
344          $self->close(1) if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);                  }
345  }          }
346            
347  1;          $self->SUPER::close();
348    }
349    
350    sub didClose {
351            
352            $::Statistics->{'current_subscribers'}--;
353    }
354    
355    sub checkForMaxTime {
356            my $self=shift;
357            my $time=shift;
358            
359            $self->close(1) if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);
360    }
361    
362    1;
363  ############################################################################EOF  ############################################################################EOF

Legend:
Removed from v.9  
changed lines
  Added in v.37

  ViewVC Help
Powered by ViewVC 1.1.26