/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is a repository of my old source code, which is no longer updated. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory | Revision Log


Revision 25 - (show annotations)
Sun May 20 19:40:53 2007 UTC (13 years ago) by knops.gerd
File size: 9348 byte(s)
• Add simple statistics, available via new SHOWSTATS controller command

1 #!/usr/bin/perl -w
2 ###############################################################################
3 # Meteor
4 # An HTTP server for the 2.0 web
5 # Copyright (c) 2006 contributing authors
6 #
7 # Subscriber.pm
8 #
9 # Description:
10 # A Meteor Subscriber
11 #
12 ###############################################################################
13 #
14 # This program is free software; you can redistribute it and/or modify it
15 # under the terms of the GNU General Public License as published by the Free
16 # Software Foundation; either version 2 of the License, or (at your option)
17 # any later version.
18 #
19 # This program is distributed in the hope that it will be useful, but WITHOUT
20 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22 # more details.
23 #
24 # You should have received a copy of the GNU General Public License along
25 # with this program; if not, write to the Free Software Foundation, Inc.,
26 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27 #
28 # For more information visit www.meteorserver.org
29 #
30 ###############################################################################
31
32 package Meteor::Subscriber;
33 ###############################################################################
34 # Configuration
35 ###############################################################################
36
37 use strict;
38
39 use Meteor::Connection;
40 use Meteor::Channel;
41 use Meteor::Document;
42
# A subscriber is a specialised connection: inherit from Meteor::Connection
@Meteor::Subscriber::ISA=('Meteor::Connection');

# Registry of persistent subscribers, keyed by subscriber ID
our %PersistentConnections=();

