/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is a repository of my old source code, which is no longer updated. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory | Revision Log


Revision 49 - (show annotations)
Mon Feb 4 22:30:35 2008 UTC (16 years, 2 months ago) by knops.gerd
File size: 8839 byte(s)
• Removed support for numbered header templates and hard coded persist values, use new mode support instead

1 #!/usr/bin/perl -w
2 ###############################################################################
3 # Meteor
4 # An HTTP server for the 2.0 web
5 # Copyright (c) 2006 contributing authors
6 #
7 # Subscriber.pm
8 #
9 # Description:
10 # A Meteor Subscriber
11 #
12 ###############################################################################
13 #
14 # This program is free software; you can redistribute it and/or modify it
15 # under the terms of the GNU General Public License as published by the Free
16 # Software Foundation; either version 2 of the License, or (at your option)
17 # any later version.
18 #
19 # This program is distributed in the hope that it will be useful, but WITHOUT
20 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22 # more details.
23 #
24 # You should have received a copy of the GNU General Public License along
25 # with this program; if not, write to the Free Software Foundation, Inc.,
26 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27 #
28 # For more information visit www.meteorserver.org
29 #
30 ###############################################################################
31
32 package Meteor::Subscriber;
33 ###############################################################################
34 # Configuration
35 ###############################################################################
36
37 use strict;
38
39 use Meteor::Connection;
40 use Meteor::Channel;
41 use Meteor::Document;
42
# A subscriber is a specialised Meteor::Connection.
@Meteor::Subscriber::ISA = ('Meteor::Connection');

# Registry of persistent subscribers, keyed by subscriber ID.
our %PersistentConnections = ();

