/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 51 - (show annotations)
Wed Feb 27 14:05:59 2008 UTC (12 years, 4 months ago) by andrew.betts
File size: 9293 byte(s)
Updated default config file
Updated some config narrative
Channel names made more lenient

1 #!/usr/bin/perl -w
2 ###############################################################################
3 # Meteor
4 # An HTTP server for the 2.0 web
5 # Copyright (c) 2006 contributing authors
6 #
7 # Subscriber.pm
8 #
9 # Description:
10 # A Meteor Subscriber
11 #
12 ###############################################################################
13 #
14 # This program is free software; you can redistribute it and/or modify it
15 # under the terms of the GNU General Public License as published by the Free
16 # Software Foundation; either version 2 of the License, or (at your option)
17 # any later version.
18 #
19 # This program is distributed in the hope that it will be useful, but WITHOUT
20 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22 # more details.
23 #
24 # You should have received a copy of the GNU General Public License along
25 # with this program; if not, write to the Free Software Foundation, Inc.,
26 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27 #
28 # For more information visit www.meteorserver.org
29 #
30 ###############################################################################
31
32 package Meteor::Subscriber;
33 ###############################################################################
34 # Configuration
35 ###############################################################################
36
37 use strict;
38
39 use Meteor::Connection;
40 use Meteor::Channel;
41 use Meteor::Document;
42
43 @Meteor::Subscriber::ISA=qw(Meteor::Connection);
44
45 our %PersistentConnections=();
46 our $NumAcceptedConnections=0;
47
48
49 ###############################################################################
50 # Factory methods
51 ###############################################################################
52 sub newFromServer {
53 my $class=shift;
54
55 my $self=$class->SUPER::newFromServer(shift);
56
57 $self->{'headerBuffer'}='';
58 $self->{'MessageCount'}=0;
59 $self->{'MaxMessageCount'}=0;
60
61 $self->{'ConnectionStart'}=time;
62 my $maxTime=$::CONF{'MaxTime'};
63 if($maxTime>0)
64 {
65 $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
66 }
67
68 $::Statistics->{'current_subscribers'}++;
69 $::Statistics->{'subscriber_connections_accepted'}++;
70
71 $self;
72 }
73
74 ###############################################################################
75 # Class methods
76 ###############################################################################
77 sub deleteSubscriberWithID {
78 my $class=shift;
79 my $id=shift;
80
81 if(exists($PersistentConnections{$id}))
82 {
83 $PersistentConnections{$id}->close(0,'newSubscriberWithSameID');
84 }
85 }
86
87 sub pingPersistentConnections {
88 my $class=shift;
89
90 my @cons=values %PersistentConnections;
91
92 map { $_->ping() } @cons;
93 }
94
95 sub checkPersistentConnectionsForMaxTime {
96 my $class=shift;
97
98 my $time=time;
99 my @cons=values %PersistentConnections;
100
101 map { $_->checkForMaxTime($time) } @cons;
102 }
103
104 sub numSubscribers {
105
106 return scalar(keys %PersistentConnections);
107 }
108
109 ###############################################################################
110 # Instance methods
111 ###############################################################################
112 sub processLine {
113 my $self=shift;
114 my $line=shift;
115
116 # Once the header was processed we ignore any input
117 return unless(exists($self->{'headerBuffer'}));
118
119 if($line ne '')
120 {
121 #
122 # Accumulate header
123 #
124 $self->{'headerBuffer'}.="$line\n";
125 }
126 else
127 {
128 #
129 # Empty line signals end of header.
130 # Analyze header, register with appropiate channel
131 # and send pending messages.
132 #
133 # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
134 #
135 # Find the 'GET' line
136 #
137 if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i)
138 {
139 $self->{'subscriberID'}=$1;
140 $self->{'mode'}=$2;
141 my $persist=$self->getConf('Persist');
142 my $maxTime=$self->getConf('MaxTime');
143 $self->{'ConnectionTimeLimit'} = ($self->{'ConnectionStart'}+$maxTime) if ($maxTime>0);
144
145 my @channelData=split('/',$3);
146 my $channels={};
147 my $channelName;
148 my $offset;
149 foreach my $chandef (@channelData) {
150 if($chandef=~/^([a-z0-9_\-\%]+)(.(r|b|h)([0-9]*))?$/i) {
151 $channelName = $1;
152 $channels->{$channelName}->{'startIndex'} = undef;
153 if ($3) {
154 $offset = $4;
155 if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; }
156 if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; }
157 if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; }
158 }
159 }
160 }
161 my $useragent = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-";
162
163 delete($self->{'headerBuffer'});
164
165 if ($persist) {
166 $self->deleteSubscriberWithID($self->{'subscriberID'});
167 $PersistentConnections{$self->{'subscriberID'}}=$self;
168 }
169
170 if(scalar(keys %{$channels}))
171 {
172 $self->emitOKHeader();
173 $self->setChannels($channels,$persist,$self->{'mode'},$useragent);
174 $self->close(1, 'responseComplete') unless($persist);
175 return;
176 }
177 }
178 elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/)
179 {
180 $self->deleteSubscriberWithID($1);
181 $self->emitOKHeader();
182 $self->close(1, 'disconnectRequested');
183 return;
184 }
185 elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
186 {
187 Meteor::Document->serveFileToClient($1,$self);
188 $self->close(1, 'responseComplete');
189 return;
190 }
191
192 #
193 # If we fall through we did not understand the request
194 #
195 $self->emitErrorHeader();
196 }
197 }
198
199 sub setChannels {
200 my $self=shift;
201 my $channels=shift;
202 my $persist=shift;
203 my $mode=shift || '';
204 my $userAgent=shift || '';
205
206 foreach my $channelName (keys %{$channels})
207 {
208 my $startIndex=$channels->{$channelName}->{'startIndex'};
209
210 my $channel=Meteor::Channel->channelWithName($channelName);
211
212 $self->{'channels'}->{$channelName}=$channel if($persist);
213
214 $channel->addSubscriber($self,$startIndex,$persist,$mode,$userAgent);
215 }
216 }
217
218 sub emitOKHeader {
219 my $self=shift;
220
221 $self->emitHeader('200 OK');
222 }
223
224 sub emitErrorHeader {
225 my $self=shift;
226
227 $self->emitHeader('404 Not Found');
228 $::Statistics->{'errors_served'}++;
229
230 # close up shop here!
231 $self->close(0, 'error');
232 }
233
234 sub emitHeader {
235 my $self=shift;
236 my $status=shift;
237
238 my $header=$self->getConf('HeaderTemplate');
239
240 $header=~s/~([^~]*)~/
241 if(!defined($1) || $1 eq '')
242 {
243 '~';
244 }
245 elsif($1 eq 'server')
246 {
247 $::PGM;
248 }
249 elsif($1 eq 'status')
250 {
251 $status;
252 }
253 elsif($1 eq 'servertime')
254 {
255 time;
256 }
257 elsif($1 eq 'channelinfo')
258 {
259 Meteor::Channel->listChannelsUsingTemplate($self->getConf('ChannelInfoTemplate'));
260 }
261 else
262 {
263 '';
264 }
265 /gex;
266
267 $self->write($header);
268 }
269
270 sub sendMessages {
271 my $self=shift;
272
273 my $numMessages=0;
274 my $msgTemplate=$self->getConf('MessageTemplate');
275 my $msgData='';
276
277 foreach my $message (@_)
278 {
279 $msgData.=$message->messageWithTemplate($msgTemplate);
280 $numMessages++;
281 }
282
283 return if($numMessages<1);
284
285 $self->write($msgData);
286
287 $::Statistics->{'messages_served'}+=$numMessages;
288
289 my $msgCount=$self->{'MessageCount'};
290 $msgCount+=$numMessages;
291 $self->{'MessageCount'}=$msgCount;
292
293 my $maxMsg=$self->getConf('MaxMessages');
294 if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)
295 {
296 $self->close(1, 'maxMessageCountReached');
297 }
298
299 if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})
300 {
301 $self->close(1, 'maxMessageCountReached');
302 }
303
304 }
305
306 sub ping {
307 my $self=shift;
308 my $msg=$self->getConf('PingMessage');
309
310 $self->write($msg);
311 }
312
313 sub closeChannel {
314 my $self=shift;
315 my $channelName=shift;
316
317 return unless(exists($self->{'channels'}->{$channelName}));
318
319 my $channel=$self->{'channels'}->{$channelName};
320 $channel->removeSubscriber($self,'channelClose');
321
322 delete($self->{'channels'}->{$channelName});
323
324 $self->close(0,'channelClose') if(scalar(keys %{$self->{'channels'}})==0);
325 }
326
327 sub close {
328 my $self=shift;
329 my $noShutdownMsg=shift;
330 my $reason=shift;
331
332 foreach my $channelName (keys %{$self->{'channels'}})
333 {
334 my $channel=$self->{'channels'}->{$channelName};
335 $channel->removeSubscriber($self,$reason);
336 }
337 delete($self->{'channels'});
338
339 # If this connection is in the PersistentConnections array, delete it, then anonymise
340 # it so that if we have to wait for the write buffer to empty before close, it's only
341 # removed once.
342 if(exists($self->{'subscriberID'})) {
343 delete($PersistentConnections{$self->{'subscriberID'}});
344 delete($self->{'subscriberID'});
345 }
346
347 # Send shutdown message unless remote closed or
348 # connection not yet established
349 unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
350 {
351 my $msg=$self->getConf('SubscriberShutdownMsg');
352 if(defined($msg) && $msg ne '')
353 {
354 $self->write($msg);
355 }
356 }
357
358 $self->SUPER::close();
359 }
360
361 sub didClose {
362
363 $::Statistics->{'current_subscribers'}--;
364 }
365
366 sub checkForMaxTime {
367 my $self=shift;
368 my $time=shift;
369
370 $self->close(1,'maxTime') if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);
371 }
372
373 sub getConf {
374 my $self=shift;
375 my $key=shift;
376
377 if(exists($self->{'mode'}) && $self->{'mode'} ne '')
378 {
379 my $k=$key.$self->{'mode'};
380
381 if(exists($::CONF{$k})) {
382 return $::CONF{$k};
383 }
384 }
385
386 $::CONF{$key};
387 }
388
389 1;
390 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26