/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is a repository of my old source code, which is no longer updated. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory | Revision Log


Revision 55 - (hide annotations)
Thu Feb 28 00:21:39 2008 UTC (16 years, 1 month ago) by andrew.betts
File size: 9334 byte(s)
Fixed: Made eof() work properly
Added nocache querystring param and modified Subscriber.pm to allow it
Rewrote stream.html to ensure we capture the last chunk of data before a reset
Incremented version

1 knops.gerd 11 #!/usr/bin/perl -w
2     ###############################################################################
3     # Meteor
4     # An HTTP server for the 2.0 web
5     # Copyright (c) 2006 contributing authors
6     #
7     # Subscriber.pm
8     #
9     # Description:
10     # A Meteor Subscriber
11     #
12     ###############################################################################
13     #
14     # This program is free software; you can redistribute it and/or modify it
15     # under the terms of the GNU General Public License as published by the Free
16     # Software Foundation; either version 2 of the License, or (at your option)
17     # any later version.
18     #
19     # This program is distributed in the hope that it will be useful, but WITHOUT
20     # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21     # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22     # more details.
23     #
24     # You should have received a copy of the GNU General Public License along
25     # with this program; if not, write to the Free Software Foundation, Inc.,
26     # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27     #
28     # For more information visit www.meteorserver.org
29     #
30     ###############################################################################
31    
package Meteor::Subscriber;
###############################################################################
# Configuration
###############################################################################

use strict;

use Meteor::Connection;
use Meteor::Channel;
use Meteor::Document;

# A subscriber is a specialised Meteor::Connection
our @ISA=('Meteor::Connection');

# Persistent subscribers, keyed by subscriber ID
our %PersistentConnections=();

