/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 9 by andrew.betts, Fri Dec 8 16:52:58 2006 UTC revision 25 by knops.gerd, Sun May 20 19:40:53 2007 UTC
# Line 1  Line 1 
1  #!/usr/bin/perl -w  #!/usr/bin/perl -w
2  ###############################################################################  ###############################################################################
3  #   Meteor  #   Meteor
4  #   An HTTP server for the 2.0 web  #   An HTTP server for the 2.0 web
5  #   Copyright (c) 2006 contributing authors  #   Copyright (c) 2006 contributing authors
6  #  #
7  #   Subscriber.pm  #   Subscriber.pm
8  #  #
9  #       Description:  #       Description:
10  #       A Meteor Subscriber  #       A Meteor Subscriber
11  #  #
12  ###############################################################################  ###############################################################################
13  #  #
14  #   This program is free software; you can redistribute it and/or modify it  #   This program is free software; you can redistribute it and/or modify it
15  #   under the terms of the GNU General Public License as published by the Free  #   under the terms of the GNU General Public License as published by the Free
16  #   Software Foundation; either version 2 of the License, or (at your option)  #   Software Foundation; either version 2 of the License, or (at your option)
17  #   any later version.  #   any later version.
18  #  #
19  #   This program is distributed in the hope that it will be useful, but WITHOUT  #   This program is distributed in the hope that it will be useful, but WITHOUT
20  #   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or  #   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  #   FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for  #   FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22  #   more details.  #   more details.
23  #  #
24  #   You should have received a copy of the GNU General Public License along  #   You should have received a copy of the GNU General Public License along
25  #   with this program; if not, write to the Free Software Foundation, Inc.,  #   with this program; if not, write to the Free Software Foundation, Inc.,
26  #   59 Temple Place, Suite 330, Boston, MA 02111-1307 USA  #   59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27  #  #
28  #   For more information visit www.meteorserver.org  #   For more information visit www.meteorserver.org
29  #  #
30  ###############################################################################  ###############################################################################
31    
32  package Meteor::Subscriber;  package Meteor::Subscriber;
33  ###############################################################################  ###############################################################################
34  # Configuration  # Configuration
35  ###############################################################################  ###############################################################################
36                    
37          use strict;          use strict;
38                    
39          use Meteor::Connection;          use Meteor::Connection;
40          use Meteor::Channel;          use Meteor::Channel;
41          use Meteor::Document;          use Meteor::Document;
42                    
43          @Meteor::Subscriber::ISA=qw(Meteor::Connection);          @Meteor::Subscriber::ISA=qw(Meteor::Connection);
44                    
45          our %PersistentConnections=();          our %PersistentConnections=();
46            our $NumAcceptedConnections=0;
47  ###############################################################################  
48  # Factory methods  ###############################################################################
49  ###############################################################################  # Factory methods
50  sub newFromServer {  ###############################################################################
51          my $class=shift;  sub newFromServer {
52                    my $class=shift;
53          my $self=$class->SUPER::newFromServer(shift);          
54                    my $self=$class->SUPER::newFromServer(shift);
55          $self->{'headerBuffer'}='';          
56          $self->{'MessageCount'}=0;          $self->{'headerBuffer'}='';
57          $self->{'MaxMessageCount'}=0;          $self->{'MessageCount'}=0;
58                    $self->{'MaxMessageCount'}=0;
59          $self->{'ConnectionStart'}=time;          
60          my $maxTime=$::CONF{'MaxTime'};          $self->{'ConnectionStart'}=time;
61          if($maxTime>0)          my $maxTime=$::CONF{'MaxTime'};
62          {          if($maxTime>0)
63                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;          {
64          }                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
65                    }
66          $self;          
67  }          $::Statistics->{'current_subscribers'}++;
68            $::Statistics->{'subscriber_connections_accepted'}++;
69  ###############################################################################          
70  # Class methods          $self;
71  ###############################################################################  }
72  sub deleteSubscriberWithID {  
73          my $class=shift;  ###############################################################################
74          my $id=shift;  # Class methods
75            ###############################################################################
76          if(exists($PersistentConnections{$id}))  sub deleteSubscriberWithID {
77          {          my $class=shift;
78                  $PersistentConnections{$id}->close(1);          my $id=shift;
79          }          
80  }          if(exists($PersistentConnections{$id}))
81            {
82  sub pingPersistentConnections {                  $PersistentConnections{$id}->close(1);
83          my $class=shift;          }
84            }
85          my $msg=$::CONF{'PingMessage'};  
86          my @cons=values %PersistentConnections;  sub pingPersistentConnections {
87                    my $class=shift;
88          map { $_->write($msg) } @cons;          
89  }          my $msg=$::CONF{'PingMessage'};
90            my @cons=values %PersistentConnections;
91  sub checkPersistentConnectionsForMaxTime {          
92          my $class=shift;          map { $_->write($msg) } @cons;
93            }
94          my $time=time;  
95          my @cons=values %PersistentConnections;  sub checkPersistentConnectionsForMaxTime {
96                    my $class=shift;
97          map { $_->checkForMaxTime($time) } @cons;          
98  }          my $time=time;
99            my @cons=values %PersistentConnections;
100  ###############################################################################          
101  # Instance methods          map { $_->checkForMaxTime($time) } @cons;
102  ###############################################################################  }
103  sub processLine {  
104          my $self=shift;  sub numSubscribers {
105          my $line=shift;          
106                    return scalar(keys %PersistentConnections);
107          # Once the header was processed we ignore any input  }
108          return unless(exists($self->{'headerBuffer'}));  
109            ###############################################################################
110          if($line ne '')  # Instance methods
111          {  ###############################################################################
112                  #  sub processLine {
113                  # Accumulate header          my $self=shift;
114                  #          my $line=shift;
115                  $self->{'headerBuffer'}.="$line\n";          
116          }          # Once the header was processed we ignore any input
117          else          return unless(exists($self->{'headerBuffer'}));
118          {          
119                  #          if($line ne '')
120                  # Empty line signals end of header.          {
121                  # Analyze header, register with appropiate channel                  #
122                  # and send pending messages.                  # Accumulate header
123                  #                  #
124                  # GET $::CONF{'SubscriberDynamicPageAddress'}?channel=ml123&restartfrom=1 HTTP/1.1                  $self->{'headerBuffer'}.="$line\n";
125                  #          }
126                  # Find the 'GET' line          else
127                  #          {
128                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\?(\S+)/)                  #
129                  {                  # Empty line signals end of header.
130                          my @formData=split('&',$1);                  # Analyze header, register with appropiate channel
131                          my $channelName=undef;                  # and send pending messages.
132                          my $startIndex=undef;                  #
133                          my $backtrack=undef;                  # GET $::CONF{'SubscriberDynamicPageAddress'}?channel=ml123&restartfrom=1 HTTP/1.1
134                          my $persist=1;                  #
135                          my $subscriberID=undef;                  # Find the 'GET' line
136                          foreach my $formElement (@formData)                  #
137                          {                  if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\?(\S+)/)
138                                  if($formElement=~/^channel=(.+)$/)                  {
139                                  {                          my @formData=split('&',$1);
140                                          $channelName=$1;                          my $channelName=undef;
141                                  }                          my $startIndex=undef;
142                                  elsif($formElement=~/^restartfrom=(\d*)$/)                          my $backtrack=undef;
143                                  {                          my $persist=1;
144                                          $startIndex=$1;                          my $subscriberID=undef;
145                                          $startIndex='' unless(defined($startIndex));                          my $channels={};
146                                  }                          foreach my $formElement (@formData)
147                                  elsif($formElement=~/^backtrack=(\d+)$/)                          {
148                                  {                                  if($formElement=~/^channel=(.+)$/)
149                                          $backtrack=$1;                                  {
150                                          $backtrack=0 unless(defined($backtrack));                                          if(defined($channelName))
151                                  }                                          {
152                                  elsif($formElement=~/^persist=(?i)(yes|true|1|no|false|0)$/)                                                  if(defined($startIndex) && defined($backtrack))
153                                  {                                                  {
154                                          $persist=0 if($1=~/(no|false|0)/i);                                                          $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");
155                                  }                                                          $self->close();
156                                  elsif($formElement=~/^id=(.+)$/)                                                          
157                                  {                                                          return;
158                                          $subscriberID=$1;                                                  }
159                                  }                                                  
160                                  elsif($formElement=~/^maxmessages=(\d+)$/i)                                                  $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack));
161                                  {                                                  $channels->{$channelName}->{'startIndex'}=$startIndex;
162                                          $self->{'MaxMessageCount'}=$1;                                                  
163                                  }                                                  $startIndex=undef;
164                                  elsif($formElement=~/^template=(\d+)$/i)                                                  $backtrack=undef;
165                                  {                                          }
166                                          $self->{'HeaderTemplateNumber'}=$1;                                          $channelName=$1;
167                                  }                                  }
168                                  elsif($formElement=~/^maxtime=(\d+)$/i)                                  elsif($formElement=~/^restartfrom=(\d*)$/)
169                                  {                                  {
170                                          my $clientRequest=$1;                                          $startIndex=$1;
171                                          my $serverDefault=$::CONF{'MaxTime'};                                          $startIndex='' unless(defined($startIndex));
172                                                                            }
173                                          if($serverDefault==0 || $serverDefault>$clientRequest)                                  elsif($formElement=~/^backtrack=(\d+)$/)
174                                          {                                  {
175                                                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$clientRequest;                                          $backtrack=$1;
176                                          }                                          $backtrack=0 unless(defined($backtrack));
177                                  }                                  }
178                          }                                  elsif($formElement=~/^persist=(?i)(yes|true|1|no|false|0)$/)
179                                                                                    {
180                          delete($self->{'headerBuffer'});                                          $persist=0 if($1=~/(no|false|0)/i);
181                                                            }
182                          if(defined($startIndex) && defined($backtrack))                                  elsif($formElement=~/^id=(.+)$/)
183                          {                                  {
184                                  $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");                                          $subscriberID=$1;
185                                  $self->close();                                  }
186                                                                    elsif($formElement=~/^maxmessages=(\d+)$/i)
187                                  return;                                  {
188                          }                                          $self->{'MaxMessageCount'}=$1;
189                                                            }
190                          if(defined($subscriberID) && $persist)                                  elsif($formElement=~/^template=(\d+)$/i)
191                          {                                  {
192                                  $self->{'subscriberID'}=$subscriberID;                                          $self->{'HeaderTemplateNumber'}=$1;
193                                  $self->deleteSubscriberWithID($subscriberID);                                  }
194                                  $PersistentConnections{$subscriberID}=$self;                                  elsif($formElement=~/^maxtime=(\d+)$/i)
195                          }                                  {
196                                                                    my $clientRequest=$1;
197                          if(defined($channelName))                                          my $serverDefault=$::CONF{'MaxTime'};
198                          {                                          
199                                  $self->emitOKHeader();                                          if($serverDefault==0 || $serverDefault>$clientRequest)
200                                                                            {
201                                  $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack));                                                  $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$clientRequest;
202                                                                            }
203                                  $self->setChannelName($channelName,$startIndex,$persist);                                  }
204                                                            }
205                                  $self->close(1) unless($persist);                          
206                                                            if(defined($channelName))
207                                  return;                          {
208                          }                                  if(defined($startIndex) && defined($backtrack))
209                  }                                  {
210                  elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)                                          $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");
211                  {                                          $self->close();
212                          Meteor::Document->serveFileToClient($1,$self);                                          
213                                                                    return;
214                          $self->close(1);                                  }
215                                                            
216                          return;                                  $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack));
217                  }                                  $channels->{$channelName}->{'startIndex'}=$startIndex;
218                                            }
219                  #                          
220                  # If we fall through we did not understand the request                          delete($self->{'headerBuffer'});
221                  #                          
222                  $self->emitErrorHeader();                          if(defined($subscriberID) && $persist)
223          }                          {
224  }                                  $self->{'subscriberID'}=$subscriberID;
225                                    $self->deleteSubscriberWithID($subscriberID);
226  sub setChannelName {                                  $PersistentConnections{$subscriberID}=$self;
227          my $self=shift;                          }
228          my $channelName=shift;                          
229          my $startIndex=shift;                          if(scalar(keys %{$channels}))
230          my $persist=shift;                          {
231                                            $self->emitOKHeader();
232          my $channel=Meteor::Channel->channelWithName($channelName);                                  
233          $self->{'channel'}=$channel if($persist);                                  $self->setChannels($channels,$persist);
234                                            
235          $channel->addSubscriber($self,$startIndex,$persist);                                  $self->close(1) unless($persist);
236  }                                  
237                                    return;
238  sub emitOKHeader {                          }
239          my $self=shift;                  }
240                            elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
241          $self->emitHeader('200 OK');                  {
242  }                          Meteor::Document->serveFileToClient($1,$self);
243                            
244  sub emitErrorHeader {                          $self->close(1);
245          my $self=shift;                          
246                                    return;
247          $self->emitHeader('404 Not Found');                  }
248                            
249          # close up shop here!                  #
250          $self->close();                  # If we fall through we did not understand the request
251  }                  #
252                    $self->emitErrorHeader();
253  sub emitHeader {          }
254          my $self=shift;  }
255          my $status=shift;  
256            sub setChannels {
257          my $header=undef;          my $self=shift;
258          if(exists($self->{'HeaderTemplateNumber'}))          my $channels=shift;
259          {          my $persist=shift;
260                  my $hn='HeaderTemplate'.$self->{'HeaderTemplateNumber'};          
261                            foreach my $channelName (keys %{$channels})
262                  $header=$::CONF{$hn};          {
263          }                  my $startIndex=$channels->{$channelName}->{'startIndex'};
264          $header=$::CONF{'HeaderTemplate'} unless(defined($header));                  
265                            my $channel=Meteor::Channel->channelWithName($channelName);
266          $header=~s/~([^~]+)~/                  
267                  if(!defined($1) || $1 eq '')                  $self->{'channels'}->{$channelName}=$channel if($persist);
268                  {                  
269                          '~';                  $channel->addSubscriber($self,$startIndex,$persist);
270                  }          }
271                  elsif($1 eq 'server')  }
272                  {  
273                          $::PGM;  sub emitOKHeader {
274                  }          my $self=shift;
275                  elsif($1 eq 'status')          
276                  {          $self->emitHeader('200 OK');
277                          $status;  }
278                  }  
279                  elsif($1 eq 'servertime')  sub emitErrorHeader {
280                  {          my $self=shift;
281                          time;          
282                  }          $self->emitHeader('404 Not Found');
283                  else          $::Statistics->{'errors_served'}++;
284                  {          
285                          '';          # close up shop here!
286                  }          $self->close();
287          /gex;  }
288            
289          $self->write($header);  sub emitHeader {
290  }          my $self=shift;
291            my $status=shift;
292  sub sendMessage {          
293          my $self=shift;          my $header=undef;
294          my $msg=shift;          if(exists($self->{'HeaderTemplateNumber'}))
295                    {
296          $self->write($msg);                  my $hn='HeaderTemplate'.$self->{'HeaderTemplateNumber'};
297                            
298          my $msgCount=++$self->{'MessageCount'};                  $header=$::CONF{$hn};
299                    }
300          my $maxMsg=$::CONF{'MaxMessages'};          $header=$::CONF{'HeaderTemplate'} unless(defined($header));
301          if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)          
302          {          $header=~s/~([^~]+)~/
303                  $self->close(1);                  if(!defined($1) || $1 eq '')
304          }                  {
305                                    '~';
306          if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})                  }
307          {                  elsif($1 eq 'server')
308                  $self->close(1);                  {
309          }                          $::PGM;
310  }                  }
311                    elsif($1 eq 'status')
312  sub close {                  {
313          my $self=shift;                          $status;
314          my $noShutdownMsg=shift;                  }
315                            elsif($1 eq 'servertime')
316          $self->{'channel'}->removeSubscriber($self) if($self->{'channel'});                  {
317          delete($self->{'channel'});                          time;
318                            }
319          if(exists($self->{'subscriberID'}))                  else
320          {                  {
321                  delete($PersistentConnections{$self->{'subscriberID'}});                          '';
322          }                  }
323                    /gex;
324          #          
325          # Send shutdown message unless remote closed or          $self->write($header);
326          # connection not yet established  }
327          #  
328          unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))  sub sendMessage {
329          {          my $self=shift;
330                  my $msg=$::CONF{'SubscriberShutdownMsg'};          my $msg=shift;
331                  if(defined($msg) && $msg ne '')          my $numMsgInThisBatch=shift;
332                  {          
333                          $self->write($msg);          $numMsgInThisBatch=1 unless(defined($numMsgInThisBatch));
334                  }          
335          }          $self->write($msg);
336                    
337          $self->SUPER::close();          $::Statistics->{'messages_served'}+=$numMsgInThisBatch;
338  }          
339            my $msgCount=++$self->{'MessageCount'};
340  sub checkForMaxTime {          
341          my $self=shift;          my $maxMsg=$::CONF{'MaxMessages'};
342          my $time=shift;          if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)
343                    {
344          $self->close(1) if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);                  $self->close(1);
345  }          }
346            
347  1;          if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})
348            {
349                    $self->close(1);
350            }
351    }
352    
353    sub closeChannel {
354            my $self=shift;
355            my $channelName=shift;
356            
357            return unless(exists($self->{'channels'}->{$channelName}));
358            
359            my $channel=$self->{'channels'}->{$channelName};
360            $channel->removeSubscriber($self);
361            
362            delete($self->{'channels'}->{$channelName});
363            
364            $self->close() if(scalar(keys %{$self->{'channels'}})==0);
365    }
366    
367    sub close {
368            my $self=shift;
369            my $noShutdownMsg=shift;
370            
371            foreach my $channelName (keys %{$self->{'channels'}})
372            {
373                    my $channel=$self->{'channels'}->{$channelName};
374                    $channel->removeSubscriber($self);
375            }
376            delete($self->{'channels'});
377            
378            if(exists($self->{'subscriberID'}))
379            {
380                    delete($PersistentConnections{$self->{'subscriberID'}});
381            }
382            
383            #
384            # Send shutdown message unless remote closed or
385            # connection not yet established
386            #
387            unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
388            {
389                    my $msg=$::CONF{'SubscriberShutdownMsg'};
390                    if(defined($msg) && $msg ne '')
391                    {
392                            $self->write($msg);
393                    }
394            }
395            
396            $::Statistics->{'current_subscribers'}--;
397            
398            $self->SUPER::close();
399    }
400    
401    sub checkForMaxTime {
402            my $self=shift;
403            my $time=shift;
404            
405            $self->close(1) if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);
406    }
407    
408    1;
409  ############################################################################EOF  ############################################################################EOF

Legend:
Removed from v.9  
changed lines
  Added in v.25

  ViewVC Help
Powered by ViewVC 1.1.26