# Initialised to zero; no code visible in this file updates it
our $NumAcceptedConnections=0;
47
48 ###############################################################################
49 # Factory methods
50 ###############################################################################
#
# Creates a subscriber for a newly accepted connection.
#
# The single argument is handed unchanged to the superclass constructor.
# Returns the new subscriber object.
#
sub newFromServer {
	my($class,$server)=@_;

	my $self=$class->SUPER::newFromServer($server);

	# The request header is collected here until the empty line arrives
	$self->{'headerBuffer'}='';

	# Messages sent so far, and the per-client limit (0 means no limit requested)
	$self->{'MessageCount'}=0;
	$self->{'MaxMessageCount'}=0;

	# Remember when we connected; a positive configured MaxTime sets a deadline
	my $now=time;
	$self->{'ConnectionStart'}=$now;

	my $maxTime=$::CONF{'MaxTime'};
	$self->{'ConnectionTimeLimit'}=$now+$maxTime if($maxTime>0);

	$::Statistics->{'current_subscribers'}++;
	$::Statistics->{'subscriber_connections_accepted'}++;

	return $self;
}
72
73 ###############################################################################
74 # Class methods
75 ###############################################################################
#
# Closes the persistent connection registered under the given subscriber ID,
# if there is one. The connection's close method is called with a true
# argument.
#
sub deleteSubscriberWithID {
	my($class,$id)=@_;

	# Unknown ID: nothing to close
	return unless(exists($PersistentConnections{$id}));

	$PersistentConnections{$id}->close(1);
}
85
#
# Writes the configured ping message to every persistent connection.
# Does nothing when no ping message is configured.
#
sub pingPersistentConnections {
	my $class=shift;

	my $msg=$::CONF{'PingMessage'};

	# Same guard as used for the shutdown message: never write an
	# undefined or empty message
	return unless(defined($msg) && $msg ne '');

	# Work on a snapshot so the registry may change while we are writing
	my @cons=values %PersistentConnections;

	foreach my $con (@cons)
	{
		$con->write($msg);
	}
}
94
#
# Asks every persistent connection to check itself against its time limit,
# using a single timestamp for the whole pass.
#
sub checkPersistentConnectionsForMaxTime {
	my $class=shift;

	my $time=time;

	# Work on a snapshot: checkForMaxTime may close a connection, and
	# closing removes it from the registry
	my @cons=values %PersistentConnections;

	foreach my $con (@cons)
	{
		$con->checkForMaxTime($time);
	}
}
103
#
# Returns the number of entries in the persistent connection registry.
#
sub numSubscribers {
	my $count=keys %PersistentConnections;

	return $count;
}
108
109 ###############################################################################
110 # Instance methods
111 ###############################################################################
#
# Handles one line of the subscriber's request.
#
# Non-empty lines are accumulated in the header buffer. The first empty
# line ends the header, which is then analysed:
#
#   GET <SubscriberDynamicPageAddress>?channel=ml123&restartfrom=1 HTTP/1.1
#
# registers the subscriber with the named channel(s). Recognised form
# elements are channel, restartfrom, backtrack, persist, id, maxmessages,
# template and maxtime. 'restartfrom' and 'backtrack' apply to the channel
# named before them (or to the first channel if they precede it) and are
# mutually exclusive.
#
# Any other GET request is served as a file and the connection is closed.
# Everything else is answered with the error header.
#
# Once the header was processed, further input is ignored.
#
sub processLine {
	my $self=shift;
	my $line=shift;

	# Once the header was processed we ignore any input
	return unless(exists($self->{'headerBuffer'}));

	if($line ne '')
	{
		#
		# Accumulate header
		#
		$self->{'headerBuffer'}.="$line\n";

		return;
	}

	#
	# Empty line signals end of header.
	# Analyze header, register with appropriate channel
	# and send pending messages.
	#
	# The configured address is a literal path, so quote it: otherwise
	# characters such as '.' would act as regex wildcards.
	#
	my $dynamicAddress=quotemeta($::CONF{'SubscriberDynamicPageAddress'});

	#
	# Find the 'GET' line
	#
	if($self->{'headerBuffer'}=~/GET\s+$dynamicAddress\?(\S+)/)
	{
		my @formData=split('&',$1);
		my $channelName=undef;
		my $startIndex=undef;
		my $backtrack=undef;
		my $persist=1;
		my $subscriberID=undef;
		my $channels={};
		foreach my $formElement (@formData)
		{
			if($formElement=~/^channel=(.+)$/)
			{
				# Save the capture before anything else can run a regex
				my $nextChannelName=$1;

				if(defined($channelName))
				{
					# A new channel starts: store what was collected for the previous one
					return unless(_commitChannelRequest($self,$channels,$channelName,$startIndex,$backtrack));

					$startIndex=undef;
					$backtrack=undef;
				}
				$channelName=$nextChannelName;
			}
			elsif($formElement=~/^restartfrom=(\d*)$/)
			{
				# May be the empty string ('restartfrom=' without a number)
				$startIndex=$1;
			}
			elsif($formElement=~/^backtrack=(\d+)$/)
			{
				$backtrack=$1;
			}
			elsif($formElement=~/^persist=(?i)(yes|true|1|no|false|0)$/)
			{
				$persist=0 if($1=~/(no|false|0)/i);
			}
			elsif($formElement=~/^id=(.+)$/)
			{
				$subscriberID=$1;
			}
			elsif($formElement=~/^maxmessages=(\d+)$/i)
			{
				$self->{'MaxMessageCount'}=$1;
			}
			elsif($formElement=~/^template=(\d+)$/i)
			{
				$self->{'HeaderTemplateNumber'}=$1;
			}
			elsif($formElement=~/^maxtime=(\d+)$/i)
			{
				my $clientRequest=$1;
				my $serverDefault=$::CONF{'MaxTime'};

				# The client may only shorten the server's limit, or set
				# one when the server has none
				if($serverDefault==0 || $serverDefault>$clientRequest)
				{
					$self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$clientRequest;
				}
			}
		}

		# Store what was collected for the last (or only) channel
		if(defined($channelName))
		{
			return unless(_commitChannelRequest($self,$channels,$channelName,$startIndex,$backtrack));
		}

		# From here on the header counts as processed
		delete($self->{'headerBuffer'});

		if(defined($subscriberID) && $persist)
		{
			# Replace any existing connection registered under the same ID
			$self->{'subscriberID'}=$subscriberID;
			$self->deleteSubscriberWithID($subscriberID);
			$PersistentConnections{$subscriberID}=$self;
		}

		if(scalar(keys %{$channels}))
		{
			$self->emitOKHeader();

			$self->setChannels($channels,$persist);

			$self->close(1) unless($persist);

			return;
		}
	}
	elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
	{
		Meteor::Document->serveFileToClient($1,$self);

		$self->close(1);

		return;
	}

	#
	# If we fall through we did not understand the request
	#
	$self->emitErrorHeader();
}