# Package-level counter; no sub in this file reads or updates it
our $NumAcceptedConnections=0;
47 knops.gerd 37
48 knops.gerd 11
49     ###############################################################################
50     # Factory methods
51     ###############################################################################
#
# Factory: build a subscriber for a freshly accepted connection.
#
# Arguments: the class name and the single argument that is handed
# through unchanged to the parent class' newFromServer().
# Returns the new subscriber object.
#
sub newFromServer {
	my ($class,$server)=@_;
	
	my $self=$class->SUPER::newFromServer($server);
	
	# An existing 'headerBuffer' key marks the request header as
	# not yet processed (see processLine and close)
	$self->{'headerBuffer'}='';
	$self->{'MessageCount'}=0;
	$self->{'MaxMessageCount'}=0;
	
	# Remember when we started and, if a global MaxTime is configured,
	# when this connection is due to be closed
	$self->{'ConnectionStart'}=time;
	my $lifetime=$::CONF{'MaxTime'};
	$self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$lifetime if($lifetime>0);
	
	$::Statistics->{'current_subscribers'}++;
	$::Statistics->{'subscriber_connections_accepted'}++;
	
	return $self;
}
73    
74     ###############################################################################
75     # Class methods
76     ###############################################################################
#
# Class method: close the persistent connection registered under the
# given subscriber ID, if there is one. Does nothing for unknown IDs.
#
sub deleteSubscriberWithID {
	my ($class,$id)=@_;
	
	# The connection's own close() takes care of unregistering it
	$PersistentConnections{$id}->close(0,'newSubscriberWithSameID') if(exists($PersistentConnections{$id}));
}
86    
#
# Class method: call ping() on every registered persistent connection.
#
sub pingPersistentConnections {
	my ($class)=@_;
	
	# Work on a copy of the list so that changes to the registry made
	# while pinging cannot disturb the iteration
	my @subscribers=values %PersistentConnections;
	
	foreach my $subscriber (@subscribers) {
		$subscriber->ping();
	}
}
94    
#
# Class method: ask every registered persistent connection to check
# itself against its time limit, using one shared timestamp.
#
sub checkPersistentConnectionsForMaxTime {
	my ($class)=@_;
	
	# Take the time once so all connections are judged against the same instant
	my $now=time;
	
	# Iterate over a copy: checkForMaxTime() may close connections,
	# which removes them from the registry
	my @subscribers=values %PersistentConnections;
	
	foreach my $subscriber (@subscribers) {
		$subscriber->checkForMaxTime($now);
	}
}
103    
#
# Returns the number of currently registered persistent connections.
#
sub numSubscribers {
	my $count=keys %PersistentConnections;
	
	return $count;
}
108    
109 knops.gerd 11 ###############################################################################
110     # Instance methods
111     ###############################################################################
#
# Feed one line of the request header to the subscriber.
#
# Non-empty lines are accumulated in 'headerBuffer'. The first empty
# line ends the header, which is then dispatched as one of:
#   - a subscriber request
#       GET <SubscriberDynamicPageAddress>/hostid/streamtype/channeldefs
#     where channeldefs is one or more channel definitions separated
#     by '/' (see _parseChannelDefs),
#   - a disconnect request (GET /disconnect/<id>),
#   - a plain document request (any other GET).
# Anything else is answered with the error header.
#
# Once the header has been processed ('headerBuffer' removed) all
# further input is ignored.
#
sub processLine {
	my $self=shift;
	my $line=shift;
	
	# Once the header was processed we ignore any input
	return unless(exists($self->{'headerBuffer'}));
	
	if($line ne '')
	{
		#
		# Accumulate header
		#
		$self->{'headerBuffer'}.="$line\n";
	}
	else
	{
		#
		# Empty line signals end of header.
		# Analyze header, register with appropriate channel
		# and send pending messages.
		#
		# GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
		#
		# Find the 'GET' line
		#
		# The channeldefs capture must allow '/', otherwise only the first
		# of several channel definitions is ever seen. It still stops at
		# '?' so a query string (e.g. a nocache parameter) is not included.
		# Note that the configured page address is interpolated as a
		# regular expression.
		#
		if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/([a-z0-9_\-\%\.\/]+)/i)
		{
			my $channelDefs=$3;
			$self->{'subscriberID'}=$1;
			$self->{'mode'}=$2;
			
			# Configuration is looked up after 'mode' is set so that
			# mode-specific settings take effect (see getConf)
			my $persist=$self->getConf('Persist');
			my $maxTime=$self->getConf('MaxTime');
			$self->{'ConnectionTimeLimit'} = ($self->{'ConnectionStart'}+$maxTime) if ($maxTime>0);
			
			my $channels=_parseChannelDefs($channelDefs);
			my $useragent = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-";
			
			delete($self->{'headerBuffer'});
			
			if ($persist) {
				# Only one persistent connection per subscriber ID:
				# close any previous one before registering this one
				$self->deleteSubscriberWithID($self->{'subscriberID'});
				$PersistentConnections{$self->{'subscriberID'}}=$self;
			}
			
			if(scalar(keys %{$channels})) {
				
				# Resolve all channels first so the channel info is
				# complete before the header is emitted
				$self->{'channelinfo'} = '';
				my $citemplate = $self->getConf('ChannelInfoTemplate');
				foreach my $channelName (keys %{$channels}) {
					my $channel=Meteor::Channel->channelWithName($channelName);
					$self->{'channels'}->{$channelName}=$channel;
					$self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate);
				}
				$self->emitOKHeader();
				foreach my $channelName (keys %{$channels}) {
					my $startIndex=$channels->{$channelName}->{'startIndex'};
					$self->{'channels'}->{$channelName}->addSubscriber($self,$startIndex,$persist,$self->{'mode'},$useragent);
				}
				delete ($self->{'channels'}) unless($persist);
				$self->close(1, 'responseComplete') unless($persist);
				return;
			}
			
			# No usable channel definition: falls through to the error header
		}
		elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/)
		{
			$self->deleteSubscriberWithID($1);
			$self->emitOKHeader();
			$self->close(1, 'disconnectRequested');
			return;
		}
		elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
		{
			Meteor::Document->serveFileToClient($1,$self);
			$self->close(1, 'responseComplete');
			return;
		}
		
		#
		# If we fall through we did not understand the request
		#
		$self->emitErrorHeader();
	}
}