# Initialised to zero; nothing in this file updates it.
our $NumAcceptedConnections = 0;
47
48
49 ###############################################################################
50 # Factory methods
51 ###############################################################################
#
# Factory: build a subscriber for a connection handed over by the server.
#
# Delegates construction to the superclass, then initialises the
# subscriber specific state (header accumulator, message counters,
# connection start time and optional time limit) and bumps the
# global subscriber statistics.
#
# Returns the new subscriber object.
#
sub newFromServer {
    my ($class, $server) = @_;

    my $self = $class->SUPER::newFromServer($server);

    my $now = time;

    $self->{'headerBuffer'}    = '';
    $self->{'MessageCount'}    = 0;
    $self->{'MaxMessageCount'} = 0;
    $self->{'ConnectionStart'} = $now;

    # A positive global MaxTime puts an expiry time on the connection
    my $maxTime = $::CONF{'MaxTime'};
    $self->{'ConnectionTimeLimit'} = $now + $maxTime if($maxTime > 0);

    $::Statistics->{'current_subscribers'}++;
    $::Statistics->{'subscriber_connections_accepted'}++;

    return $self;
}
73
74 ###############################################################################
75 # Class methods
76 ###############################################################################
#
# Class method: close the persistent subscriber registered under the
# given ID. Does nothing when no such subscriber is registered.
#
sub deleteSubscriberWithID {
    my ($class, $id) = @_;

    $PersistentConnections{$id}->close() if exists $PersistentConnections{$id};
}
86
#
# Class method: send a ping to every registered persistent subscriber.
#
# The subscribers are copied into a list before iterating so the loop
# does not walk %PersistentConnections directly (presumably because a
# ping can end up closing a connection, which removes it from the
# registry -- confirm against Meteor::Connection).
#
# Returns nothing.
#
sub pingPersistentConnections {
    my ($class) = @_;

    my @cons = values %PersistentConnections;

    foreach my $con (@cons)
    {
        $con->ping();
    }

    return;
}
94
#
# Class method: ask every registered persistent subscriber to check
# whether its time limit has passed, using one shared timestamp.
#
# The subscribers are copied into a list first because checkForMaxTime
# may close a subscriber, and closing removes it from
# %PersistentConnections.
#
# Returns nothing.
#
sub checkPersistentConnectionsForMaxTime {
    my ($class) = @_;

    my $time = time;
    my @cons = values %PersistentConnections;

    foreach my $con (@cons)
    {
        $con->checkForMaxTime($time);
    }

    return;
}
103
#
# Returns the number of persistent subscribers currently registered.
#
sub numSubscribers {
    my $count = keys %PersistentConnections;

    return $count;
}
108
109 ###############################################################################
110 # Instance methods
111 ###############################################################################
#
# Consume one line of the HTTP request header.
#
# Non-empty lines are appended to the header buffer. An empty line marks
# the end of the header; the buffer is then matched against three
# request forms, in this order:
#
#   GET <SubscriberDynamicPageAddress>/<id>/<mode>/<channeldefs>
#       subscribe to one or more channels
#   GET /disconnect/<id>
#       close the persistent subscriber registered under <id>
#   GET <path>
#       hand the path to Meteor::Document
#
# Anything else is answered with the error header. Once a subscription
# request has been accepted the header buffer is removed, and all
# further input on the connection is ignored.
#
# Parameters:
#   $line - one header line, without its line terminator
#
sub processLine {
    my ($self, $line) = @_;

    # Once the header was processed we ignore any input
    return unless(exists($self->{'headerBuffer'}));

    if($line ne '')
    {
        # Still inside the header: accumulate
        $self->{'headerBuffer'}.="$line\n";
        return;
    }

    #
    # Empty line signals end of header.
    # Analyze header, register with appropriate channel
    # and send pending messages.
    #
    my $header=$self->{'headerBuffer'};

    # The configured address is literal text, not a pattern: escape it
    # so characters such as '.' or '+' in it cannot act as regex syntax.
    my $pageAddress=$::CONF{'SubscriberDynamicPageAddress'};
    $pageAddress='' unless(defined($pageAddress));
    my $pagePattern=quotemeta($pageAddress);

    if($header=~m{GET\s+$pagePattern/([0-9a-z]+)/([0-9a-z]+)/(\S+)}i)
    {
        # Copy the captures right away; the matches below overwrite them
        my ($subscriberID,$mode,$channelSpec)=($1,$2,$3);

        # The mode has to be set first: getConf consults it
        $self->{'mode'}=$mode;
        my $persist=$self->getConf('Persist');

        my $maxTime=$self->getConf('MaxTime');
        if($maxTime>0)
        {
            $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
        }

        #
        # Each channel definition is <name>, optionally followed by
        # '.r<n>', '.b<n>' or '.h'. Definitions that do not have this
        # shape are skipped. The separator must be a literal dot.
        #
        my $channels={};
        foreach my $chandef (split('/',$channelSpec))
        {
            next unless($chandef=~/^([a-z0-9]+)(?:\.(r|b|h)([0-9]*))?$/i);

            my ($channelName,$kind,$offset)=($1,$2,$3);

            # The suffix letter is compared case sensitively: an upper
            # case R, B or H passes the pattern but leaves the start
            # index undefined.
            my $startIndex;
            if(defined($kind))
            {
                if($kind eq 'r')    { $startIndex=$offset; }
                elsif($kind eq 'b') { $startIndex=-$offset; }
                elsif($kind eq 'h') { $startIndex=0; }
            }

            # A name that occurs twice keeps its last definition
            $channels->{$channelName}->{'startIndex'}=$startIndex;
        }

        delete($self->{'headerBuffer'});

        if($persist)
        {
            # Replace any subscriber already registered under this ID
            $self->{'subscriberID'}=$subscriberID;
            $self->deleteSubscriberWithID($subscriberID);
            $PersistentConnections{$subscriberID}=$self;
        }

        if(scalar(keys %{$channels}))
        {
            $self->emitOKHeader();
            $self->setChannels($channels,$persist,$self->{'mode'},'');
            $self->close(1) unless($persist);
            return;
        }
    }
    elsif($header=~m{GET\s+/disconnect/(\S+)})
    {
        $self->deleteSubscriberWithID($1);
        $self->emitOKHeader();
        $self->close(1);
        return;
    }
    elsif($header=~/GET\s+([^\s\?]+)/)
    {
        Meteor::Document->serveFileToClient($1,$self);
        $self->close(1);
        return;
    }

    #
    # If we fall through we did not understand the request
    #
    $self->emitErrorHeader();
}
203
#
# Subscribe this connection to a set of channels.
#
# Parameters:
#   $channels  - hash ref: channel name => { startIndex => ... }
#   $persist   - true if the subscription is persistent; only then are
#                the channel objects remembered in $self->{'channels'}
#   $mode      - passed through to the channel (defaults to '')
#   $userAgent - passed through to the channel (defaults to '')
#
sub setChannels {
    my ($self, $channels, $persist, $mode, $userAgent) = @_;

    $mode      = '' unless($mode);
    $userAgent = '' unless($userAgent);

    for my $name (keys %{$channels})
    {
        my $startIndex = $channels->{$name}->{'startIndex'};
        my $channel    = Meteor::Channel->channelWithName($name);

        if($persist)
        {
            $self->{'channels'}->{$name} = $channel;
        }

        $channel->addSubscriber($self, $startIndex, $persist, $mode, $userAgent);
    }
}
222
#
# Send the response header with status '200 OK'.
#
sub emitOKHeader {
    my ($self) = @_;

    return $self->emitHeader('200 OK');
}
228
#
# Answer a request that was not understood: send the response header
# with status '404 Not Found', count the error in the statistics and
# close the connection.
#
sub emitErrorHeader {
    my ($self) = @_;

    $self->emitHeader('404 Not Found');

    $::Statistics->{'errors_served'}++;

    # Nothing more to say on this connection
    return $self->close();
}
238
#
# Expand the configured 'HeaderTemplate' and write it to the client.
#
# Placeholders in the template have the form ~name~:
#
#   ~~             a literal '~'
#   ~server~       the value of $::PGM
#   ~status~       the $status argument
#   ~servertime~   the current time as returned by time()
#   ~channelinfo~  the channel list rendered with 'ChannelInfoTemplate'
#
# Any other placeholder is replaced by the empty string.
#
# Parameters:
#   $status - HTTP status text, e.g. '200 OK'
#
sub emitHeader {
    my ($self, $status) = @_;

    # One generator per placeholder; each runs only when its
    # placeholder actually occurs in the template.
    my %expansionFor = (
        ''            => sub { '~' },
        'server'      => sub { $::PGM },
        'status'      => sub { $status },
        'servertime'  => sub { time },
        'channelinfo' => sub {
            Meteor::Channel->listChannelsUsingTemplate($self->getConf('ChannelInfoTemplate'));
        },
    );

    my $header = $self->getConf('HeaderTemplate');

    $header =~ s/~([^~]*)~/ exists($expansionFor{$1}) ? $expansionFor{$1}->() : '' /gex;

    $self->write($header);
}
274
#
# Render the given messages with the message template, write them to
# the client in one piece and update the message counters. The
# connection is closed (without shutdown message) once a message limit
# has been reached.
#
# Parameters:
#   @_ (after $self) - message objects; each must respond to
#                      messageWithTemplate($template)
#
# Two limits are checked: the configured 'MaxMessages' and the per
# connection 'MaxMessageCount'. Either applies only when greater than
# zero. The connection is closed at most once, even if both limits are
# reached by the same call.
#
sub sendMessages {
    my ($self, @messages) = @_;

    my $numMessages = scalar(@messages);
    return if($numMessages < 1);

    # Every other key read in this module is CamelCase, so look up
    # 'MessageTemplate' first; fall back to the historical spelling
    # 'Messagetemplate' so a configuration using that key keeps working.
    my $msgTemplate = $self->getConf('MessageTemplate');
    $msgTemplate = $self->getConf('Messagetemplate') unless(defined($msgTemplate));

    my $msgData = '';
    foreach my $message (@messages)
    {
        $msgData .= $message->messageWithTemplate($msgTemplate);
    }

    $self->write($msgData);

    $::Statistics->{'messages_served'} += $numMessages;

    $self->{'MessageCount'} += $numMessages;
    my $msgCount = $self->{'MessageCount'};

    my $confLimit = $self->getConf('MaxMessages');
    my $ownLimit  = $self->{'MaxMessageCount'};

    my $limitReached =
        (defined($confLimit) && $confLimit > 0 && $msgCount >= $confLimit)
        || (defined($ownLimit) && $ownLimit > 0 && $msgCount >= $ownLimit);

    $self->close(1) if($limitReached);

    return;
}
310
#
# Write the configured 'PingMessage' to the client.
#
sub ping {
    my ($self) = @_;

    return $self->write(scalar($self->getConf('PingMessage')));
}
317
#
# Unsubscribe from a single channel.
#
# Does nothing if this subscriber does not hold the named channel.
# Otherwise the channel is told to remove the subscriber (reason
# 'channelClose') and is forgotten; when it was the last channel held,
# the connection itself is closed with reason 'channelsClosed'.
#
sub closeChannel {
    my ($self, $channelName) = @_;

    return unless(exists($self->{'channels'}->{$channelName}));

    $self->{'channels'}->{$channelName}->removeSubscriber($self, 'channelClose');

    delete($self->{'channels'}->{$channelName});

    my $remaining = scalar(keys %{$self->{'channels'}});
    $self->close(0, 'channelsClosed') if($remaining == 0);
}
331
#
# Close this subscriber.
#
# Removes the subscriber from all channels it holds (reason
# 'subscriberClose'), unregisters it from the persistent connection
# registry, optionally sends the configured shutdown message and
# finally closes the underlying connection via the superclass.
#
# Parameters:
#   $noShutdownMsg - true to suppress the shutdown message
#
sub close {
    my ($self, $noShutdownMsg) = @_;

    my @channelNames = keys %{$self->{'channels'}};
    foreach my $name (@channelNames)
    {
        $self->{'channels'}->{$name}->removeSubscriber($self, 'subscriberClose');
    }
    delete($self->{'channels'});

    delete($PersistentConnections{$self->{'subscriberID'}})
        if(exists($self->{'subscriberID'}));

    #
    # The shutdown message is skipped when the caller asked for that,
    # when the remote side already closed, or when the header was never
    # fully processed (connection not yet established).
    #
    my $skipMessage = $noShutdownMsg
        || $self->{'remoteClosed'}
        || exists($self->{'headerBuffer'});

    if(!$skipMessage)
    {
        my $msg = $self->getConf('SubscriberShutdownMsg');
        $self->write($msg) if(defined($msg) && $msg ne '');
    }

    return $self->SUPER::close();
}
363
#
# Bookkeeping hook: one subscriber fewer in the global statistics.
#
sub didClose {
    $::Statistics->{current_subscribers}--;
}
368
#
# Close the connection (without shutdown message) if it has a time
# limit and that limit lies before the given time.
#
# Parameters:
#   $time - the time to compare the limit against
#
sub checkForMaxTime {
    my ($self, $time) = @_;

    my $expired = exists($self->{'ConnectionTimeLimit'})
        && $self->{'ConnectionTimeLimit'} < $time;

    $self->close(1) if($expired);
}
375
#
# Look up a configuration value for this subscriber.
#
# When the subscriber has a non-empty mode and the configuration
# contains the key '<key>.<mode>', that mode specific value is
# returned. Otherwise the plain '<key>' value is returned.
#
# Parameters:
#   $key - configuration key
#
sub getConf {
    my ($self, $key) = @_;

    my $hasMode = exists($self->{'mode'}) && $self->{'mode'} ne '';

    if($hasMode)
    {
        my $modeKey = join('.', $key, $self->{'mode'});

        return $::CONF{$modeKey} if(exists($::CONF{$modeKey}));
    }

    return $::CONF{$key};
}
389
390 1;
391 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26