/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is a repository of my old source code, which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory | Revision Log


Revision 64 - (show annotations)
Mon Jan 19 11:19:41 2009 UTC (15 years, 2 months ago) by andrew.betts
File size: 10308 byte(s)
Release 1.06.04 as documented in Google Group

1 #!/usr/bin/perl -w
2 ###############################################################################
3 # Meteor
4 # An HTTP server for the 2.0 web
5 # Copyright (c) 2006 contributing authors
6 #
7 # Subscriber.pm
8 #
9 # Description:
10 # A Meteor Subscriber
11 #
12 ###############################################################################
13 #
14 # This program is free software; you can redistribute it and/or modify it
15 # under the terms of the GNU General Public License as published by the Free
16 # Software Foundation; either version 2 of the License, or (at your option)
17 # any later version.
18 #
19 # This program is distributed in the hope that it will be useful, but WITHOUT
20 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22 # more details.
23 #
24 # You should have received a copy of the GNU General Public License along
25 # with this program; if not, write to the Free Software Foundation, Inc.,
26 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27 #
28 # For more information visit www.meteorserver.org
29 #
30 ###############################################################################
31
32 package Meteor::Subscriber;
33 ###############################################################################
34 # Configuration
35 ###############################################################################
36
37 use strict;
38
39 use Meteor::Connection;
40 use Meteor::Channel;
41 use Meteor::Document;
42
# Meteor::Subscriber inherits from Meteor::Connection.
@Meteor::Subscriber::ISA = ('Meteor::Connection');

# Registry of persistent subscribers, keyed by subscriber ID.
our %PersistentConnections = ();

