/[meteor]/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

googlecode.com/svn/trunk/Meteor/Subscriber.pm revision 9 by andrew.betts, Fri Dec 8 16:52:58 2006 UTC trunk/Meteor/Subscriber.pm revision 70 by dpavlin, Sat Mar 28 03:45:31 2009 UTC
# Line 1  Line 1 
 #!/usr/bin/perl -w  
 ###############################################################################  
 #   Meteor  
 #   An HTTP server for the 2.0 web  
 #   Copyright (c) 2006 contributing authors  
 #  
 #   Subscriber.pm  
 #  
 #       Description:  
 #       A Meteor Subscriber  
 #  
 ###############################################################################  
 #  
 #   This program is free software; you can redistribute it and/or modify it  
 #   under the terms of the GNU General Public License as published by the Free  
 #   Software Foundation; either version 2 of the License, or (at your option)  
 #   any later version.  
 #  
 #   This program is distributed in the hope that it will be useful, but WITHOUT  
 #   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or  
 #   FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for  
 #   more details.  
 #  
 #   You should have received a copy of the GNU General Public License along  
 #   with this program; if not, write to the Free Software Foundation, Inc.,  
 #   59 Temple Place, Suite 330, Boston, MA 02111-1307 USA  
 #  
 #   For more information visit www.meteorserver.org  
 #  
 ###############################################################################  
   
 package Meteor::Subscriber;  
 ###############################################################################  
 # Configuration  
 ###############################################################################  
           
         use strict;  
           
         use Meteor::Connection;  
         use Meteor::Channel;  
         use Meteor::Document;  
           
         @Meteor::Subscriber::ISA=qw(Meteor::Connection);  
           
         our %PersistentConnections=();  
   
 ###############################################################################  
 # Factory methods  
 ###############################################################################  
 sub newFromServer {  
         my $class=shift;  
           
         my $self=$class->SUPER::newFromServer(shift);  
           
         $self->{'headerBuffer'}='';  
         $self->{'MessageCount'}=0;  
         $self->{'MaxMessageCount'}=0;  
           
         $self->{'ConnectionStart'}=time;  
         my $maxTime=$::CONF{'MaxTime'};  
         if($maxTime>0)  
         {  
                 $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;  
         }  
           
         $self;  
 }  
   
 ###############################################################################  
 # Class methods  
 ###############################################################################  
 sub deleteSubscriberWithID {  
         my $class=shift;  
         my $id=shift;  
           
         if(exists($PersistentConnections{$id}))  
         {  
                 $PersistentConnections{$id}->close(1);  
         }  
 }  
   
 sub pingPersistentConnections {  
         my $class=shift;  
           
         my $msg=$::CONF{'PingMessage'};  
         my @cons=values %PersistentConnections;  
           
         map { $_->write($msg) } @cons;  
 }  
   
 sub checkPersistentConnectionsForMaxTime {  
         my $class=shift;  
           
         my $time=time;  
         my @cons=values %PersistentConnections;  
           
         map { $_->checkForMaxTime($time) } @cons;  
 }  
   
 ###############################################################################  
 # Instance methods  
 ###############################################################################  
 sub processLine {  
         my $self=shift;  
         my $line=shift;  
           
         # Once the header was processed we ignore any input  
         return unless(exists($self->{'headerBuffer'}));  
           
         if($line ne '')  
         {  
                 #  
                 # Accumulate header  
                 #  
                 $self->{'headerBuffer'}.="$line\n";  
         }  
         else  
         {  
                 #  
                 # Empty line signals end of header.  
                 # Analyze header, register with appropiate channel  
                 # and send pending messages.  
                 #  
                 # GET $::CONF{'SubscriberDynamicPageAddress'}?channel=ml123&restartfrom=1 HTTP/1.1  
                 #  
                 # Find the 'GET' line  
                 #  
                 if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\?(\S+)/)  
                 {  
                         my @formData=split('&',$1);  
                         my $channelName=undef;  
                         my $startIndex=undef;  
                         my $backtrack=undef;  
                         my $persist=1;  
                         my $subscriberID=undef;  
                         foreach my $formElement (@formData)  
                         {  
                                 if($formElement=~/^channel=(.+)$/)  
                                 {  
                                         $channelName=$1;  
                                 }  
                                 elsif($formElement=~/^restartfrom=(\d*)$/)  
                                 {  
                                         $startIndex=$1;  
                                         $startIndex='' unless(defined($startIndex));  
                                 }  
                                 elsif($formElement=~/^backtrack=(\d+)$/)  
                                 {  
                                         $backtrack=$1;  
                                         $backtrack=0 unless(defined($backtrack));  
                                 }  
                                 elsif($formElement=~/^persist=(?i)(yes|true|1|no|false|0)$/)  
                                 {  
                                         $persist=0 if($1=~/(no|false|0)/i);  
                                 }  
                                 elsif($formElement=~/^id=(.+)$/)  
                                 {  
                                         $subscriberID=$1;  
                                 }  
                                 elsif($formElement=~/^maxmessages=(\d+)$/i)  
                                 {  
                                         $self->{'MaxMessageCount'}=$1;  
                                 }  
                                 elsif($formElement=~/^template=(\d+)$/i)  
                                 {  
                                         $self->{'HeaderTemplateNumber'}=$1;  
                                 }  
                                 elsif($formElement=~/^maxtime=(\d+)$/i)  
                                 {  
                                         my $clientRequest=$1;  
                                         my $serverDefault=$::CONF{'MaxTime'};  
                                           
                                         if($serverDefault==0 || $serverDefault>$clientRequest)  
                                         {  
                                                 $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$clientRequest;  
                                         }  
                                 }  
                         }  
                                                   
                         delete($self->{'headerBuffer'});  
                           
                         if(defined($startIndex) && defined($backtrack))  
                         {  
                                 $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");  
                                 $self->close();  
                                   
                                 return;  
                         }  
                           
                         if(defined($subscriberID) && $persist)  
                         {  
                                 $self->{'subscriberID'}=$subscriberID;  
                                 $self->deleteSubscriberWithID($subscriberID);  
                                 $PersistentConnections{$subscriberID}=$self;  
                         }  
                           
                         if(defined($channelName))  
                         {  
                                 $self->emitOKHeader();  
                                   
                                 $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack));  
                                   
                                 $self->setChannelName($channelName,$startIndex,$persist);  
                                   
                                 $self->close(1) unless($persist);  
                                   
                                 return;  
                         }  
                 }  
                 elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)  
                 {  
                         Meteor::Document->serveFileToClient($1,$self);  
                           
                         $self->close(1);  
                           
                         return;  
                 }  
                   
                 #  
                 # If we fall through we did not understand the request  
                 #  
                 $self->emitErrorHeader();  
         }  
 }  
   
 sub setChannelName {  
         my $self=shift;  
         my $channelName=shift;  
         my $startIndex=shift;  
         my $persist=shift;  
           
         my $channel=Meteor::Channel->channelWithName($channelName);  
         $self->{'channel'}=$channel if($persist);  
           
         $channel->addSubscriber($self,$startIndex,$persist);  
 }  
   
 sub emitOKHeader {  
         my $self=shift;  
           
         $self->emitHeader('200 OK');  
 }  
   
 sub emitErrorHeader {  
         my $self=shift;  
           
         $self->emitHeader('404 Not Found');  
           
         # close up shop here!  
         $self->close();  
 }  
   
 sub emitHeader {  
         my $self=shift;  
         my $status=shift;  
           
         my $header=undef;  
         if(exists($self->{'HeaderTemplateNumber'}))  
         {  
                 my $hn='HeaderTemplate'.$self->{'HeaderTemplateNumber'};  
                   
                 $header=$::CONF{$hn};  
         }  
         $header=$::CONF{'HeaderTemplate'} unless(defined($header));  
           
         $header=~s/~([^~]+)~/  
                 if(!defined($1) || $1 eq '')  
                 {  
                         '~';  
                 }  
                 elsif($1 eq 'server')  
                 {  
                         $::PGM;  
                 }  
                 elsif($1 eq 'status')  
                 {  
                         $status;  
                 }  
                 elsif($1 eq 'servertime')  
                 {  
                         time;  
                 }  
                 else  
                 {  
                         '';  
                 }  
         /gex;  
           
         $self->write($header);  
 }  
   
 sub sendMessage {  
         my $self=shift;  
         my $msg=shift;  
           
         $self->write($msg);  
           
         my $msgCount=++$self->{'MessageCount'};  
           
         my $maxMsg=$::CONF{'MaxMessages'};  
         if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)  
         {  
                 $self->close(1);  
         }  
           
         if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})  
         {  
                 $self->close(1);  
         }  
 }  
   
 sub close {  
         my $self=shift;  
         my $noShutdownMsg=shift;  
           
         $self->{'channel'}->removeSubscriber($self) if($self->{'channel'});  
         delete($self->{'channel'});  
           
         if(exists($self->{'subscriberID'}))  
         {  
                 delete($PersistentConnections{$self->{'subscriberID'}});  
         }  
           
         #  
         # Send shutdown message unless remote closed or  
         # connection not yet established  
         #  
         unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))  
         {  
                 my $msg=$::CONF{'SubscriberShutdownMsg'};  
                 if(defined($msg) && $msg ne '')  
                 {  
                         $self->write($msg);  
                 }  
         }  
           
         $self->SUPER::close();  
 }  
   
 sub checkForMaxTime {  
         my $self=shift;  
         my $time=shift;  
           
         $self->close(1) if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);  
 }  
   
 1;  
 ############################################################################EOF  