#
# Private helper: parse a '/'-separated list of channel definitions.
#
# Each definition is a channel name, optionally followed by '.' and a
# start specifier:
#   r<n>  start index <n>
#   b<n>  start index -<n>
#   h     start index 0
# Without a specifier the start index is undef. Definitions that do not
# match this form are skipped.
#
# Returns a hash reference: { channelName => { 'startIndex' => ... } }
#
sub _parseChannelDefs {
	my $defs=shift;
	
	my $channels={};
	foreach my $chandef (split('/',$defs)) {
		next unless($chandef=~/^([a-z0-9_\-\%]+)(\.(r|b|h)([0-9]*))?$/i);
		
		my ($channelName,$type,$offset)=($1,$3,$4);
		
		$channels->{$channelName}->{'startIndex'} = undef;
		if ($type) {
			if ($type eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; }
			if ($type eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; }
			if ($type eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; }
		}
	}
	
	return $channels;
}
210    
#
# Send the response header with a '200 OK' status.
#
sub emitOKHeader {
	my ($self)=@_;
	
	return $self->emitHeader('200 OK');
}
216    
#
# Send the response header with a '404 Not Found' status, count the
# error in the global statistics and close the connection.
#
sub emitErrorHeader {
	my ($self)=@_;
	
	$self->emitHeader('404 Not Found');
	$::Statistics->{'errors_served'}++;
	
	# close up shop here!
	return $self->close(0,'error');
}
226    
#
# Expand the configured 'HeaderTemplate' and write it to the client.
#
# Placeholders have the form ~name~:
#   ~~             a literal '~'
#   ~server~       $::PGM
#   ~status~       the status passed in
#   ~servertime~   the current time
#   ~channelinfo~  the subscriber's 'channelinfo' value
# Any other placeholder expands to the empty string.
#
sub emitHeader {
	my ($self,$status)=@_;
	
	# Maps one placeholder name to its replacement text
	my $expand=sub {
		my $name=shift;
		
		return '~' if(!defined($name) || $name eq '');
		return $::PGM if($name eq 'server');
		return $status if($name eq 'status');
		return time if($name eq 'servertime');
		return $self->{'channelinfo'} if($name eq 'channelinfo');
		return '';
	};
	
	my $header=$self->getConf('HeaderTemplate');
	$header=~s/~([^~]*)~/$expand->($1)/ge;
	
	$self->write($header);
}
251    
#
# Send the given messages to the client.
#
# Arguments: the message objects to send. Each is rendered with the
# configured 'MessageTemplate' and all of them are written in one go.
# Does nothing when called without messages.
#
# Updates the global 'messages_served' statistic and this connection's
# 'MessageCount'. The connection is closed once the count reaches either
# the configured 'MaxMessages' or this connection's 'MaxMessageCount'.
#
sub sendMessages {
	my $self=shift;
	
	my $numMessages=0;
	my $msgTemplate=$self->getConf('MessageTemplate');
	my $msgData='';
	
	foreach my $message (@_)
	{
		$msgData.=$message->messageWithTemplate($msgTemplate);
		$numMessages++;
	}
	
	return if($numMessages<1);
	
	$self->write($msgData);
	
	$::Statistics->{'messages_served'}+=$numMessages;
	
	my $msgCount=$self->{'MessageCount'}+$numMessages;
	$self->{'MessageCount'}=$msgCount;
	
	my $maxMsg=$self->getConf('MaxMessages');
	my $confLimitReached=(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg);
	my $ownLimitReached=($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'});
	
	# Both limits lead to the same close; make sure it happens only
	# once even when both are reached by the same batch
	if($confLimitReached || $ownLimitReached)
	{
		$self->close(1, 'maxMessageCountReached');
	}
}
287    
#
# Write the configured 'PingMessage' to the client.
#
sub ping {
	my ($self)=@_;
	
	my $pingData=$self->getConf('PingMessage');
	
	return $self->write($pingData);
}
294    
#
# Unsubscribe from one channel by name.
#
# Does nothing if this subscriber is not on that channel. Closes the
# whole connection once no channel is left.
#
sub closeChannel {
	my ($self,$channelName)=@_;
	
	return unless(exists($self->{'channels'}->{$channelName}));
	
	# Tell the channel first, then forget about it
	$self->{'channels'}->{$channelName}->removeSubscriber($self,'channelClose');
	delete($self->{'channels'}->{$channelName});
	
	# That was the last channel: nothing left to listen to
	if(scalar(keys %{$self->{'channels'}})==0)
	{
		$self->close(0,'channelClose');
	}
}
308    
#
# Close this subscriber connection.
#
# Arguments:
#   $noShutdownMsg  true to suppress the 'SubscriberShutdownMsg'
#   $reason         passed on to each channel's removeSubscriber()
#
# Removes the subscriber from all its channels, unregisters it from the
# persistent connection registry, optionally writes the shutdown message
# and finally hands over to the parent class' close().
#
sub close {
	my $self=shift;
	my $noShutdownMsg=shift;
	my $reason=shift;
	
	foreach my $channelName (keys %{$self->{'channels'}})
	{
		my $channel=$self->{'channels'}->{$channelName};
		$channel->removeSubscriber($self,$reason);
	}
	delete($self->{'channels'});
	
	# Forget our subscriber ID so that, if close is entered again (e.g.
	# while waiting for the write buffer to empty), this runs only once.
	#
	# Only unregister if the registry entry really is this connection:
	# every dynamic request carries a subscriber ID, but only persistent
	# ones are registered, so a non-persistent connection must not remove
	# a persistent subscriber that happens to use the same ID.
	if(exists($self->{'subscriberID'})) {
		my $id=delete($self->{'subscriberID'});
		if(exists($PersistentConnections{$id}) && $PersistentConnections{$id}==$self) {
			delete($PersistentConnections{$id});
		}
	}
	
	# Send shutdown message unless remote closed or
	# connection not yet established
	unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
	{
		my $msg=$self->getConf('SubscriberShutdownMsg');
		if(defined($msg) && $msg ne '')
		{
			$self->write($msg);
		}
	}
	
	$self->SUPER::close();
}
342    
#
# Bookkeeping after a close: one subscriber less in the global
# statistics. Takes no notice of its arguments.
#
sub didClose {
	return $::Statistics->{'current_subscribers'}--;
}
347    
#
# Close this connection if it has a time limit and that limit lies
# before the given time.
#
# Arguments:
#   $time  the timestamp to compare 'ConnectionTimeLimit' against
#
sub checkForMaxTime {
	my ($self,$time)=@_;
	
	if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time)
	{
		$self->close(1,'maxTime');
	}
}
354    
#
# Look up a configuration value for this subscriber.
#
# If the subscriber has a non-empty 'mode' and %::CONF contains the key
# with the mode appended, that entry wins. Otherwise the plain key is
# used.
#
# Arguments:
#   $key  the configuration key
# Returns the value from %::CONF (undef if neither entry exists).
#
sub getConf {
	my ($self,$key)=@_;
	
	my $hasMode=(exists($self->{'mode'}) && $self->{'mode'} ne '');
	if($hasMode)
	{
		my $modeKey=$key.$self->{'mode'};
		
		return $::CONF{$modeKey} if(exists($::CONF{$modeKey}));
	}
	
	return $::CONF{$key};
}
370    
371 knops.gerd 11 1;
372 andrew.betts 3 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26