/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 47 - (show annotations)
Mon Feb 4 21:06:42 2008 UTC (16 years, 1 month ago) by knops.gerd
File size: 9402 byte(s)
• syslog change: If `SyslogFacility` is set to `none`, meteord will not put itself into the background and print all syslog messages with a priority higher than `debug` to standard output. The output will be prefixed with a timestamp (unix time in seconds) and a tab character.

• New syslog messges: joinchannel, leavechannel, document


1 #!/usr/bin/perl -w
2 ###############################################################################
3 # Meteor
4 # An HTTP server for the 2.0 web
5 # Copyright (c) 2006 contributing authors
6 #
7 # Subscriber.pm
8 #
9 # Description:
10 # A Meteor Subscriber
11 #
12 ###############################################################################
13 #
14 # This program is free software; you can redistribute it and/or modify it
15 # under the terms of the GNU General Public License as published by the Free
16 # Software Foundation; either version 2 of the License, or (at your option)
17 # any later version.
18 #
19 # This program is distributed in the hope that it will be useful, but WITHOUT
20 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22 # more details.
23 #
24 # You should have received a copy of the GNU General Public License along
25 # with this program; if not, write to the Free Software Foundation, Inc.,
26 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27 #
28 # For more information visit www.meteorserver.org
29 #
30 ###############################################################################
31
32 package Meteor::Subscriber;
33 ###############################################################################
34 # Configuration
35 ###############################################################################
36
37 use strict;
38
39 use Meteor::Connection;
40 use Meteor::Channel;
41 use Meteor::Document;
42
43 @Meteor::Subscriber::ISA=qw(Meteor::Connection);
44
45 our %PersistentConnections=();
46 our $NumAcceptedConnections=0;
47
48
49 ###############################################################################
50 # Factory methods
51 ###############################################################################
52 sub newFromServer {
53 my $class=shift;
54
55 my $self=$class->SUPER::newFromServer(shift);
56
57 $self->{'headerBuffer'}='';
58 $self->{'MessageCount'}=0;
59 $self->{'MaxMessageCount'}=0;
60
61 $self->{'ConnectionStart'}=time;
62 my $maxTime=$::CONF{'MaxTime'};
63 if($maxTime>0)
64 {
65 $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
66 }
67
68 $::Statistics->{'current_subscribers'}++;
69 $::Statistics->{'subscriber_connections_accepted'}++;
70
71 $self;
72 }
73
74 ###############################################################################
75 # Class methods
76 ###############################################################################
77 sub deleteSubscriberWithID {
78 my $class=shift;
79 my $id=shift;
80
81 if(exists($PersistentConnections{$id}))
82 {
83 $PersistentConnections{$id}->close();
84 }
85 }
86
87 sub pingPersistentConnections {
88 my $class=shift;
89
90 my @cons=values %PersistentConnections;
91
92 map { $_->ping() } @cons;
93 }
94
95 sub checkPersistentConnectionsForMaxTime {
96 my $class=shift;
97
98 my $time=time;
99 my @cons=values %PersistentConnections;
100
101 map { $_->checkForMaxTime($time) } @cons;
102 }
103
104 sub numSubscribers {
105
106 return scalar(keys %PersistentConnections);
107 }
108
109 ###############################################################################
110 # Instance methods
111 ###############################################################################
112 sub processLine {
113 my $self=shift;
114 my $line=shift;
115
116 # Once the header was processed we ignore any input
117 return unless(exists($self->{'headerBuffer'}));
118
119 if($line ne '')
120 {
121 #
122 # Accumulate header
123 #
124 $self->{'headerBuffer'}.="$line\n";
125 }
126 else
127 {
128 #
129 # Empty line signals end of header.
130 # Analyze header, register with appropiate channel
131 # and send pending messages.
132 #
133 # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
134 #
135 # Find the 'GET' line
136 #
137 if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i)
138 {
139 my $subscriberID=$1;
140 $self->{'mode'}=$2;
141 my $persist=$self->getConf('Persist');
142
143 if ($self->{'mode'} eq "xhrinteractive" || $self->{'mode'} eq "iframe" || $self->{'mode'} eq "serversent" || $self->{'mode'} eq "longpoll") {
144 $persist=1;
145 $self->{'MaxMessageCount'}=1 unless(!($self->{'mode'} eq "longpoll"));
146 }
147 if ($self->{'mode'} eq "iframe") {
148 $self->{'HeaderTemplateNumber'}=1;
149 } else {
150 $self->{'HeaderTemplateNumber'}=2;
151 }
152
153 my $maxTime=$self->getConf('MaxTime');
154 if($maxTime>0)
155 {
156 $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
157 }
158
159 my @channelData=split('/',$3);
160 my $channels={};
161 my $channelName;
162 my $offset;
163 foreach my $chandef (@channelData) {
164 if($chandef=~/^([a-z0-9]+)(.(r|b|h)([0-9]*))?$/i) {
165 $channelName = $1;
166 $channels->{$channelName}->{'startIndex'} = undef;
167 if ($3) {
168 $offset = $4;
169 if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; }
170 if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; }
171 if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; }
172 }
173 }
174 }
175
176 delete($self->{'headerBuffer'});
177
178 if($persist)
179 {
180 $self->{'subscriberID'}=$subscriberID;
181 $self->deleteSubscriberWithID($subscriberID);
182 $PersistentConnections{$subscriberID}=$self;
183 }
184
185 if(scalar(keys %{$channels}))
186 {
187 $self->emitOKHeader();
188 $self->setChannels($channels,$persist,$self->{'mode'},'');
189 $self->close(1) unless($persist);
190 return;
191 }
192 }
193 elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/)
194 {
195 $self->deleteSubscriberWithID($1);
196 $self->emitOKHeader();
197 $self->close(1);
198 return;
199 }
200 elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
201 {
202 Meteor::Document->serveFileToClient($1,$self);
203 $self->close(1);
204 return;
205 }
206
207 #
208 # If we fall through we did not understand the request
209 #
210 $self->emitErrorHeader();
211 }
212 }
213
214 sub setChannels {
215 my $self=shift;
216 my $channels=shift;
217 my $persist=shift;
218 my $mode=shift || '';
219 my $userAgent=shift || '';
220
221 foreach my $channelName (keys %{$channels})
222 {
223 my $startIndex=$channels->{$channelName}->{'startIndex'};
224
225 my $channel=Meteor::Channel->channelWithName($channelName);
226
227 $self->{'channels'}->{$channelName}=$channel if($persist);
228
229 $channel->addSubscriber($self,$startIndex,$persist,$mode,$userAgent);
230 }
231 }
232
233 sub emitOKHeader {
234 my $self=shift;
235
236 $self->emitHeader('200 OK');
237 }
238
239 sub emitErrorHeader {
240 my $self=shift;
241
242 $self->emitHeader('404 Not Found');
243 $::Statistics->{'errors_served'}++;
244
245 # close up shop here!
246 $self->close();
247 }
248
249 sub emitHeader {
250 my $self=shift;
251 my $status=shift;
252
253 my $header=undef;
254 if(exists($self->{'HeaderTemplateNumber'}))
255 {
256 my $hn='HeaderTemplate'.$self->{'HeaderTemplateNumber'};
257
258 $header=$self->getConf($hn);
259 }
260 $header=$self->getConf('HeaderTemplate') unless(defined($header));
261
262 $header=~s/~([^~]*)~/
263 if(!defined($1) || $1 eq '')
264 {
265 '~';
266 }
267 elsif($1 eq 'server')
268 {
269 $::PGM;
270 }
271 elsif($1 eq 'status')
272 {
273 $status;
274 }
275 elsif($1 eq 'servertime')
276 {
277 time;
278 }
279 elsif($1 eq 'channelinfo')
280 {
281 Meteor::Channel->listChannelsUsingTemplate($self->getConf('ChannelInfoTemplate'));
282 }
283 else
284 {
285 '';
286 }
287 /gex;
288
289 $self->write($header);
290 }
291
292 sub sendMessages {
293 my $self=shift;
294
295 my $numMessages=0;
296 my $msgTemplate=$self->getConf('Messagetemplate');
297 my $msgData='';
298
299 foreach my $message (@_)
300 {
301 $msgData.=$message->messageWithTemplate($msgTemplate);
302 $numMessages++;
303 }
304
305 return if($numMessages<1);
306
307 $self->write($msgData);
308
309 $::Statistics->{'messages_served'}+=$numMessages;
310
311 my $msgCount=$self->{'MessageCount'};
312 $msgCount+=$numMessages;
313 $self->{'MessageCount'}=$msgCount;
314
315 my $maxMsg=$self->getConf('MaxMessages');
316 if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)
317 {
318 $self->close(1);
319 }
320
321 if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})
322 {
323 $self->close(1);
324 }
325
326 }
327
328 sub ping {
329 my $self=shift;
330 my $msg=$self->getConf('PingMessage');
331
332 $self->write($msg);
333 }
334
335 sub closeChannel {
336 my $self=shift;
337 my $channelName=shift;
338
339 return unless(exists($self->{'channels'}->{$channelName}));
340
341 my $channel=$self->{'channels'}->{$channelName};
342 $channel->removeSubscriber($self,'channelClose');
343
344 delete($self->{'channels'}->{$channelName});
345
346 $self->close(0,'channelsClosed') if(scalar(keys %{$self->{'channels'}})==0);
347 }
348
349 sub close {
350 my $self=shift;
351 my $noShutdownMsg=shift;
352
353 foreach my $channelName (keys %{$self->{'channels'}})
354 {
355 my $channel=$self->{'channels'}->{$channelName};
356 $channel->removeSubscriber($self,'subscriberClose');
357 }
358 delete($self->{'channels'});
359
360 if(exists($self->{'subscriberID'}))
361 {
362 delete($PersistentConnections{$self->{'subscriberID'}});
363 }
364
365 #
366 # Send shutdown message unless remote closed or
367 # connection not yet established
368 #
369 unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
370 {
371 my $msg=$self->getConf('SubscriberShutdownMsg');
372 if(defined($msg) && $msg ne '')
373 {
374 $self->write($msg);
375 }
376 }
377
378 $self->SUPER::close();
379 }
380
381 sub didClose {
382
383 $::Statistics->{'current_subscribers'}--;
384 }
385
386 sub checkForMaxTime {
387 my $self=shift;
388 my $time=shift;
389
390 $self->close(1) if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);
391 }
392
393 sub getConf {
394 my $self=shift;
395 my $key=shift;
396
397 if(exists($self->{'mode'}) && $self->{'mode'} ne '')
398 {
399 my $k=$key.'.'.$self->{'mode'};
400
401 return $::CONF{$k} if(exists($::CONF{$k}));
402 }
403
404 $::CONF{$key};
405 }
406
407 1;
408 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26