# Initialised to zero; not modified anywhere in this file.
our $NumAcceptedConnections = 0;
47
48
49 ###############################################################################
50 # Factory methods
51 ###############################################################################
# Factory method: create a subscriber object.
#
# The argument following the class name is passed unchanged to the parent
# class's newFromServer.  Returns the new subscriber.
sub newFromServer {
    my ($class, $server) = @_;

    my $subscriber = $class->SUPER::newFromServer($server);

    # While the 'headerBuffer' key exists the request header is still being
    # collected; processLine() removes the key once the header is handled.
    $subscriber->{'headerBuffer'} = '';
    $subscriber->{'messageCount'} = 0;

    # Record the new subscriber in the global statistics.
    for my $counter ('current_subscribers', 'subscriber_connections_accepted') {
        $::Statistics->{$counter}++;
    }

    return $subscriber;
}
65
66 ###############################################################################
67 # Class methods
68 ###############################################################################
# Class method: close the persistent connection registered under $id, if any.
#
# The connection is closed with the shutdown message enabled (first
# argument 0) and the reason 'newSubscriberWithSameID'.  Nothing happens
# when no connection is registered under $id.
sub deleteSubscriberWithID {
    my ($class, $id) = @_;

    return exists($PersistentConnections{$id})
        && $PersistentConnections{$id}->close(0, 'newSubscriberWithSameID');
}
78
# Class method: report whether a persistent connection is registered
# under the given subscriber ID.
sub subscriberExists {
    my ($class, $id) = @_;

    my $isRegistered = exists $PersistentConnections{$id};

    return $isRegistered;
}
85
# Class method: call ping() on every registered persistent connection.
#
# Returns nothing.
sub pingPersistentConnections {
    my $class = shift;

    # Work on a snapshot of the registry: close() removes entries from
    # %PersistentConnections, so the hash may change while we iterate.
    my @connections = values %PersistentConnections;

    # A plain loop rather than map: the calls are made for their side
    # effects only, no result list is wanted.
    foreach my $connection (@connections) {
        $connection->ping();
    }

    return;
}
93
# Class method: ask every registered persistent connection to check
# whether it has exceeded its time limit.
#
# The current time is read once and the same value is handed to each
# connection's checkForMaxTime().  Returns nothing.
sub checkPersistentConnectionsForMaxTime {
    my $class = shift;

    my $now = time;

    # Work on a snapshot of the registry: checkForMaxTime() may close a
    # connection, and close() removes entries from %PersistentConnections.
    my @connections = values %PersistentConnections;

    # A plain loop rather than map: the calls are made for their side
    # effects only, no result list is wanted.
    foreach my $connection (@connections) {
        $connection->checkForMaxTime($now);
    }

    return;
}
102
# Return the number of registered persistent connections.
sub numSubscribers {
    my $count = keys %PersistentConnections;

    return $count;
}
107
# Class method: build a textual listing of all persistent connections.
#
# One line per connection, terminated by $::CRLF, with space separated
# fields:
#   subscriberID ip connectionStart connectionTimeLimit secondsRemaining
#   messageCount "useragent"
#
# 'connectionTimeLimit' is only stored on a connection when its MaxTime
# setting is greater than zero.  For connections without a limit both the
# limit and the remaining time are reported as 0, instead of an empty
# field followed by a meaningless negative timestamp.
#
# Returns the listing as a single string ('' when nothing is registered).
sub listSubscribers {
    my $class = shift;

    my $now  = time;    # one timestamp for the whole listing
    my $list = '';

    foreach my $id (keys %PersistentConnections) {
        my $conn = $PersistentConnections{$id};

        my $limit     = $conn->{'connectionTimeLimit'};
        my $remaining = 0;
        if (defined $limit) {
            $remaining = $limit - $now;
        }
        else {
            $limit = 0;
        }

        $list .= join(' ',
            $id,
            $conn->{'ip'},
            $conn->{'connectionStart'},
            $limit,
            $remaining,
            $conn->{'messageCount'},
            '"' . $conn->{'useragent'} . '"',
        ) . $::CRLF;
    }

    return $list;
}
118
119 ###############################################################################
120 # Instance methods
121 ###############################################################################
# Process one line of input received from the client.
#
# Non-empty lines are appended to the header buffer.  An empty line marks
# the end of the header, which is then analysed:
#
#   GET <SubscriberDynamicPageAddress>/hostid/streamtype/channeldefs
#       registers the subscriber with the named channels,
#   GET /disconnect/<id>
#       closes the persistent connection registered under <id>,
#   GET <anything else>
#       is handed to Meteor::Document to be served as a file.
#
# Anything else, including a subscription request that names no valid
# channel, is answered through emitErrorHeader().
#
# Once the header has been handled the 'headerBuffer' key is deleted and
# all further input is ignored.
sub processLine {
    my $self = shift;
    my $line = shift;

    # Once the header was processed we ignore any input - Meteor does not
    # accept or process request bodies
    return unless (exists($self->{'headerBuffer'}));

    if ($line ne '') {
        # Accumulate header
        $self->{'headerBuffer'} .= "$line\n";
        return;
    }

    #
    # Empty line signals end of header.
    # Analyze header, register with appropriate channel
    # and send pending messages.
    #
    # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
    #
    # Find the 'GET' line.  Note that the configured page address is
    # interpolated into the pattern as-is.
    #
    my $header = $self->{'headerBuffer'};

    if ($header =~ /GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/([a-z0-9_\-\%\.\/]+).*?/i) {
        my ($subscriberID, $mode, $channelPath) = ($1, $2, $3);

        # The mode must be set before getConf() is called: getConf()
        # prefers mode specific configuration keys.
        $self->{'subscriberID'} = $subscriberID;
        $self->{'mode'}         = $mode;
        my $persist = $self->getConf('Persist');
        my $maxTime = $self->getConf('MaxTime');
        $self->{'connectionTimeLimit'} = ($self->{'connectionStart'} + $maxTime) if ($maxTime > 0);

        # Parse the channel definitions: "name", optionally followed by
        # ".r<n>", ".b<n>" or ".h".  Definitions that do not match are
        # skipped.
        my $channels = {};
        foreach my $chandef (split('/', $channelPath)) {
            next unless ($chandef =~ /^([a-z0-9_\-\%]+)(\.(r|b|h)([0-9]*))?$/i);
            my ($channelName, $kind, $offset) = ($1, $3, $4);

            my $startIndex;
            if ($kind) {
                if    ($kind eq 'r') { $startIndex = $offset; }
                elsif ($kind eq 'b') { $startIndex = -$offset; }
                elsif ($kind eq 'h') { $startIndex = 0; }
            }
            $channels->{$channelName}->{'startIndex'} = $startIndex;
        }

        $self->{'useragent'} = ($header =~ /User-Agent: (.+)/i) ? $1 : "-";

        if ($persist) {
            # New persistent connection: kill any existing connection with same ID
            $self->deleteSubscriberWithID($self->{'subscriberID'});
            # Add new persistent connection to collection
            $PersistentConnections{$self->{'subscriberID'}} = $self;
        }
        else {
            $::Pollers->{$self->{'subscriberID'}} = time;
        }

        if (scalar(keys %{$channels})) {
            $self->{'channelinfo'} = '';
            my $citemplate = $self->getConf('ChannelInfoTemplate');
            foreach my $channelName (keys %{$channels}) {
                my $channel = Meteor::Channel->channelWithName($channelName);
                $self->{'channels'}->{$channelName} = $channel;

                # The start index comes from the parsed request
                # ($channels), not from the channel object that was just
                # stored in $self->{'channels'}.
                my $startIndex = $channels->{$channelName}->{'startIndex'};
                if (defined($startIndex) && $startIndex > 0) {
                    $self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate);
                }
            }

            $self->emitOKHeader();

            foreach my $channelName (keys %{$channels}) {
                my $startIndex = $channels->{$channelName}->{'startIndex'};
                $self->{'channels'}->{$channelName}->addSubscriber($self, $startIndex, $persist, $self->{'mode'}, $self->{'useragent'});
            }

            if (!$persist) {
                # Non-persistent request: the response is complete.  The
                # connection is closed exactly once; it must not be closed
                # a second time by the long polling check below.
                delete($self->{'channels'});
                $self->close(1, 'responseComplete');
                delete($self->{'headerBuffer'});
            }
            else {
                delete($self->{'headerBuffer'});

                # If long polling, close connection immediately if any messages have been sent
                if ($self->{'messageCount'} > 0 && $self->{'mode'} eq 'longpoll') {
                    $self->close(1, 'closedOnEvent');
                }
            }
            return;
        }

        # No valid channel definition: fall through to the error response.
    }
    elsif ($header =~ /GET\s+\/disconnect\/(\S+)/) {
        my $id = $1;
        $self->deleteSubscriberWithID($id);
        $self->emitOKHeader();
        $self->close(1, 'disconnectRequested');
        return;
    }
    elsif ($header =~ /GET\s+([^\s\?]+)/) {
        my $path = $1;
        Meteor::Document->serveFileToClient($path, $self);
        $self->SUPER::close();
        return;
    }

    #
    # If we fall through we did not understand the request
    #
    $self->emitErrorHeader();
}
231
# Emit the response header with status '200 OK'.
sub emitOKHeader {
    my ($self) = @_;

    return $self->emitHeader('200 OK');
}
237
# Answer a request that was not understood: emit a '404 Not Found'
# header, count the error in the global statistics and close the
# connection with reason 'error'.
sub emitErrorHeader {
    my ($self) = @_;

    $self->emitHeader('404 Not Found');
    $::Statistics->{'errors_served'}++;

    # Nothing more to do for this connection: close up shop here!
    return $self->close(0, 'error');
}
247
# Emit the response header for the given status line.
#
# The header text is taken from the 'HeaderTemplate' configuration value.
# Placeholders of the form ~name~ are replaced as follows:
#
#   ~~             a literal '~'
#   ~server~       $::PGM
#   ~status~       the $status argument
#   ~servertime~   the current time (epoch seconds)
#   ~channelinfo~  $self->{'channelinfo'}
#
# Unknown placeholders, and placeholders whose value is not defined (for
# example 'channelinfo' when no channel was subscribed), are replaced by
# the empty string.
#
# Parameters: $status - status text, e.g. '200 OK'.
# Returns whatever $self->write() returns.
sub emitHeader {
    my $self   = shift;
    my $status = shift;

    my $header = $self->getConf('HeaderTemplate');

    my %replacementFor = (
        ''            => '~',
        'server'      => $::PGM,
        'status'      => $status,
        'servertime'  => time(),
        'channelinfo' => $self->{'channelinfo'},
    );

    # The defined() guard keeps undefined values from reaching the
    # substitution, where they would trigger "uninitialized" warnings.
    $header =~ s{~([^~]*)~}{
        defined($replacementFor{$1}) ? $replacementFor{$1} : ''
    }gex;

    return $self->write($header);
}
272
# Send the given message objects to the client.
#
# Each message is rendered through the 'MessageTemplate' configuration
# value and all renderings are written in a single write() call.  When
# called without messages nothing is written and no counter changes.
# Afterwards the global 'messages_served' statistic and this
# subscriber's 'messageCount' are increased by the number of messages.
sub sendMessages {
    my ($self, @messages) = @_;

    my $template = $self->getConf('MessageTemplate');

    my $sent    = 0;
    my $payload = '';
    for my $msg (@messages) {
        $payload .= $msg->messageWithTemplate($template);
        $sent += 1;
    }

    return if ($sent < 1);

    $self->write($payload);

    $::Statistics->{'messages_served'} += $sent;
    $self->{'messageCount'}            += $sent;

    # If long polling, close connection, as a message has now been sent.
    # Don't close if still processing the header (we may be sending a
    # backlog from multiple channels)
    if ($self->getConf('CloseOnEvent') && !exists($self->{'headerBuffer'})) {
        $self->close(1, 'closedOnEvent');
    }
}
302
# Write the configured 'PingMessage' to the client.
sub ping {
    my ($self) = @_;

    my $pingMessage = $self->getConf('PingMessage');

    return $self->write($pingMessage);
}
309
# Detach this subscriber from the named channel.
#
# Does nothing when the subscriber has no channel of that name.
# Otherwise the channel is told to remove the subscriber (reason
# 'channelClose') and is dropped from this subscriber's channel table.
# When no channel is left afterwards the connection itself is closed.
sub closeChannel {
    my ($self, $channelName) = @_;

    if (!exists($self->{'channels'}->{$channelName})) {
        return;
    }

    $self->{'channels'}->{$channelName}->removeSubscriber($self, 'channelClose');
    delete($self->{'channels'}->{$channelName});

    my $remaining = keys %{$self->{'channels'}};
    $self->close(0, 'channelClose') if ($remaining == 0);
}
323
# Close this subscriber connection.
#
# Parameters:
#   $noShutdownMsg - true to suppress the 'SubscriberShutdownMsg'
#   $reason        - reason string, passed on to each channel's
#                    removeSubscriber()
#
# Steps: remove the subscriber from all of its channels, unregister it
# from %PersistentConnections, write the shutdown message (unless
# suppressed) and the footer, then call the parent class's close().
sub close {
    my $self          = shift;
    my $noShutdownMsg = shift;
    my $reason        = shift;

    foreach my $channelName (keys %{$self->{'channels'}}) {
        my $channel = $self->{'channels'}->{$channelName};
        $channel->removeSubscriber($self, $reason);
    }
    delete($self->{'channels'});

    # If this connection is in the PersistentConnections hash, delete it,
    # then anonymise it so that if we have to wait for the write buffer to
    # empty before close, it's only removed once.
    #
    # Non-persistent connections carry a subscriberID too but are never
    # registered, so the entry is only removed when it really is this
    # object.  Otherwise closing a poller would unregister a different,
    # still open persistent connection that uses the same ID.
    if (exists($self->{'subscriberID'})) {
        my $id = delete($self->{'subscriberID'});
        if (exists($PersistentConnections{$id}) && $PersistentConnections{$id} == $self) {
            delete($PersistentConnections{$id});
        }
    }

    # Send shutdown message unless remote closed or
    # connection not yet established
    unless ($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'})) {
        my $msg = $self->getConf('SubscriberShutdownMsg');
        if (defined($msg) && $msg ne '') {
            $self->write($msg);
        }
    }

    my $fmsg = $self->getConf('FooterTemplate');
    if (defined($fmsg) && $fmsg ne '') {
        $self->write($fmsg);
    }

    $self->SUPER::close();
}
363
# Decrement the global count of current subscribers.
sub didClose {
    return $::Statistics->{'current_subscribers'}--;
}
368
# Close the connection (reason 'maxTime') when it has a time limit and
# that limit lies before the given time.
#
# Parameters: $now - the time to compare the limit against.
sub checkForMaxTime {
    my ($self, $now) = @_;

    my $expired = exists($self->{'connectionTimeLimit'})
        && $self->{'connectionTimeLimit'} < $now;

    $self->close(1, 'maxTime') if ($expired);
}
375
# Look up a configuration value for this subscriber.
#
# When the subscriber has a non-empty 'mode' and %::CONF contains the key
# formed by appending the mode to $key, that mode specific value is
# returned.  Otherwise the plain $::CONF{$key} is returned.
sub getConf {
    my ($self, $key) = @_;

    if (exists($self->{'mode'})) {
        my $mode = $self->{'mode'};
        if ($mode ne '') {
            my $modeKey = $key . $mode;
            return $::CONF{$modeKey} if (exists($::CONF{$modeKey}));
        }
    }

    return $::CONF{$key};
}
389
390 1;
391 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26