#
# Private helper for processLine: records the start position requested for
# one channel in $channels. A backtrack count is stored as a negative
# start index.
#
# Returns 1 on success. If both 'restartfrom' and 'backtrack' were given,
# an error header is emitted, the connection is closed and 0 is returned.
#
sub _commitChannelRequest {
	my($self,$channels,$channelName,$startIndex,$backtrack)=@_;

	if(defined($startIndex) && defined($backtrack))
	{
		$self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");
		$self->close();

		return 0;
	}

	$startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack));
	$channels->{$channelName}->{'startIndex'}=$startIndex;

	return 1;
}
255
#
# Subscribes this connection to every channel named in $channels.
#
# $channels maps channel names to hashes holding a 'startIndex' entry,
# which is passed on to the channel together with the persist flag.
# For persistent subscriptions the channel object is also remembered
# under $self->{'channels'}.
#
sub setChannels {
	my($self,$channels,$persist)=@_;

	for my $name (keys %{$channels})
	{
		my $channel=Meteor::Channel->channelWithName($name);
		my $startIndex=$channels->{$name}->{'startIndex'};

		if($persist)
		{
			$self->{'channels'}->{$name}=$channel;
		}

		$channel->addSubscriber($self,$startIndex,$persist);
	}
}
272
#
# Sends the response header with status '200 OK'.
#
sub emitOKHeader {
	my($self)=@_;

	return $self->emitHeader('200 OK');
}
278
#
# Answers a request that was not understood: sends the response header
# with status '404 Not Found', counts the error in the statistics and
# closes the connection.
#
sub emitErrorHeader {
	my($self)=@_;

	$self->emitHeader('404 Not Found');

	# Account for the failed request
	$::Statistics->{'errors_served'}++;

	# Nothing more to do for this client
	$self->close();
}
288
#
# Builds the response header from the configured template and writes it
# to the client.
#
# $status is the status text substituted for the ~status~ placeholder.
#
# The template is $::CONF{'HeaderTemplate<N>'} when the client selected
# template number N and that template is configured, otherwise
# $::CONF{'HeaderTemplate'}. Placeholders have the form ~name~:
#   ~server~      replaced by $::PGM
#   ~status~      replaced by $status
#   ~servertime~  replaced by the current time
#   ~~            replaced by a literal '~'
# Unknown placeholders are replaced by the empty string.
#
sub emitHeader {
	my $self=shift;
	my $status=shift;

	my $header=undef;
	if(exists($self->{'HeaderTemplateNumber'}))
	{
		my $hn='HeaderTemplate'.$self->{'HeaderTemplateNumber'};

		$header=$::CONF{$hn};
	}
	$header=$::CONF{'HeaderTemplate'} unless(defined($header));

	my %valueFor=(
		'server'=>$::PGM,
		'status'=>$status,
		'servertime'=>time,
	);

	my $expand=sub {
		my $name=shift;

		# '~~' is the escape for a literal tilde
		return '~' if($name eq '');

		return defined($valueFor{$name}) ? $valueFor{$name} : '';
	};

	# The name may be empty ([^~]*), otherwise the '~~' escape could never match
	$header=~s/~([^~]*)~/$expand->($1)/ge;

	$self->write($header);
}
327
#
# Writes a message to the client and enforces the message limits.
#
# $msg is the text to write. $numMsgInThisBatch is the number of messages
# contained in $msg, used for the 'messages_served' statistic; it
# defaults to 1.
#
# Each call counts as one message towards the limits. The connection is
# closed (once) when the server-wide limit $::CONF{'MaxMessages'} or the
# limit requested by the client has been reached.
#
sub sendMessage {
	my $self=shift;
	my $msg=shift;
	my $numMsgInThisBatch=shift;

	$numMsgInThisBatch=1 unless(defined($numMsgInThisBatch));

	$self->write($msg);

	$::Statistics->{'messages_served'}+=$numMsgInThisBatch;

	my $msgCount=++$self->{'MessageCount'};

	# A limit applies only when it is set to a positive number
	my $serverLimit=$::CONF{'MaxMessages'};
	my $clientLimit=$self->{'MaxMessageCount'};

	my $serverLimitReached=(defined($serverLimit) && $serverLimit>0 && $msgCount>=$serverLimit);
	my $clientLimitReached=(defined($clientLimit) && $clientLimit>0 && $msgCount>=$clientLimit);

	# Close only once, even when both limits are reached by the same message
	$self->close(1) if($serverLimitReached || $clientLimitReached);
}
352
#
# Unsubscribes this connection from the named channel. Does nothing if
# the connection is not subscribed to that channel. When no subscribed
# channel is left afterwards the connection is closed.
#
sub closeChannel {
	my($self,$channelName)=@_;

	if(exists($self->{'channels'}->{$channelName}))
	{
		# Tell the channel first, then forget about it
		my $subscribed=$self->{'channels'}->{$channelName};
		$subscribed->removeSubscriber($self);

		delete($self->{'channels'}->{$channelName});

		# That was the last channel: nothing left to wait for
		my $remaining=keys %{$self->{'channels'}};
		$self->close() unless($remaining);
	}
}
366
#
# Closes the subscriber connection.
#
# Removes the subscriber from all its channels and from the persistent
# connection registry, optionally sends the configured shutdown message,
# updates the statistics and finally calls the superclass close.
#
# $noShutdownMsg: when true, the shutdown message is not sent.
#
sub close {
	my $self=shift;
	my $noShutdownMsg=shift;

	foreach my $channelName (keys %{$self->{'channels'}})
	{
		my $channel=$self->{'channels'}->{$channelName};
		$channel->removeSubscriber($self);
	}
	delete($self->{'channels'});

	if(exists($self->{'subscriberID'}))
	{
		my $id=$self->{'subscriberID'};

		# Only remove our own registration: a newer connection may have
		# taken over this ID in the meantime
		if(exists($PersistentConnections{$id}) && $PersistentConnections{$id}==$self)
		{
			delete($PersistentConnections{$id});
		}
	}

	#
	# Send shutdown message unless remote closed or
	# connection not yet established
	#
	unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
	{
		my $msg=$::CONF{'SubscriberShutdownMsg'};
		if(defined($msg) && $msg ne '')
		{
			$self->write($msg);
		}
	}

	# close may be called more than once for the same connection;
	# take it out of the subscriber count only the first time
	unless($self->{'closeCounted'})
	{
		$self->{'closeCounted'}=1;
		$::Statistics->{'current_subscribers'}--;
	}

	$self->SUPER::close();
}
400
#
# Closes the connection if it has a time limit and that limit lies
# before $time.
#
sub checkForMaxTime {
	my($self,$time)=@_;

	# Connections without a deadline never expire
	return unless(exists($self->{'ConnectionTimeLimit'}));

	# Deadline not yet passed
	return unless($self->{'ConnectionTimeLimit'}<$time);

	$self->close(1);
}
407
408 1;
409 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26