Contents of /googlecode.com/svn/trunk/Meteor/Subscriber.pm


Revision 62 - (show annotations)
Thu Nov 27 00:33:21 2008 UTC (15 years, 4 months ago) by andrew.betts
File size: 10330 byte(s)
1 Fixed: Added SIGPIPE handler.  We noticed that under heavy load
Meteor receives SIGPIPEs from the OS, suspected to relate to clients
that have just disconnected the moment Meteor attempts to write to the
socket.  This caused Meteor to crash.
2 Fixed: Long polling multiple channels no longer causes the loop to
die and restart when some channels have messages queued for delivery.
3 Fixed: Over time, Meteor 'collected' connections from clients that
never got disconnected even if MaxTime was set.  This happened if the
client concerned sent a header with no terminating blank line.  Meteor
kept waiting for the rest of the header, which never arrived, and
therefore the client remained in limbo, never subjected to the MaxTime
time limit because it had not yet become a subscriber.  Clients are
now allowed 30 seconds to send a valid request header.
4 Fixed: If only one message existed on the server, the JS client
would continue to request it again and again, because it has message
ID 0, and the JS client considered this an invalid message ID.
5 Fixed: Corrected some comments in file headers
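To illustrate the failure mode behind fix 1, here is a minimal, self-contained sketch. It is not Meteor's actual handler, which does not live in this file; the helper name `write_to_closed_peer` is hypothetical. It assumes a Unix-like system and shows that once SIGPIPE is ignored, a write to a socket whose peer has just gone away fails with EPIPE instead of terminating the process.

```perl
use strict;
use warnings;
use Socket;
use Errno qw(EPIPE);

# Ignore SIGPIPE process-wide: a write to a socket whose peer has gone
# away then fails with EPIPE instead of killing the process.
$SIG{PIPE} = 'IGNORE';

# Hypothetical helper (not part of Meteor): writes to a socket whose
# peer has already closed and reports what happened.
sub write_to_closed_peer {
    socketpair(my $near, my $far, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
        or die "socketpair failed: $!";
    close($far);    # simulate a client disconnecting just before the write

    my $written = syswrite($near, "message\n");
    my $errno   = defined($written) ? 0 : $! + 0;
    close($near);

    return ($written, $errno);
}

my ($written, $errno) = write_to_closed_peer();
print "write failed with EPIPE, process still alive\n"
    if !defined($written) && $errno == EPIPE;
```

A server would typically treat the EPIPE result as "remote closed" and clean up that one connection, rather than letting the signal end the whole process.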

6 Changed: MaxMessages has been renamed to CloseOnEvent and functions
in a similar, but not quite identical way.  Thanks to Matthew Haak,
who pointed out the extreme confusingness of MaxMessages and a bug
that has resulted in Fix 2 above.  Setting CloseOnEvent to any value
that evaluates to true will cause Meteor to close subscriber
connections after at least one message has been sent and there are no
further messages pending.  This is identical to MaxMessages for values
of 0 and 1, but where MaxMessages is set to a value higher than one,
replacing it with CloseOnEvent with the same value will act as though
it were set to one.  The intent of MaxMessages was to enable long-
polling (and it is used by the JS client in that way), and
CloseOnEvent is a drop-in replacement for that behaviour.
7 Changed: Meteor JS client now uses dynamic <SCRIPT> tags for all
polling behaviours, rather than XHR.  This enables it to make poll
requests cross-domain (see 13).
8 Changed: Meteor JS client now abstracts timestamp lookups to a
dedicated method.
9 Changed: Default HeaderTemplates no longer include cache busting
headers, since all meteor requests contain a millisecond timestamp and
so no client makes the same request twice.  These were therefore
simply chewing up bandwidth.
10 Changed: Date strings used for logging debug messages are cached to
avoid numerous expensive lookups to localtime().
11 Changed: Channel info is only sent in a response if the client does
not request a restart from a specified ID.  The logic is that if
the client knows the ID they want to start from, they have already
made previous requests and have the channel information they need.
This is a bandwidth-saving measure.
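The CloseOnEvent rule described in change 6 can be sketched as a small predicate. This is an illustration of the rule as worded above, not the code Meteor runs; the function name `should_close_on_event` and its three arguments are hypothetical.

```perl
use strict;
use warnings;

# Hypothetical predicate: should a subscriber connection be closed now?
#   $close_on_event   - the CloseOnEvent setting (any true value enables it)
#   $messages_sent    - messages already written to this subscriber
#   $messages_pending - messages still queued for this subscriber
sub should_close_on_event {
    my ($close_on_event, $messages_sent, $messages_pending) = @_;

    return 0 unless $close_on_event;    # disabled: keep streaming
    return ($messages_sent >= 1 && $messages_pending == 0) ? 1 : 0;
}

# Any true value behaves like 1, which is where CloseOnEvent differs
# from the old MaxMessages for values higher than one.
print should_close_on_event(5, 1, 0), "\n";    # close: one sent, none pending
print should_close_on_event(1, 2, 1), "\n";    # keep open: a message is pending
print should_close_on_event(0, 3, 0), "\n";    # keep open: setting disabled
```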

12 Added: JS client now has a Meteor.isSupportedBrowser() method,
which you can call to determine whether Meteor will run in the user's
browser version.
13 Added: JS client can now use different hosts for polling and
streaming.  This is only really useful if your website is on a domain
that has a lot of cookies, and you don't want to send them in every
poll request.  Removing cookies from request headers can reduce the
size of the request significantly.  We find that with cookies included
Meteor poll requests are usually larger than the responses.  To use,
set Meteor.pollhost.  Meteor.pollhost can be any domain, while
Meteor.host must be a subdomain of your website hostname.
14 Added: Config file now supports new 'FooterTemplate' parameter, for
a string to send just before the connection to the subscriber is
closed.  This is in support of change 7.
15 Added: Better inline documentation for ChannelInfoTemplate config
parameter
16 Added: Log output includes connection IDs corresponding to the file
inode for each connection
17 Added: New controller command LISTCONNECTIONS, produces a newline
delimited list of all currently connected clients, and for each one
displaying "ConnectionID IPAddress ClientType [SubscriberID]"
18 Added: New controller command DESCRIBE, takes a ConnectionID as a
parameter, and outputs numerous statistics about that particular
client, including number of messages sent/received, user agent, IP
address, time connected, time remaining until MaxTime etc.
19 Added: New controller command LISTSUBSCRIBERS, produces a newline
delimited list of all currently connected streaming subscribers, and
for each one displaying "SubscriberID IPAddress Starttime TimeLimit
TimeRemaining MessageCount UserAgent"
20 Added: SHOWSTATS command produces the following additional stats:
connection_count: total current connections; real_subscribers: the
number of currently connected streaming subscribers plus the number
of unique polling connections seen in the last 60 seconds.
21 Added: STDERR outputs prior to every exit() for debugging purposes
22 Added: The UDP server is now considered stable, and is the best way
of broadcasting messages to lots of Meteor nodes simultaneously and
efficiently. 
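The real_subscribers figure from item 20 can be sketched as follows. The code that actually computes this statistic is not in this file, so the helper name `real_subscribers` and its arguments are assumptions. The only detail taken from Subscriber.pm is the shape of the poller table: a hash mapping subscriber ID to the time of the last poll.

```perl
use strict;
use warnings;

# Hypothetical helper: streaming subscribers plus pollers seen recently.
#   $streaming_count - number of currently connected streaming subscribers
#   $pollers         - hash ref: subscriber ID => epoch time of last poll
#   $now             - current epoch time
#   $window          - how far back a poll still counts (default 60 seconds)
sub real_subscribers {
    my ($streaming_count, $pollers, $now, $window) = @_;
    $window = 60 unless defined $window;

    # grep in scalar context returns the number of matching entries.
    my $recent = grep { $now - $_ <= $window } values %$pollers;

    return $streaming_count + $recent;
}

my %pollers = (abc => 1000, def => 990, old => 900);
print real_subscribers(2, \%pollers, 1010), "\n";    # 2 streaming + 2 recent pollers
```

Because the table is keyed by subscriber ID, a client that polls several times within the window is counted once.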


#!/usr/bin/perl -w
###############################################################################
# Meteor
# An HTTP server for the 2.0 web
# Copyright (c) 2006 contributing authors
#
# Subscriber.pm
#
# Description:
# A Meteor Subscriber
#
###############################################################################
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation; either version 2 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# For more information visit www.meteorserver.org
#
###############################################################################

package Meteor::Subscriber;
###############################################################################
# Configuration
###############################################################################

	use strict;

	use Meteor::Connection;
	use Meteor::Channel;
	use Meteor::Document;

	@Meteor::Subscriber::ISA=qw(Meteor::Connection);

	our %PersistentConnections=();
	our $NumAcceptedConnections=0;


###############################################################################
# Factory methods
###############################################################################
sub newFromServer {
	my $class=shift;

	my $self=$class->SUPER::newFromServer(shift);

	$self->{'headerBuffer'}='';
	$self->{'messageCount'}=0;

	my $maxTime=$::CONF{'MaxTime'};
	if($maxTime>0)
	{
		$self->{'connectionTimeLimit'}=$self->{'connectionStart'}+$maxTime;
	}

	$::Statistics->{'current_subscribers'}++;
	$::Statistics->{'subscriber_connections_accepted'}++;

	$self;
}

###############################################################################
# Class methods
###############################################################################
sub deleteSubscriberWithID {
	my $class=shift;
	my $id=shift;

	if(exists($PersistentConnections{$id}))
	{
		$PersistentConnections{$id}->close(0,'newSubscriberWithSameID');
	}
}

sub subscriberExists {
	my $class=shift;
	my $id=shift;

	return exists($PersistentConnections{$id});
}

sub pingPersistentConnections {
	my $class=shift;

	my @cons=values %PersistentConnections;

	map { $_->ping() } @cons;
}

sub checkPersistentConnectionsForMaxTime {
	my $class=shift;

	my $time=time;
	my @cons=values %PersistentConnections;

	map { $_->checkForMaxTime($time) } @cons;
}

sub numSubscribers {

	return scalar(keys %PersistentConnections);
}

sub listSubscribers {
	my $class=shift;
	my $list='';
	foreach my $subscriber (keys %PersistentConnections)
	{
		my $sub = $PersistentConnections{$subscriber};
		$list .= $subscriber.' '.$sub->{'ip'}.' '.$sub->{'connectionStart'}.' '.$sub->{'connectionTimeLimit'}.' '.($sub->{'connectionTimeLimit'}-time).' '.$sub->{'messageCount'}.' "'.$sub->{'useragent'}."\"$::CRLF";
	}
	$list;
}

###############################################################################
# Instance methods
###############################################################################
sub processLine {
	my $self=shift;
	my $line=shift;

	# Once the header was processed we ignore any input - Meteor does not accept or process request bodies
	return unless(exists($self->{'headerBuffer'}));

	if($line ne '')
	{
		#
		# Accumulate header
		#
		$self->{'headerBuffer'}.="$line\n";
	}
	else
	{
		#
		# Empty line signals end of header.
		# Analyze header, register with appropriate channel
		# and send pending messages.
		#
		# GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
		#
		# Find the 'GET' line
		#
		if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/([a-z0-9_\-\%\.\/]+).*?/i)
		{
			$self->{'subscriberID'}=$1;
			$self->{'mode'}=$2;
			my $persist=$self->getConf('Persist');
			my $maxTime=$self->getConf('MaxTime');
			$self->{'connectionTimeLimit'} = ($self->{'connectionStart'}+$maxTime) if ($maxTime>0);

			my @channelData=split('/',$3);
			my $channels={};
			my $channelName;
			my $offset;
			foreach my $chandef (@channelData) {
				if($chandef=~/^([a-z0-9_\-\%]+)(.(r|b|h)([0-9]*))?$/i) {
					$channelName = $1;
					$channels->{$channelName}->{'startIndex'} = undef;
					if ($3) {
						$offset = $4;
						if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; }
						if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; }
						if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; }
					}
				}
			}
			$self->{'useragent'} = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-";

			if ($persist) {
				# New persistent connection: kill any existing connection with same ID
				$self->deleteSubscriberWithID($self->{'subscriberID'});
				# Add new persistent connection to collection
				$PersistentConnections{$self->{'subscriberID'}}=$self;
			} else {
				$::Pollers->{$self->{'subscriberID'}} = time;
			}

			if(scalar(keys %{$channels})) {

				$self->{'channelinfo'} = '';
				my $citemplate = $self->getConf('ChannelInfoTemplate');
				foreach $channelName (keys %{$channels}) {
					my $channel=Meteor::Channel->channelWithName($channelName);
					$self->{'channels'}->{$channelName}=$channel;
					if (defined($self->{'channels'}->{$channelName}->{'startIndex'}) && $self->{'channels'}->{$channelName}->{'startIndex'} > 0) {
						$self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate);
					}
				}
				$self->emitOKHeader();
				foreach $channelName (keys %{$channels}) {
					my $startIndex=$channels->{$channelName}->{'startIndex'};
					$self->{'channels'}->{$channelName}->addSubscriber($self,$startIndex,$persist,$self->{'mode'},$self->{'useragent'});
				}
				delete ($self->{'channels'}) unless($persist);
				$self->close(1, 'responseComplete') unless($persist);
				$self->close(1, 'closedOnEvent') unless($self->{'messageCount'} == 0);
				delete($self->{'headerBuffer'});
				return;
			}
		}
		elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/)
		{
			$self->deleteSubscriberWithID($1);
			$self->emitOKHeader();
			$self->close(1, 'disconnectRequested');
			return;
		}
		elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
		{
			Meteor::Document->serveFileToClient($1,$self);
			$self->close(1, 'responseComplete');
			return;
		}

		#
		# If we fall through we did not understand the request
		#
		$self->emitErrorHeader();
	}
}

sub emitOKHeader {
	my $self=shift;

	$self->emitHeader('200 OK');
}

sub emitErrorHeader {
	my $self=shift;

	$self->emitHeader('404 Not Found');
	$::Statistics->{'errors_served'}++;

	# close up shop here!
	$self->close(0, 'error');
}

sub emitHeader {
	my $self=shift;
	my $status=shift;

	my $header=$self->getConf('HeaderTemplate');

	$header=~s/~([^~]*)~/
		if(!defined($1) || $1 eq '') {
			'~';
		} elsif($1 eq 'server') {
			$::PGM;
		} elsif($1 eq 'status') {
			$status;
		} elsif($1 eq 'servertime') {
			time;
		} elsif($1 eq 'channelinfo') {
			$self->{'channelinfo'};
		} else {
			'';
		}
	/gex;

	$self->write($header);
}

sub sendMessages {
	my $self=shift;

	my $numMessages=0;
	my $msgTemplate=$self->getConf('MessageTemplate');
	my $msgData='';

	foreach my $message (@_)
	{
		$msgData.=$message->messageWithTemplate($msgTemplate);
		$numMessages++;
	}

	return if($numMessages<1);

	$self->write($msgData);

	$::Statistics->{'messages_served'}+=$numMessages;

	my $msgCount=$self->{'messageCount'};
	$msgCount+=$numMessages;
	$self->{'messageCount'}=$msgCount;

	# If long polling, close connection, as a message has now been sent.
	# Don't close if still processing the header (we may be sending a backlog from multiple channels)
	if($self->getConf('CloseOnEvent') && !exists($self->{'headerBuffer'})) {
		$self->close(1, 'closedOnEvent');
	}
}

sub ping {
	my $self=shift;
	my $msg=$self->getConf('PingMessage');

	$self->write($msg);
}

sub closeChannel {
	my $self=shift;
	my $channelName=shift;

	return unless(exists($self->{'channels'}->{$channelName}));

	my $channel=$self->{'channels'}->{$channelName};
	$channel->removeSubscriber($self,'channelClose');

	delete($self->{'channels'}->{$channelName});

	$self->close(0,'channelClose') if(scalar(keys %{$self->{'channels'}})==0);
}

sub close {
	my $self=shift;
	my $noShutdownMsg=shift;
	my $reason=shift;

	foreach my $channelName (keys %{$self->{'channels'}})
	{
		my $channel=$self->{'channels'}->{$channelName};
		$channel->removeSubscriber($self,$reason);
	}
	delete($self->{'channels'});

	# If this connection is in the PersistentConnections array, delete it, then anonymise
	# it so that if we have to wait for the write buffer to empty before close, it's only
	# removed once.
	if(exists($self->{'subscriberID'})) {
		delete($PersistentConnections{$self->{'subscriberID'}});
		delete($self->{'subscriberID'});
	}

	# Send shutdown message unless remote closed or
	# connection not yet established
	unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
	{
		my $msg=$self->getConf('SubscriberShutdownMsg');
		if(defined($msg) && $msg ne '')
		{
			$self->write($msg);
		}
	}

	my $fmsg=$self->getConf('FooterTemplate');
	if(defined($fmsg) && $fmsg ne '')
	{
		$self->write($fmsg);
	}

	$self->SUPER::close();
}

sub didClose {

	$::Statistics->{'current_subscribers'}--;
}

sub checkForMaxTime {
	my $self=shift;
	my $time=shift;

	$self->close(1,'maxTime') if(exists($self->{'connectionTimeLimit'}) && $self->{'connectionTimeLimit'}<$time);
}

sub getConf {
	my $self=shift;
	my $key=shift;

	if(exists($self->{'mode'}) && $self->{'mode'} ne '')
	{
		my $k=$key.$self->{'mode'};

		if(exists($::CONF{$k})) {
			return $::CONF{$k};
		}
	}

	$::CONF{$key};
}

1;
############################################################################EOF
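The channel-definition handling in processLine can be tried in isolation with a sketch like the one below. The helper name `parse_channel_defs` is hypothetical, and this version differs from the listing in two labelled ways: it escapes the separator dot in the pattern, and it treats a missing offset as 0. It maps each channel name to the start index the subscriber asked for: `r` gives a positive index, `b` a negative one, `h` gives 0, and a bare channel name leaves the index undefined.

```perl
use strict;
use warnings;

# Hypothetical standalone helper modelled on the loop in processLine.
# Assumptions that differ from the listing: the dot separator is escaped,
# and an empty offset is treated as 0.
sub parse_channel_defs {
    my ($defs) = @_;
    my %start_index;

    foreach my $chandef (split('/', $defs)) {
        next unless $chandef =~ /^([a-z0-9_\-\%]+)(\.(r|b|h)([0-9]*))?$/i;
        my ($name, $flag, $offset) = ($1, $3, $4);

        my $start;    # stays undef when no start position was requested
        if (defined $flag) {
            $offset = 0 if $offset eq '';
            $start = $offset + 0    if $flag eq 'r';
            $start = -($offset + 0) if $flag eq 'b';
            $start = 0              if $flag eq 'h';
        }
        $start_index{$name} = $start;
    }

    return \%start_index;
}

my $parsed = parse_channel_defs('news.r5/chat.b10/alerts.h/plain');
foreach my $name (sort keys %$parsed) {
    my $start = $parsed->{$name};
    print "$name => ", (defined $start ? $start : 'undef'), "\n";
}
```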