1    #!/usr/bin/perl -w
2    ###############################################################################
3    #   Meteor
4    #   An HTTP server for the 2.0 web
5    #   Copyright (c) 2006 contributing authors
6    #
7    #   Subscriber.pm
8    #
9    #       Description:
10    #       A Meteor Subscriber
11    #
12    ###############################################################################
13    #
14    #   This program is free software; you can redistribute it and/or modify it
15    #   under the terms of the GNU General Public License as published by the Free
16    #   Software Foundation; either version 2 of the License, or (at your option)
17    #   any later version.
18    #
19    #   This program is distributed in the hope that it will be useful, but WITHOUT
20    #   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21    #   FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22    #   more details.
23    #
24    #   You should have received a copy of the GNU General Public License along
25    #   with this program; if not, write to the Free Software Foundation, Inc.,
26    #   59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27    #
28    #   For more information visit www.meteorserver.org
29    #
30    ###############################################################################
31    
32    package Meteor::Subscriber;
33    ###############################################################################
34    # Configuration
35    ###############################################################################
36            
37            use strict;
38            
39            use Meteor::Connection;
40            use Meteor::Channel;
41            use Meteor::Document;
42            use Meteor::Koha;
43            
44            @Meteor::Subscriber::ISA=qw(Meteor::Connection);
45            
46            our %PersistentConnections=();
47            our $NumAcceptedConnections=0;
48        
49    
50    ###############################################################################
51    # Factory methods
52    ###############################################################################
53    sub newFromServer {
54            my $class=shift;
55            
56            my $self=$class->SUPER::newFromServer(shift);
57            
58            $self->{'headerBuffer'}='';
59            $self->{'messageCount'}=0;
60            
61            $::Statistics->{'current_subscribers'}++;
62            $::Statistics->{'subscriber_connections_accepted'}++;
63            
64            $self;
65    }
66    
67    ###############################################################################
68    # Class methods
69    ###############################################################################
70    sub deleteSubscriberWithID {
71            my $class=shift;
72            my $id=shift;
73            
74            if(exists($PersistentConnections{$id}))
75            {
76                    $PersistentConnections{$id}->close(0,'newSubscriberWithSameID');
77            }
78    }
79    
80    sub subscriberExists {
81            my $class=shift;
82            my $id=shift;
83    
84            return exists($PersistentConnections{$id});
85    }
86    
87    sub pingPersistentConnections {
88            my $class=shift;
89            
90            my @cons=values %PersistentConnections;
91            
92            map { $_->ping() } @cons;
93    }
94    
95    sub checkPersistentConnectionsForMaxTime {
96            my $class=shift;
97            
98            my $time=time;
99            my @cons=values %PersistentConnections;
100            
101            map { $_->checkForMaxTime($time) } @cons;
102    }
103    
104    sub numSubscribers {
105            
106            return scalar(keys %PersistentConnections);
107    }
108    
109    sub listSubscribers {
110            my $class=shift;
111            my $list='';
112            foreach my $subscriber (keys %PersistentConnections)
113            {
114                    my $sub = $PersistentConnections{$subscriber};
115                    $list .= $subscriber.' '.$sub->{'ip'}.' '.$sub->{'connectionStart'}.' '.$sub->{'connectionTimeLimit'}.' '.($sub->{'connectionTimeLimit'}-time).' '.$sub->{'messageCount'}.' "'.$sub->{'useragent'}."\"$::CRLF";
116            }
117            $list;
118    }
119    
120    ###############################################################################
121    # Instance methods
122    ###############################################################################
123    sub processLine {
124            my $self=shift;
125            my $line=shift;
126            
127            # Once the header was processed we ignore any input - Meteor does not accept or process request bodies
128            return unless(exists($self->{'headerBuffer'}));
129            
130            if($line ne '')
131            {
132                    #
133                    # Accumulate header
134                    #
135                    $self->{'headerBuffer'}.="$line\n";
136            }
137            else
138            {
139                    #
140                    # Empty line signals end of header.
141                    # Analyze header, register with appropiate channel
142                    # and send pending messages.
143                    #
144                    # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
145                    #
146                    # Find the 'GET' line
147                    #
148                    if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/([a-z0-9_\-\%\.\/]+).*?/i)
149                    {
150                            $self->{'subscriberID'}=$1;
151                            $self->{'mode'}=$2;
152                            my $persist=$self->getConf('Persist');
153                            my $maxTime=$self->getConf('MaxTime');
154                            $self->{'connectionTimeLimit'} = ($self->{'connectionStart'}+$maxTime) if ($maxTime>0);
155                            
156                            my @channelData=split('/',$3);
157                            my $channels={};
158                            my $channelName;
159                            my $offset;
160                            foreach my $chandef (@channelData) {
161                                    if($chandef=~/^([a-z0-9_\-\%]+)(.(r|b|h)([0-9]*))?$/i) {
162                                            $channelName = $1;
163                                            $channels->{$channelName}->{'startIndex'} = undef;
164                                            if ($3) {
165                                               $offset = $4;
166                                               if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; }
167                                               if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; }
168                                               if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; }
169                                            }
170                                    }
171                            }
172                            $self->{'useragent'} = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-";
173                            
174                            if ($persist) {
175                                    # New persistent connection: kill any existing connection with same ID
176                                    $self->deleteSubscriberWithID($self->{'subscriberID'});
177                                    # Add new persistent connection to collection
178                                    $PersistentConnections{$self->{'subscriberID'}}=$self;
179                            } else {
180                                    $::Pollers->{$self->{'subscriberID'}} = time;
181                            }
182                            
183                            if(scalar(keys %{$channels})) {
184    
185                                    $self->{'channelinfo'} = '';
186                                    my $citemplate = $self->getConf('ChannelInfoTemplate');
187                                    foreach $channelName (keys %{$channels}) {
188                                            my $channel=Meteor::Channel->channelWithName($channelName);
189                                            $self->{'channels'}->{$channelName}=$channel;
190                                            if (defined($self->{'channels'}->{$channelName}->{'startIndex'}) && $self->{'channels'}->{$channelName}->{'startIndex'} > 0) {
191                                                    $self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate);
192                                            }
193                                    }
194                                    $self->emitOKHeader();
195                                    foreach $channelName (keys %{$channels}) {
196                                            my $startIndex=$channels->{$channelName}->{'startIndex'};
197                                            $self->{'channels'}->{$channelName}->addSubscriber($self,$startIndex,$persist,$self->{'mode'},$self->{'useragent'});
198                                    }
199                                    if (!$persist) {
200                                            delete ($self->{'channels'});
201                                            $self->close(1, 'responseComplete');
202                                    }
203                                    delete($self->{'headerBuffer'});
204    
205                                    # If long polling, close connection immediately if any messages have been sent
206                                    if ($self->{'messageCount'} > 0 && $self->{'mode'} eq 'longpoll') {
207                                            $self->close(1, 'closedOnEvent');
208                                    }
209                                    return;
210                            }
211                    }
212                    elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/)
213                    {
214                            $self->deleteSubscriberWithID($1);
215                            $self->emitOKHeader();
216                            $self->close(1, 'disconnectRequested');
217                            return;
218                    }
219                    elsif($self->{'headerBuffer'}=~m{GET\s+/koha/(\d+)})
220                    {
221                            Meteor::Koha->item($self,$1);
222                            $self->SUPER::close();
223                            return;
224                    }
225                    elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
226                    {
227                            Meteor::Document->serveFileToClient($1,$self);
228                            $self->SUPER::close();
229                            return;
230                    }
231                    
232                    #
233                    # If we fall through we did not understand the request
234                    #
235                    $self->emitErrorHeader();
236            }
237    }
238    
239    sub emitOKHeader {
240            my $self=shift;
241            
242            $self->emitHeader('200 OK');
243    }
244    
245    sub emitErrorHeader {
246            my $self=shift;
247            
248            $self->emitHeader('404 Not Found');
249            $::Statistics->{'errors_served'}++;
250            
251            # close up shop here!
252            $self->close(0, 'error');
253    }
254    
255    sub emitHeader {
256            my $self=shift;
257            my $status=shift;
258            
259            my $header=$self->getConf('HeaderTemplate');
260            
261            $header=~s/~([^~]*)~/
262                    if(!defined($1) || $1 eq '') {
263                            '~';
264                    } elsif($1 eq 'server') {
265                            $::PGM;
266                    } elsif($1 eq 'status') {
267                            $status;
268                    } elsif($1 eq 'servertime') {
269                            time;
270                    } elsif($1 eq 'channelinfo') {
271                            $self->{'channelinfo'};
272                    } else {
273                            '';
274                    }
275            /gex;
276            
277            $self->write($header);
278    }
279    
280    sub sendMessages {
281            my $self=shift;
282            
283            my $numMessages=0;
284            my $msgTemplate=$self->getConf('MessageTemplate');
285            my $msgData='';
286            
287            foreach my $message (@_)
288            {
289                    $msgData.=$message->messageWithTemplate($msgTemplate);
290                    $numMessages++;
291            }
292            
293            return if($numMessages<1);
294            
295            $self->write($msgData);
296            
297            $::Statistics->{'messages_served'}+=$numMessages;
298            
299            my $msgCount=$self->{'messageCount'};
300            $msgCount+=$numMessages;
301            $self->{'messageCount'}=$msgCount;
302            
303            # If long polling, close connection, as a message has now been sent.
304            # Don't close if still processing the header (we may be sending a backlog from multiple channels)
305            if($self->getConf('CloseOnEvent') && !exists($self->{'headerBuffer'})) {
306                    $self->close(1, 'closedOnEvent');
307            }
308    }
309    
310    sub ping {
311            my $self=shift;
312            my $msg=$self->getConf('PingMessage');
313            
314            $self->write($msg);
315    }
316    
317    sub closeChannel {
318            my $self=shift;
319            my $channelName=shift;
320            
321            return unless(exists($self->{'channels'}->{$channelName}));
322            
323            my $channel=$self->{'channels'}->{$channelName};
324            $channel->removeSubscriber($self,'channelClose');
325            
326            delete($self->{'channels'}->{$channelName});
327            
328            $self->close(0,'channelClose') if(scalar(keys %{$self->{'channels'}})==0);
329    }
330    
331    sub close {
332            my $self=shift;
333            my $noShutdownMsg=shift;
334            my $reason=shift;
335            
336            foreach my $channelName (keys %{$self->{'channels'}})
337            {
338                    my $channel=$self->{'channels'}->{$channelName};
339                    $channel->removeSubscriber($self,$reason);
340            }
341            delete($self->{'channels'});
342            
343            # If this connection is in the PersistentConnections array, delete it, then anonymise
344            # it so that if we have to wait for the write buffer to empty before close, it's only
345            # removed once.
346            if(exists($self->{'subscriberID'})) {
347                    delete($PersistentConnections{$self->{'subscriberID'}});
348                    delete($self->{'subscriberID'});
349            }
350            
351            # Send shutdown message unless remote closed or
352            # connection not yet established
353            unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
354            {
355                    my $msg=$self->getConf('SubscriberShutdownMsg');
356                    if(defined($msg) && $msg ne '')
357                    {
358                            $self->write($msg);
359                    }
360            }
361    
362            my $fmsg=$self->getConf('FooterTemplate');
363            if(defined($fmsg) && $fmsg ne '')
364            {
365                    $self->write($fmsg);
366            }
367    
368            $self->SUPER::close();
369    }
370    
371    sub didClose {
372            
373            $::Statistics->{'current_subscribers'}--;
374    }
375    
376    sub checkForMaxTime {
377            my $self=shift;
378            my $time=shift;
379            
380            $self->close(1,'maxTime') if(exists($self->{'connectionTimeLimit'}) && $self->{'connectionTimeLimit'}<$time);
381    }
382    
383    sub getConf {
384            my $self=shift;
385            my $key=shift;
386            
387            if(exists($self->{'mode'}) && $self->{'mode'} ne '') {
388                    my $k=$key.$self->{'mode'};            
389                    if(exists($::CONF{$k})) {
390                            return $::CONF{$k};
391                    }
392            }
393            
394            $::CONF{$key};
395    }
396    
397    1;
398    ############################################################################EOF

Legend:
Removed from v.9  
changed lines
  Added in v.70

  ViewVC Help
Powered by ViewVC 1.1.26