/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is a repository of my old source code, which is no longer updated. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 13 - (show annotations)
Mon Apr 30 18:16:17 2007 UTC (16 years, 11 months ago) by knops.gerd
File size: 9147 byte(s)
• Allow to subscribe to multiple channels

1 #!/usr/bin/perl -w
2 ###############################################################################
3 # Meteor
4 # An HTTP server for the 2.0 web
5 # Copyright (c) 2006 contributing authors
6 #
7 # Subscriber.pm
8 #
9 # Description:
10 # A Meteor Subscriber
11 #
12 ###############################################################################
13 #
14 # This program is free software; you can redistribute it and/or modify it
15 # under the terms of the GNU General Public License as published by the Free
16 # Software Foundation; either version 2 of the License, or (at your option)
17 # any later version.
18 #
19 # This program is distributed in the hope that it will be useful, but WITHOUT
20 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22 # more details.
23 #
24 # You should have received a copy of the GNU General Public License along
25 # with this program; if not, write to the Free Software Foundation, Inc.,
26 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27 #
28 # For more information visit www.meteorserver.org
29 #
30 ###############################################################################
31
32 package Meteor::Subscriber;
33 ###############################################################################
34 # Configuration
35 ###############################################################################
36
37 use strict;
38
39 use Meteor::Connection;
40 use Meteor::Channel;
41 use Meteor::Document;
42
43 @Meteor::Subscriber::ISA=qw(Meteor::Connection);
44
45 our %PersistentConnections=();
46
47 ###############################################################################
48 # Factory methods
49 ###############################################################################
#
# Create a subscriber for a connection handed over by a server.
#
# Construction is delegated to the parent class; the single argument is
# passed through unchanged. Afterwards the request header buffer, the
# message counters and the connection start time are initialized. When
# $::CONF{'MaxTime'} holds a positive number, 'ConnectionTimeLimit' is set
# to the start time plus that many seconds.
#
# Returns the new subscriber object.
#
sub newFromServer {
    my $class=shift;

    my $self=$class->SUPER::newFromServer(shift);

    $self->{'headerBuffer'}='';
    $self->{'MessageCount'}=0;
    $self->{'MaxMessageCount'}=0;

    $self->{'ConnectionStart'}=time;

    # An unset MaxTime is treated like 0 (no limit). Testing definedness
    # first avoids an "uninitialized value" warning under -w, in the same
    # way sendMessage guards its MaxMessages setting.
    my $maxTime=$::CONF{'MaxTime'};
    if(defined($maxTime) && $maxTime>0)
    {
        $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
    }

    return $self;
}
68
69 ###############################################################################
70 # Class methods
71 ###############################################################################
#
# Close the persistent connection registered under the given subscriber
# ID, if there is one. The connection is closed with a true argument,
# which close() uses to suppress the shutdown message.
#
sub deleteSubscriberWithID {
    my ($class,$id)=@_;

    exists($PersistentConnections{$id})
        && $PersistentConnections{$id}->close(1);
}
81
#
# Write $::CONF{'PingMessage'} to every registered persistent connection.
#
# Nothing is sent when no ping message is configured or it is empty,
# mirroring how close() treats the shutdown message.
#
sub pingPersistentConnections {
    my $class=shift;

    my $msg=$::CONF{'PingMessage'};
    return unless(defined($msg) && $msg ne '');

    # Iterate over a snapshot: close() removes entries from
    # %PersistentConnections, so the registry must not be walked directly
    # should a write end up closing a connection.
    my @cons=values %PersistentConnections;

    foreach my $con (@cons)
    {
        $con->write($msg);
    }

    return;
}
90
#
# Ask every registered persistent connection to check itself against its
# time limit, using one common timestamp for the whole pass.
#
sub checkPersistentConnectionsForMaxTime {
    my $class=shift;

    my $time=time;

    # Iterate over a snapshot: checkForMaxTime may close a connection, and
    # close() removes it from %PersistentConnections.
    my @cons=values %PersistentConnections;

    foreach my $con (@cons)
    {
        $con->checkForMaxTime($time);
    }

    return;
}
99
100 ###############################################################################
101 # Instance methods
102 ###############################################################################
#
# Feed one line of the client's request to the subscriber.
#
# Non-empty lines are appended to the header buffer. The first empty line
# ends the header, which is then analyzed:
#
#   GET $::CONF{'SubscriberDynamicPageAddress'}?channel=ml123&restartfrom=1 HTTP/1.1
#
# subscribes the connection to one or more channels. Query elements:
#
#   channel=NAME       starts a channel entry; the restartfrom, backtrack
#                      and persist values collected so far are attached to
#                      a channel when the next channel element or the end
#                      of the query is reached
#   restartfrom=N      start index passed to the channel (may be empty)
#   backtrack=N        passed to the channel as start index -N; cannot be
#                      combined with restartfrom for the same channel
#   persist=yes|true|1|no|false|0
#                      channels are persistent unless switched off
#   id=ID              subscriber ID; registers the connection in
#                      %PersistentConnections, closing any connection
#                      previously registered under that ID
#   maxmessages=N      stored as 'MaxMessageCount'
#   template=N         stored as 'HeaderTemplateNumber'
#   maxtime=N          requested connection lifetime in seconds
#
# Any other GET request is handed to Meteor::Document->serveFileToClient
# and the connection is closed. Everything else is answered through
# emitErrorHeader.
#
# Once the header has been processed, all further input is ignored.
#
sub processLine {
    my $self=shift;
    my $line=shift;

    # Once the header was processed we ignore any input
    return unless(exists($self->{'headerBuffer'}));

    #
    # Accumulate header until the empty line that ends it
    #
    if($line ne '')
    {
        $self->{'headerBuffer'}.="$line\n";

        return;
    }

    #
    # Empty line signals end of header.
    # Analyze header, register with appropriate channel
    # and send pending messages.
    #
    my $header=$self->{'headerBuffer'};

    #
    # Find the 'GET' line
    #
    # NOTE(review): the configured address is interpolated as a regular
    # expression, so metacharacters in it (e.g. '.') are not matched
    # literally. Confirm whether that is intended before quoting it.
    #
    if($header=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\?(\S+)/)
    {
        my @formData=split('&',$1);

        my $channels={};
        my $anyPersist=0;
        my $subscriberID=undef;

        # Options collected for the channel currently being parsed
        my $channelName=undef;
        my $startIndex=undef;
        my $backtrack=undef;
        my $persist=1;

        #
        # Record the current channel together with its options.
        # Returns false, after answering the request and closing the
        # connection, when both 'restartfrom' and 'backtrack' were given.
        #
        my $commitChannel=sub {
            if(defined($startIndex) && defined($backtrack))
            {
                $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");
                $self->close();

                return 0;
            }

            # Backtracking is expressed as a negative start index
            $startIndex=-$backtrack if(defined($backtrack));

            $channels->{$channelName}->{'startIndex'}=$startIndex;
            $channels->{$channelName}->{'persist'}=$persist;
            $anyPersist|=$persist;

            return 1;
        };

        foreach my $formElement (@formData)
        {
            if($formElement=~/^channel=(.+)$/)
            {
                # Save the capture before anything else can run a match
                my $nextChannelName=$1;

                if(defined($channelName))
                {
                    return unless($commitChannel->());

                    # Options do not carry over to the next channel
                    $startIndex=undef;
                    $backtrack=undef;
                    $persist=1;
                }
                $channelName=$nextChannelName;
            }
            elsif($formElement=~/^restartfrom=(\d*)$/)
            {
                # May be the empty string: 'restartfrom=' is accepted
                $startIndex=$1;
            }
            elsif($formElement=~/^backtrack=(\d+)$/)
            {
                $backtrack=$1;
            }
            elsif($formElement=~/^persist=(?i)(yes|true|1|no|false|0)$/)
            {
                # Persistence is only ever switched off, never back on
                $persist=0 if($1=~/(no|false|0)/i);
            }
            elsif($formElement=~/^id=(.+)$/)
            {
                $subscriberID=$1;
            }
            elsif($formElement=~/^maxmessages=(\d+)$/i)
            {
                $self->{'MaxMessageCount'}=$1;
            }
            elsif($formElement=~/^template=(\d+)$/i)
            {
                $self->{'HeaderTemplateNumber'}=$1;
            }
            elsif($formElement=~/^maxtime=(\d+)$/i)
            {
                my $clientRequest=$1;

                # An unset server limit counts as 0 (no limit)
                my $serverDefault=$::CONF{'MaxTime'}||0;

                # The client request is honoured when the server has no
                # limit or the server limit is larger than the request
                if($serverDefault==0 || $serverDefault>$clientRequest)
                {
                    $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$clientRequest;
                }
            }
        }

        # The last channel has no following 'channel' element to commit it
        if(defined($channelName))
        {
            return unless($commitChannel->());
        }

        delete($self->{'headerBuffer'});

        if(defined($subscriberID) && $anyPersist)
        {
            $self->{'subscriberID'}=$subscriberID;
            $self->deleteSubscriberWithID($subscriberID);
            $PersistentConnections{$subscriberID}=$self;
        }

        if(scalar(keys %{$channels}))
        {
            $self->emitOKHeader();

            $self->setChannels($channels);

            # Without any persistent channel there is nothing to wait for
            $self->close(1) unless($anyPersist);

            return;
        }
    }
    elsif($header=~/GET\s+([^\s\?]+)/)
    {
        Meteor::Document->serveFileToClient($1,$self);

        $self->close(1);

        return;
    }

    #
    # If we fall through we did not understand the request
    #
    $self->emitErrorHeader();
}
252
#
# Subscribe this connection to a set of channels.
#
# $channels maps each channel name to a hash with the keys 'startIndex'
# and 'persist'. Every channel is looked up by name through
# Meteor::Channel->channelWithName and the subscriber is added to it.
# Only persistent subscriptions are remembered in $self->{'channels'}.
#
sub setChannels {
    my ($self,$channels)=@_;

    for my $name (keys(%{$channels})) {
        my $options=$channels->{$name};
        my $startIndex=$options->{'startIndex'};
        my $persist=$options->{'persist'};

        my $channel=Meteor::Channel->channelWithName($name);

        if($persist) {
            $self->{'channels'}->{$name}=$channel;
        }

        $channel->addSubscriber($self,$startIndex,$persist);
    }
}
269
#
# Send the response header with the status '200 OK'.
#
sub emitOKHeader {
    my ($self)=@_;

    return $self->emitHeader('200 OK');
}
275
#
# Send the response header with the status '404 Not Found' and close the
# connection.
#
sub emitErrorHeader {
    my ($self)=@_;

    $self->emitHeader('404 Not Found');

    # An error response ends the conversation
    return $self->close();
}
284
#
# Write the response header for the given status line to the client.
#
# The header text is taken from $::CONF{'HeaderTemplate<N>'} when the
# request selected template number N and such a template is configured,
# otherwise from $::CONF{'HeaderTemplate'}. Placeholders enclosed in
# tildes are expanded:
#
#   ~server~      $::PGM
#   ~status~      the $status argument
#   ~servertime~  the current time as returned by time
#   ~~            a literal tilde
#
# Any other placeholder is replaced by the empty string.
#
sub emitHeader {
    my $self=shift;
    my $status=shift;

    my $header=undef;
    if(exists($self->{'HeaderTemplateNumber'}))
    {
        my $hn='HeaderTemplate'.$self->{'HeaderTemplateNumber'};

        $header=$::CONF{$hn};
    }
    $header=$::CONF{'HeaderTemplate'} unless(defined($header));

    my %valueFor=(
        ''           => '~',
        'server'     => $::PGM,
        'status'     => $status,
        'servertime' => time(),
    );

    # The placeholder name may be empty so that '~~' can act as the escape
    # for a literal tilde; requiring at least one character would make
    # that case impossible to match.
    $header=~s/~([^~]*)~/defined($valueFor{$1}) ? $valueFor{$1} : ''/ge;

    $self->write($header);
}
323
#
# Write a message to the client and count it.
#
# The connection is closed, without a shutdown message, as soon as the
# number of messages sent reaches either the server-wide limit
# $::CONF{'MaxMessages'} or the limit stored in 'MaxMessageCount'.
# A limit that is unset or not greater than zero is ignored.
#
sub sendMessage {
    my $self=shift;
    my $msg=shift;

    $self->write($msg);

    my $msgCount=++$self->{'MessageCount'};

    my $serverLimit=$::CONF{'MaxMessages'};
    my $clientLimit=$self->{'MaxMessageCount'};

    my $limitReached=
        (defined($serverLimit) && $serverLimit>0 && $msgCount>=$serverLimit)
        || (defined($clientLimit) && $clientLimit>0 && $msgCount>=$clientLimit);

    # Both limits can be reached by the same message; the connection must
    # still be closed only once.
    $self->close(1) if($limitReached);

    return;
}
343
#
# Drop the subscription to one channel.
#
# Does nothing when the subscriber does not hold the named channel.
# Otherwise the subscriber is removed from the channel, the channel is
# forgotten, and the connection is closed if no channel remains.
#
sub closeChannel {
    my ($self,$channelName)=@_;

    my $isSubscribed=exists($self->{'channels'}->{$channelName});
    return unless($isSubscribed);

    # Detach from the channel first, then forget about it
    $self->{'channels'}->{$channelName}->removeSubscriber($self);
    delete($self->{'channels'}->{$channelName});

    my $remaining=scalar(keys(%{$self->{'channels'}}));
    $self->close() if($remaining==0);
}
357
#
# Close the subscriber connection.
#
# The subscriber is removed from all channels it remembers, unregistered
# from %PersistentConnections, optionally sent the shutdown message, and
# finally closed through the parent class.
#
# Parameters:
#   $noShutdownMsg  when true, $::CONF{'SubscriberShutdownMsg'} is not sent
#
sub close {
    my $self=shift;
    my $noShutdownMsg=shift;

    foreach my $channelName (keys %{$self->{'channels'}})
    {
        my $channel=$self->{'channels'}->{$channelName};
        $channel->removeSubscriber($self);
    }
    delete($self->{'channels'});

    if(exists($self->{'subscriberID'}))
    {
        #
        # Only unregister ourselves. A newer connection may have taken
        # over this subscriber ID in the meantime, and closing this
        # connection again must not remove the newer one.
        #
        my $id=$self->{'subscriberID'};
        my $registered=$PersistentConnections{$id};
        if(defined($registered) && $registered==$self)
        {
            delete($PersistentConnections{$id});
        }
    }

    #
    # Send shutdown message unless remote closed or
    # connection not yet established
    #
    unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
    {
        my $msg=$::CONF{'SubscriberShutdownMsg'};
        if(defined($msg) && $msg ne '')
        {
            $self->write($msg);
        }
    }

    $self->SUPER::close();
}
389
#
# Close the connection, without a shutdown message, when it has a time
# limit and that limit lies before the given time.
#
# Parameters:
#   $time  the time to compare 'ConnectionTimeLimit' against
#
sub checkForMaxTime {
    my ($self,$time)=@_;

    my $expired=exists($self->{'ConnectionTimeLimit'})
        && $self->{'ConnectionTimeLimit'}<$time;

    $self->close(1) if($expired);
}
396
397 1;
398 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26