/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is a repository of my old source code, which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory | Revision Log


Revision 50 - (show annotations)
Wed Feb 27 13:55:35 2008 UTC (16 years, 1 month ago) by andrew.betts
File size: 9288 byte(s)
Added crossdomain.xml for flash clients
Incremented version number
Moved 'new message' debug notice to more useful location
Moved default for ChannelInfoTemplate to correct position alphabetically in code
Set simpler default HeaderTemplate
Added LogTimeFormat
Updated description of PingInterval, Persist
Corrected misspelling of Parameter
Reformatted debug output for config initialisation
Added recognition of null byte in config
Fixed problem with mode recognition
Fixed resuming from given message ID
Fixed sending of message backlog
Fixed Shlemiels
Logged connection duration on leavechannel
Fixed name support in channelinfotemplate
Added logging of reasons for connection closes
Abbreviated log output
Fixed tracking of subscriber IDs
Added logging of user agent
Fixed incorrect key for MessageTemplate in Subscriber.pm
Add some additional code comments
Fixed incorrect closure of new connection if previous connection close was waiting on write buffer

1 #!/usr/bin/perl -w
2 ###############################################################################
3 # Meteor
4 # An HTTP server for the 2.0 web
5 # Copyright (c) 2006 contributing authors
6 #
7 # Subscriber.pm
8 #
9 # Description:
10 # A Meteor Subscriber
11 #
12 ###############################################################################
13 #
14 # This program is free software; you can redistribute it and/or modify it
15 # under the terms of the GNU General Public License as published by the Free
16 # Software Foundation; either version 2 of the License, or (at your option)
17 # any later version.
18 #
19 # This program is distributed in the hope that it will be useful, but WITHOUT
20 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22 # more details.
23 #
24 # You should have received a copy of the GNU General Public License along
25 # with this program; if not, write to the Free Software Foundation, Inc.,
26 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27 #
28 # For more information visit www.meteorserver.org
29 #
30 ###############################################################################
31
32 package Meteor::Subscriber;
33 ###############################################################################
34 # Configuration
35 ###############################################################################
36
37 use strict;
38
39 use Meteor::Connection;
40 use Meteor::Channel;
41 use Meteor::Document;
42
43 @Meteor::Subscriber::ISA=qw(Meteor::Connection);
44
45 our %PersistentConnections=();
46 our $NumAcceptedConnections=0;
47
48
49 ###############################################################################
50 # Factory methods
51 ###############################################################################
#
# Factory: build a subscriber for a connection handed over by the server.
#
# Delegates construction to the superclass, then initialises the
# subscriber-specific state (header buffer, message counters, start time)
# and bumps the global subscriber statistics.
#
# Parameters:
#   $class  - class name the method is invoked on
#   $server - passed through unchanged to SUPER::newFromServer
#
# Returns the new subscriber object.
#
sub newFromServer {
    my ($class, $server) = @_;

    my $self = $class->SUPER::newFromServer($server);

    my $startedAt = time;

    $self->{'headerBuffer'}    = '';
    $self->{'MessageCount'}    = 0;
    $self->{'MaxMessageCount'} = 0;
    $self->{'ConnectionStart'} = $startedAt;

    # A positive global MaxTime puts a deadline on the connection.
    my $lifetime = $::CONF{'MaxTime'};
    $self->{'ConnectionTimeLimit'} = $startedAt + $lifetime if ($lifetime > 0);

    $::Statistics->{'current_subscribers'}++;
    $::Statistics->{'subscriber_connections_accepted'}++;

    return $self;
}
73
74 ###############################################################################
75 # Class methods
76 ###############################################################################
#
# Close the persistent connection registered under the given subscriber ID,
# if there is one. The connection is closed with reason
# 'newSubscriberWithSameID' and a false "no shutdown message" flag.
#
# Parameters:
#   $class - class name (unused)
#   $id    - subscriber ID to look up in %PersistentConnections
#
sub deleteSubscriberWithID {
    my ($class, $id) = @_;

    $PersistentConnections{$id}->close(0, 'newSubscriberWithSameID')
        if (exists $PersistentConnections{$id});
}
86
#
# Send a ping to every persistent connection.
#
# The connections are copied into a list first, so the loop is not
# iterating %PersistentConnections directly while ping() runs.
#
# Parameters:
#   $class - class name (unused)
#
# Returns nothing useful.
#
sub pingPersistentConnections {
    my ($class) = @_;

    my @subscribers = values %PersistentConnections;

    # Plain loop: we only want the side effect, not a list of results.
    foreach my $subscriber (@subscribers) {
        $subscriber->ping();
    }

    return;
}
94
#
# Ask every persistent connection to check itself against its time limit.
#
# The current time is sampled once and handed to each connection. The
# connections are copied into a list first because checkForMaxTime() may
# close a connection, and close() removes it from %PersistentConnections.
#
# Parameters:
#   $class - class name (unused)
#
# Returns nothing useful.
#
sub checkPersistentConnectionsForMaxTime {
    my ($class) = @_;

    my $now         = time;
    my @subscribers = values %PersistentConnections;

    # Plain loop: we only want the side effect, not a list of results.
    foreach my $subscriber (@subscribers) {
        $subscriber->checkForMaxTime($now);
    }

    return;
}
103
#
# Number of connections currently registered in %PersistentConnections.
#
sub numSubscribers {
    my $count = keys %PersistentConnections;

    return $count;
}
108
109 ###############################################################################
110 # Instance methods
111 ###############################################################################
#
# Consume one line of the subscriber's HTTP request header.
#
# Non-empty lines are accumulated in $self->{'headerBuffer'}. An empty line
# marks the end of the header; the accumulated header is then matched
# against three request forms, in this order:
#
#   GET <SubscriberDynamicPageAddress>/<id>/<mode>/<channeldefs>
#       Subscribe to the listed channels.
#   GET /disconnect/<id>
#       Close the persistent connection registered under <id>.
#   GET <path>
#       Serve a file through Meteor::Document.
#
# Anything else (including a subscribe request that names no valid channel)
# is answered with the error header.
#
# Parameters:
#   $line - one header line, without line terminator
#
sub processLine {
    my ($self, $line) = @_;

    # Once the header was processed we ignore any input
    return unless (exists $self->{'headerBuffer'});

    # Accumulate header lines until the terminating empty line arrives
    if ($line ne '') {
        $self->{'headerBuffer'} .= "$line\n";
        return;
    }

    #
    # Empty line signals end of header.
    # Analyze header, register with appropriate channel
    # and send pending messages.
    #
    my $header = $self->{'headerBuffer'};

    # NOTE(review): the configured page address is interpolated as a regex
    # pattern, not as a literal string; this is unchanged from the original.
    if ($header =~ /GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i) {
        my ($subscriberID, $mode, $channelSpec) = ($1, $2, $3);

        # Mode must be set before getConf() so mode-specific keys are found
        $self->{'subscriberID'} = $subscriberID;
        $self->{'mode'}         = $mode;

        my $persist = $self->getConf('Persist');
        my $maxTime = $self->getConf('MaxTime');
        $self->{'ConnectionTimeLimit'} = ($self->{'ConnectionStart'} + $maxTime) if ($maxTime > 0);

        my $channels  = _parseChannelDefs($channelSpec);
        my $useragent = ($header =~ /User-Agent: (.+)/i) ? $1 : "-";

        # From here on the header counts as processed
        delete($self->{'headerBuffer'});

        if ($persist) {
            # Only one persistent connection per subscriber ID
            $self->deleteSubscriberWithID($subscriberID);
            $PersistentConnections{$subscriberID} = $self;
        }

        if (scalar(keys %{$channels})) {
            $self->emitOKHeader();
            $self->setChannels($channels, $persist, $mode, $useragent);
            $self->close(1, 'responseComplete') unless ($persist);
            return;
        }
    }
    elsif ($header =~ /GET\s+\/disconnect\/(\S+)/) {
        $self->deleteSubscriberWithID($1);
        $self->emitOKHeader();
        $self->close(1, 'disconnectRequested');
        return;
    }
    elsif ($header =~ /GET\s+([^\s\?]+)/) {
        Meteor::Document->serveFileToClient($1, $self);
        $self->close(1, 'responseComplete');
        return;
    }

    #
    # If we fall through we did not understand the request
    #
    $self->emitErrorHeader();
    return;
}

