/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 53 - (show annotations)
Wed Feb 27 21:58:56 2008 UTC (16 years, 1 month ago) by andrew.betts
File size: 9318 byte(s)
Updated version number

JS client:
Added channel info handler to JS client, assumes Meteor will send <script>ch('channel', msgid);</script>
Allowed processing of messages prior to current message index
Added disconnect() to eof()
Revert to poll mode if unable to load frame (should fix IE proxy issues)

Server:
Fixed output of channel info to show only subscribed channels (and simplified)
Added logging of IP addresses

1 #!/usr/bin/perl -w
2 ###############################################################################
3 # Meteor
4 # An HTTP server for the 2.0 web
5 # Copyright (c) 2006 contributing authors
6 #
7 # Subscriber.pm
8 #
9 # Description:
10 # A Meteor Subscriber
11 #
12 ###############################################################################
13 #
14 # This program is free software; you can redistribute it and/or modify it
15 # under the terms of the GNU General Public License as published by the Free
16 # Software Foundation; either version 2 of the License, or (at your option)
17 # any later version.
18 #
19 # This program is distributed in the hope that it will be useful, but WITHOUT
20 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22 # more details.
23 #
24 # You should have received a copy of the GNU General Public License along
25 # with this program; if not, write to the Free Software Foundation, Inc.,
26 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27 #
28 # For more information visit www.meteorserver.org
29 #
30 ###############################################################################
31
32 package Meteor::Subscriber;
33 ###############################################################################
34 # Configuration
35 ###############################################################################
36
37 use strict;
38
39 use Meteor::Connection;
40 use Meteor::Channel;
41 use Meteor::Document;
42
43 @Meteor::Subscriber::ISA=qw(Meteor::Connection);
44
45 our %PersistentConnections=();
46 our $NumAcceptedConnections=0;
47
48
49 ###############################################################################
50 # Factory methods
51 ###############################################################################
52 sub newFromServer {
53 my $class=shift;
54
55 my $self=$class->SUPER::newFromServer(shift);
56
57 $self->{'headerBuffer'}='';
58 $self->{'MessageCount'}=0;
59 $self->{'MaxMessageCount'}=0;
60
61 $self->{'ConnectionStart'}=time;
62 my $maxTime=$::CONF{'MaxTime'};
63 if($maxTime>0)
64 {
65 $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
66 }
67
68 $::Statistics->{'current_subscribers'}++;
69 $::Statistics->{'subscriber_connections_accepted'}++;
70
71 $self;
72 }
73
74 ###############################################################################
75 # Class methods
76 ###############################################################################
77 sub deleteSubscriberWithID {
78 my $class=shift;
79 my $id=shift;
80
81 if(exists($PersistentConnections{$id}))
82 {
83 $PersistentConnections{$id}->close(0,'newSubscriberWithSameID');
84 }
85 }
86
87 sub pingPersistentConnections {
88 my $class=shift;
89
90 my @cons=values %PersistentConnections;
91
92 map { $_->ping() } @cons;
93 }
94
95 sub checkPersistentConnectionsForMaxTime {
96 my $class=shift;
97
98 my $time=time;
99 my @cons=values %PersistentConnections;
100
101 map { $_->checkForMaxTime($time) } @cons;
102 }
103
104 sub numSubscribers {
105
106 return scalar(keys %PersistentConnections);
107 }
108
109 ###############################################################################
110 # Instance methods
111 ###############################################################################
112 sub processLine {
113 my $self=shift;
114 my $line=shift;
115
116 # Once the header was processed we ignore any input
117 return unless(exists($self->{'headerBuffer'}));
118
119 if($line ne '')
120 {
121 #
122 # Accumulate header
123 #
124 $self->{'headerBuffer'}.="$line\n";
125 }
126 else
127 {
128 #
129 # Empty line signals end of header.
130 # Analyze header, register with appropiate channel
131 # and send pending messages.
132 #
133 # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
134 #
135 # Find the 'GET' line
136 #
137 if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i)
138 {
139 $self->{'subscriberID'}=$1;
140 $self->{'mode'}=$2;
141 my $persist=$self->getConf('Persist');
142 my $maxTime=$self->getConf('MaxTime');
143 $self->{'ConnectionTimeLimit'} = ($self->{'ConnectionStart'}+$maxTime) if ($maxTime>0);
144
145 my @channelData=split('/',$3);
146 my $channels={};
147 my $channelName;
148 my $offset;
149 foreach my $chandef (@channelData) {
150 if($chandef=~/^([a-z0-9_\-\%]+)(.(r|b|h)([0-9]*))?$/i) {
151 $channelName = $1;
152 $channels->{$channelName}->{'startIndex'} = undef;
153 if ($3) {
154 $offset = $4;
155 if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; }
156 if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; }
157 if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; }
158 }
159 }
160 }
161 my $useragent = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-";
162
163 delete($self->{'headerBuffer'});
164
165 if ($persist) {
166 $self->deleteSubscriberWithID($self->{'subscriberID'});
167 $PersistentConnections{$self->{'subscriberID'}}=$self;
168 }
169
170 if(scalar(keys %{$channels})) {
171
172 $self->{'channelinfo'} = '';
173 my $citemplate = $self->getConf('ChannelInfoTemplate');
174 foreach $channelName (keys %{$channels}) {
175 my $channel=Meteor::Channel->channelWithName($channelName);
176 $self->{'channels'}->{$channelName}=$channel;
177 $self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate);
178
179 }
180 $self->emitOKHeader();
181 foreach $channelName (keys %{$channels}) {
182 my $startIndex=$channels->{$channelName}->{'startIndex'};
183 $self->{'channels'}->{$channelName}->addSubscriber($self,$startIndex,$persist,$self->{'mode'},$useragent);
184 }
185 delete ($self->{'channels'}) unless($persist);
186 $self->close(1, 'responseComplete') unless($persist);
187 return;
188 }
189 }
190 elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/)
191 {
192 $self->deleteSubscriberWithID($1);
193 $self->emitOKHeader();
194 $self->close(1, 'disconnectRequested');
195 return;
196 }
197 elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
198 {
199 Meteor::Document->serveFileToClient($1,$self);
200 $self->close(1, 'responseComplete');
201 return;
202 }
203
204 #
205 # If we fall through we did not understand the request
206 #
207 $self->emitErrorHeader();
208 }
209 }
210
211 sub emitOKHeader {
212 my $self=shift;
213
214 $self->emitHeader('200 OK');
215 }
216
217 sub emitErrorHeader {
218 my $self=shift;
219
220 $self->emitHeader('404 Not Found');
221 $::Statistics->{'errors_served'}++;
222
223 # close up shop here!
224 $self->close(0, 'error');
225 }
226
227 sub emitHeader {
228 my $self=shift;
229 my $status=shift;
230
231 my $header=$self->getConf('HeaderTemplate');
232
233 $header=~s/~([^~]*)~/
234 if(!defined($1) || $1 eq '') {
235 '~';
236 } elsif($1 eq 'server') {
237 $::PGM;
238 } elsif($1 eq 'status') {
239 $status;
240 } elsif($1 eq 'servertime') {
241 time;
242 } elsif($1 eq 'channelinfo') {
243 $self->{'channelinfo'};
244 } else {
245 '';
246 }
247 /gex;
248
249 $self->write($header);
250 }
251
252 sub sendMessages {
253 my $self=shift;
254
255 my $numMessages=0;
256 my $msgTemplate=$self->getConf('MessageTemplate');
257 my $msgData='';
258
259 foreach my $message (@_)
260 {
261 $msgData.=$message->messageWithTemplate($msgTemplate);
262 $numMessages++;
263 }
264
265 return if($numMessages<1);
266
267 $self->write($msgData);
268
269 $::Statistics->{'messages_served'}+=$numMessages;
270
271 my $msgCount=$self->{'MessageCount'};
272 $msgCount+=$numMessages;
273 $self->{'MessageCount'}=$msgCount;
274
275 my $maxMsg=$self->getConf('MaxMessages');
276 if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)
277 {
278 $self->close(1, 'maxMessageCountReached');
279 }
280
281 if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})
282 {
283 $self->close(1, 'maxMessageCountReached');
284 }
285
286 }
287
288 sub ping {
289 my $self=shift;
290 my $msg=$self->getConf('PingMessage');
291
292 $self->write($msg);
293 }
294
295 sub closeChannel {
296 my $self=shift;
297 my $channelName=shift;
298
299 return unless(exists($self->{'channels'}->{$channelName}));
300
301 my $channel=$self->{'channels'}->{$channelName};
302 $channel->removeSubscriber($self,'channelClose');
303
304 delete($self->{'channels'}->{$channelName});
305
306 $self->close(0,'channelClose') if(scalar(keys %{$self->{'channels'}})==0);
307 }
308
309 sub close {
310 my $self=shift;
311 my $noShutdownMsg=shift;
312 my $reason=shift;
313
314 foreach my $channelName (keys %{$self->{'channels'}})
315 {
316 my $channel=$self->{'channels'}->{$channelName};
317 $channel->removeSubscriber($self,$reason);
318 }
319 delete($self->{'channels'});
320
321 # If this connection is in the PersistentConnections array, delete it, then anonymise
322 # it so that if we have to wait for the write buffer to empty before close, it's only
323 # removed once.
324 if(exists($self->{'subscriberID'})) {
325 delete($PersistentConnections{$self->{'subscriberID'}});
326 delete($self->{'subscriberID'});
327 }
328
329 # Send shutdown message unless remote closed or
330 # connection not yet established
331 unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
332 {
333 my $msg=$self->getConf('SubscriberShutdownMsg');
334 if(defined($msg) && $msg ne '')
335 {
336 $self->write($msg);
337 }
338 }
339
340 $self->SUPER::close();
341 }
342
343 sub didClose {
344
345 $::Statistics->{'current_subscribers'}--;
346 }
347
348 sub checkForMaxTime {
349 my $self=shift;
350 my $time=shift;
351
352 $self->close(1,'maxTime') if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);
353 }
354
355 sub getConf {
356 my $self=shift;
357 my $key=shift;
358
359 if(exists($self->{'mode'}) && $self->{'mode'} ne '')
360 {
361 my $k=$key.$self->{'mode'};
362
363 if(exists($::CONF{$k})) {
364 return $::CONF{$k};
365 }
366 }
367
368 $::CONF{$key};
369 }
370
371 1;
372 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26