#
# Private helper: parse the '/'-separated channel definitions of a
# subscribe request into { name => { 'startIndex' => value } }.
#
# Each definition is <name>, optionally followed by '.' and a kind letter
# (case-insensitive) with an optional number:
#   <name>       startIndex undef
#   <name>.r<n>  startIndex <n>
#   <name>.b<n>  startIndex -<n>
#   <name>.h     startIndex 0
# Definitions that do not match are skipped.
#
sub _parseChannelDefs {
    my ($channelSpec) = @_;

    my %channels;

    foreach my $chandef (split('/', $channelSpec)) {
        # The separator is a literal dot; the original unescaped '.'
        # accepted any character in that position.
        next unless ($chandef =~ /^([a-z0-9]+)(?:\.([rbh])([0-9]*))?$/i);

        my ($channelName, $kind, $offset) = ($1, $2, $3);
        my $startIndex;

        if (defined $kind) {
            # The match is case-insensitive, so normalise before comparing
            $kind = lc($kind);

            if    ($kind eq 'r') { $startIndex = $offset; }
            elsif ($kind eq 'b') { $startIndex = -$offset; }
            elsif ($kind eq 'h') { $startIndex = 0; }
        }

        $channels{$channelName} = { 'startIndex' => $startIndex };
    }

    return \%channels;
}
198
#
# Subscribe this connection to a set of channels.
#
# Parameters:
#   $channels  - hash ref: channel name => { 'startIndex' => ... }
#   $persist   - true if the connection stays open; only then are the
#                channel objects remembered in $self->{'channels'}
#   $mode      - passed on to the channel, defaults to ''
#   $userAgent - passed on to the channel, defaults to ''
#
sub setChannels {
    my ($self, $channels, $persist, $mode, $userAgent) = @_;

    $mode      ||= '';
    $userAgent ||= '';

    foreach my $name (keys %{$channels}) {
        my $requestedStart = $channels->{$name}->{'startIndex'};
        my $channelObj     = Meteor::Channel->channelWithName($name);

        if ($persist) {
            $self->{'channels'}->{$name} = $channelObj;
        }

        $channelObj->addSubscriber($self, $requestedStart, $persist, $mode, $userAgent);
    }
}
217
#
# Write the response header with status '200 OK'.
#
sub emitOKHeader {
    my ($self) = @_;

    return $self->emitHeader('200 OK');
}
223
#
# Answer a request we could not handle: write the response header with
# status '404 Not Found', count the error in the statistics and close the
# connection with reason 'error'.
#
sub emitErrorHeader {
    my ($self) = @_;

    $self->emitHeader('404 Not Found');
    $::Statistics->{'errors_served'} += 1;

    # Nothing more to say to this client
    return $self->close(0, 'error');
}
233
#
# Expand the configured 'HeaderTemplate' and write it to the client.
#
# Placeholders have the form ~name~:
#   ~~             a literal '~'
#   ~server~       $::PGM
#   ~status~       the $status argument
#   ~servertime~   current time()
#   ~channelinfo~  Meteor::Channel->listChannelsUsingTemplate() applied to
#                  the configured 'ChannelInfoTemplate'
# Any other placeholder expands to the empty string.
#
# Parameters:
#   $status - HTTP status text, e.g. '200 OK'
#
sub emitHeader {
    my ($self, $status) = @_;

    my $header = $self->getConf('HeaderTemplate');

    # Each expansion is a closure so it is only evaluated when its
    # placeholder actually occurs in the template.
    my %expansion = (
        ''            => sub { '~' },
        'server'      => sub { $::PGM },
        'status'      => sub { $status },
        'servertime'  => sub { time },
        'channelinfo' => sub {
            Meteor::Channel->listChannelsUsingTemplate($self->getConf('ChannelInfoTemplate'));
        },
    );

    $header =~ s{~([^~]*)~}{
        my $expand = $expansion{$1};
        $expand ? $expand->() : '';
    }gex;

    $self->write($header);
}
269
#
# Render the given messages with the configured 'MessageTemplate', write
# them to the client in one piece and update the message counters.
#
# The connection is closed with reason 'maxMessageCountReached' when the
# total number of messages sent on it reaches either the configured
# 'MaxMessages' or the connection's own 'MaxMessageCount' (a value of 0 or
# less disables the respective limit). The connection is closed once even
# if both limits are reached at the same time.
#
# Parameters:
#   @_ (after $self) - message objects providing messageWithTemplate()
#
sub sendMessages {
    my ($self, @messages) = @_;

    my $msgTemplate = $self->getConf('MessageTemplate');
    my $numMessages = 0;
    my $msgData     = '';

    foreach my $message (@messages) {
        $msgData .= $message->messageWithTemplate($msgTemplate);
        $numMessages++;
    }

    # Nothing to send, nothing to count
    return if ($numMessages < 1);

    $self->write($msgData);

    $::Statistics->{'messages_served'} += $numMessages;

    my $msgCount = $self->{'MessageCount'} + $numMessages;
    $self->{'MessageCount'} = $msgCount;

    my $confLimit = $self->getConf('MaxMessages');
    my $ownLimit  = $self->{'MaxMessageCount'};

    my $confLimitHit = defined($confLimit) && $confLimit > 0 && $msgCount >= $confLimit;
    my $ownLimitHit  = $ownLimit > 0 && $msgCount >= $ownLimit;

    # One combined check, so close() is never called twice
    if ($confLimitHit || $ownLimitHit) {
        $self->close(1, 'maxMessageCountReached');
    }

    return;
}
305
#
# Write the configured 'PingMessage' to the client.
#
sub ping {
    my ($self) = @_;

    return $self->write(scalar($self->getConf('PingMessage')));
}
312
#
# Unsubscribe this connection from one channel.
#
# Does nothing if the connection is not subscribed to the channel.
# Otherwise the channel is told to remove the subscriber (reason
# 'channelClose') and forgotten; when it was the last channel, the
# connection itself is closed with the same reason.
#
# Parameters:
#   $channelName - name of the channel to leave
#
sub closeChannel {
    my ($self, $channelName) = @_;

    if (!exists($self->{'channels'}->{$channelName})) {
        return;
    }

    $self->{'channels'}->{$channelName}->removeSubscriber($self, 'channelClose');
    delete($self->{'channels'}->{$channelName});

    # No channels left: nothing to keep the connection open for
    $self->close(0, 'channelClose') unless (keys %{$self->{'channels'}});
}
326
#
# Close this subscriber connection.
#
# Steps, in order:
#   1. remove the subscriber from every channel it remembers,
#   2. unregister it from %PersistentConnections,
#   3. optionally write the configured 'SubscriberShutdownMsg',
#   4. hand over to the superclass close().
#
# Parameters:
#   $noShutdownMsg - true to suppress the shutdown message
#   $reason        - reason string, passed on to the channels
#
sub close {
    my ($self, $noShutdownMsg, $reason) = @_;

    for my $name (keys %{$self->{'channels'}}) {
        $self->{'channels'}->{$name}->removeSubscriber($self, $reason);
    }
    delete($self->{'channels'});

    # If this connection is in the PersistentConnections hash, delete it, then
    # anonymise it so that if we have to wait for the write buffer to empty
    # before close, it's only removed once.
    if (exists $self->{'subscriberID'}) {
        my $id = delete($self->{'subscriberID'});
        delete($PersistentConnections{$id});
    }

    # No shutdown message if the caller asked for none, the remote side
    # already closed, or the request header was never fully processed.
    my $suppressMsg = $noShutdownMsg
        || $self->{'remoteClosed'}
        || exists($self->{'headerBuffer'});

    if (!$suppressMsg) {
        my $shutdownMsg = $self->getConf('SubscriberShutdownMsg');
        $self->write($shutdownMsg) if (defined($shutdownMsg) && $shutdownMsg ne '');
    }

    $self->SUPER::close();
}
360
#
# Hook for a closed connection: one subscriber fewer in the statistics.
#
sub didClose {
    return $::Statistics->{'current_subscribers'}--;
}
365
#
# Close the connection (reason 'maxTime') if it has a time limit and that
# limit lies before the given time.
#
# Parameters:
#   $time - the time to compare the limit against
#
sub checkForMaxTime {
    my ($self, $time) = @_;

    # Connections without a limit never expire
    return unless (exists $self->{'ConnectionTimeLimit'});
    return unless ($self->{'ConnectionTimeLimit'} < $time);

    $self->close(1, 'maxTime');
}
372
#
# Look up a configuration value, preferring a mode-specific setting.
#
# If the connection has a non-empty mode and %::CONF contains the key
# "<key><mode>", that value wins; otherwise the plain "<key>" entry is
# returned.
#
# Parameters:
#   $key - configuration key
#
sub getConf {
    my ($self, $key) = @_;

    if (exists($self->{'mode'}) and $self->{'mode'} ne '') {
        my $modeKey = $key . $self->{'mode'};

        return $::CONF{$modeKey} if (exists $::CONF{$modeKey});
    }

    return $::CONF{$key};
}
388
389 1;
390 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26