/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 62 - (hide annotations)
Thu Nov 27 00:33:21 2008 UTC (15 years, 4 months ago) by andrew.betts
File size: 10330 byte(s)
1 Fixed: Added SIGPIPE handler.  We noticed that under heavy load
Meteor receives SIGPIPEs from the OS, suspected to relate to clients
that have just disconnected the moment Meteor attempts to write to the
socket.  This caused Meteor to crash.
2 Fixed: Long polling multiple channels no longer causes the loop to
die and restart when some channels have messages queued for delivery.
3 Fixed: Over time, Meteor 'collected' connections from clients that
never got disconnected even if MaxTime was set.  This happened if the
client concerned sent a header with no terminating blank line.  Meteor
kept waiting for the rest of the header, which never arrived, and
therefore the client remained in limbo, never subjected to the MaxTime
time limit because it had not yet become a subscriber.  Clients are
now allowed 30 seconds to send a valid request header.
4 Fixed: If only one message existed on the server, the JS client
would continue to request it again and again, because it has message
ID 0, and the JS client considered this an invalid message ID.
5 Fixed: Corrected some comments in file headers

6 Changed: MaxMessages has been renamed to CloseOnEvent and functions
in a similar, but not quite identical way.  Thanks to Matthew Haak,
who pointed out the extreme confusingness of MaxMessages and a bug
that has resulted in Fix 2 above.  Setting CloseOnEvent to any value
that evaluates to true will cause Meteor to close subscriber
connections after at least one message has been sent and there are no
further messages pending.  This is identical to MaxMessages for values
of 0 and 1, but where MaxMessages is set to a value higher than one,
replacing it with CloseOnEvent with the same value will act as though
it were set to one.  The intent of MaxMessages was to enable long-
polling (and it is used by the JS client in that way), and
CloseonEvent is a drop in replacement for that behaviour.
7 Changed: Meteor JS client now uses dynamic <SCRIPT> tags for all
polling behaviours, rather than XHR.  This enables it to make poll
requests cross-domain (see 13)
8 Changed: Meteor JS client now abstracts timestamp lookups to a
dedicated method.
9 Changed: Default HeaderTemplates no longer include cache busting
headers, since all meteor requests contain a millisecond timestamp and
so no client makes the same request twice.  These were therefore
simply chewing up bandwidth.
10 Changed: Date strings used for logging debug messages are cached to
avoid numerous expensive lookups to localtime().
11 Changed: Channel info is only sent in a response if the client does
not request a restart from a specified ID.  The logic being that if
the client knows the ID they want to start from, they have already
made previous requests and have the channel information they need.
Bandwidth saving measure.

12 Added: JS client now has a Meteor.isSupportedBrowser() method,
which you can call to detemine whether Meteor will run in the user's
browser version.
13 Added: JS client can now use different hosts for polling and
streaming.  This is only really useful if your website is on a domain
that has a lot of cookies, and you don't want to send them in every
poll request.  Removing cookies from request headers can reduce the
size of the request significantly.  We find that with cookies included
Meteor poll requests are usually larger than the responses.  To use,
set Meteor.pollhost.  Meteor.pollhost can be any domain, while
Meteor.host must be a subdomain of your website hostname.
14 Added: Config file now supports new 'FooterTemplate' parameter, for
a string to send just before the connection to the subscriber is
closed.  This is in support of change 7.
15 Added: Better inline documentation for ChannelInfoTemplate config
parameter
16 Added: Log output includes connection IDs corresponding to the file
inode for each connection
17 Added: New controller command LISTCONNECTIONS, produces a newline
delimited list of all currently connected clients, and for each one
displaying "ConnectionID IPAddress ClientType [SubscriberID]"
18 Added: New controller command DESCRIBE, takes a ConnectionID as a
parameter, and outputs numerous statistics about that particular
client, including number of messages sent/received, user agent, IP
address, time connected, time remaining until MaxTime etc.
19 Added: New controller comment LISTSUBSCRIBERS, produces a newline
delimited list of all currently connected streaming subscribers, and
for each one displaying "SubscriberID IPAddress Starttime TimeLimit
TimeRemaining MessageCount UserAgent"
20 Added: SHOWSTATS command produces the following additional stats:
connection_count: total current connections, real_subscribers: total
of number of currently connected streaming subscribers plus the number
of unique polling connections seen in the last 60 seconds.
21 Added: STDERR outputs prior to every exit() for debugging purposes
22 Added: The UDP server is now considered stable, and is the best way
of broadcasting messages to lots of Meteor nodes simultaneously and
efficiently. 


1 knops.gerd 11 #!/usr/bin/perl -w
2     ###############################################################################
3     # Meteor
4     # An HTTP server for the 2.0 web
5     # Copyright (c) 2006 contributing authors
6     #
7     # Subscriber.pm
8     #
9     # Description:
10     # A Meteor Subscriber
11     #
12     ###############################################################################
13     #
14     # This program is free software; you can redistribute it and/or modify it
15     # under the terms of the GNU General Public License as published by the Free
16     # Software Foundation; either version 2 of the License, or (at your option)
17     # any later version.
18     #
19     # This program is distributed in the hope that it will be useful, but WITHOUT
20     # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21     # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22     # more details.
23     #
24     # You should have received a copy of the GNU General Public License along
25     # with this program; if not, write to the Free Software Foundation, Inc.,
26     # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27     #
28     # For more information visit www.meteorserver.org
29     #
30     ###############################################################################
31    
32     package Meteor::Subscriber;
33     ###############################################################################
34     # Configuration
35     ###############################################################################
36    
37     use strict;
38    
39     use Meteor::Connection;
40     use Meteor::Channel;
41     use Meteor::Document;
42    
43     @Meteor::Subscriber::ISA=qw(Meteor::Connection);
44    
45     our %PersistentConnections=();
46 knops.gerd 25 our $NumAcceptedConnections=0;
47 knops.gerd 37
48 knops.gerd 11
49     ###############################################################################
50     # Factory methods
51     ###############################################################################
52     sub newFromServer {
53     my $class=shift;
54    
55     my $self=$class->SUPER::newFromServer(shift);
56    
57     $self->{'headerBuffer'}='';
58 andrew.betts 62 $self->{'messageCount'}=0;
59 knops.gerd 11
60     my $maxTime=$::CONF{'MaxTime'};
61     if($maxTime>0)
62     {
63 andrew.betts 62 $self->{'connectionTimeLimit'}=$self->{'connectionStart'}+$maxTime;
64 knops.gerd 11 }
65    
66 knops.gerd 25 $::Statistics->{'current_subscribers'}++;
67     $::Statistics->{'subscriber_connections_accepted'}++;
68    
69 knops.gerd 11 $self;
70     }
71    
72     ###############################################################################
73     # Class methods
74     ###############################################################################
75     sub deleteSubscriberWithID {
76     my $class=shift;
77     my $id=shift;
78    
79     if(exists($PersistentConnections{$id}))
80     {
81 andrew.betts 50 $PersistentConnections{$id}->close(0,'newSubscriberWithSameID');
82 knops.gerd 11 }
83     }
84    
85 andrew.betts 62 sub subscriberExists {
86     my $class=shift;
87     my $id=shift;
88    
89     return exists($PersistentConnections{$id});
90     }
91    
92 knops.gerd 11 sub pingPersistentConnections {
93     my $class=shift;
94    
95     my @cons=values %PersistentConnections;
96    
97 knops.gerd 45 map { $_->ping() } @cons;
98 knops.gerd 11 }
99    
100     sub checkPersistentConnectionsForMaxTime {
101     my $class=shift;
102    
103     my $time=time;
104     my @cons=values %PersistentConnections;
105    
106     map { $_->checkForMaxTime($time) } @cons;
107     }
108    
109 knops.gerd 25 sub numSubscribers {
110    
111     return scalar(keys %PersistentConnections);
112     }
113    
114 andrew.betts 62 sub listSubscribers {
115     my $class=shift;
116     my $list='';
117     foreach my $subscriber (keys %PersistentConnections)
118     {
119     my $sub = $PersistentConnections{$subscriber};
120     $list .= $subscriber.' '.$sub->{'ip'}.' '.$sub->{'connectionStart'}.' '.$sub->{'connectionTimeLimit'}.' '.($sub->{'connectionTimeLimit'}-time).' '.$sub->{'messageCount'}.' "'.$sub->{'useragent'}."\"$::CRLF";
121     }
122     $list;
123     }
124    
125 knops.gerd 11 ###############################################################################
126     # Instance methods
127     ###############################################################################
128     sub processLine {
129     my $self=shift;
130     my $line=shift;
131    
132 andrew.betts 62 # Once the header was processed we ignore any input - Meteor does not accept or process request bodies
133 knops.gerd 11 return unless(exists($self->{'headerBuffer'}));
134    
135     if($line ne '')
136     {
137     #
138     # Accumulate header
139     #
140     $self->{'headerBuffer'}.="$line\n";
141     }
142     else
143     {
144     #
145     # Empty line signals end of header.
146     # Analyze header, register with appropiate channel
147     # and send pending messages.
148     #
149 andrew.betts 32 # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
150 knops.gerd 11 #
151     # Find the 'GET' line
152     #
153 andrew.betts 56 if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/([a-z0-9_\-\%\.\/]+).*?/i)
154 knops.gerd 11 {
155 andrew.betts 50 $self->{'subscriberID'}=$1;
156 andrew.betts 32 $self->{'mode'}=$2;
157 knops.gerd 45 my $persist=$self->getConf('Persist');
158     my $maxTime=$self->getConf('MaxTime');
159 andrew.betts 62 $self->{'connectionTimeLimit'} = ($self->{'connectionStart'}+$maxTime) if ($maxTime>0);
160 knops.gerd 45
161 andrew.betts 32 my @channelData=split('/',$3);
162 knops.gerd 13 my $channels={};
163 andrew.betts 32 my $channelName;
164     my $offset;
165 andrew.betts 35 foreach my $chandef (@channelData) {
166 andrew.betts 51 if($chandef=~/^([a-z0-9_\-\%]+)(.(r|b|h)([0-9]*))?$/i) {
167 andrew.betts 32 $channelName = $1;
168     $channels->{$channelName}->{'startIndex'} = undef;
169 andrew.betts 35 if ($3) {
170     $offset = $4;
171     if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; }
172     if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; }
173     if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; }
174 knops.gerd 13 }
175 knops.gerd 11 }
176     }
177 andrew.betts 62 $self->{'useragent'} = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-";
178 knops.gerd 11
179 andrew.betts 50 if ($persist) {
180 andrew.betts 62 # New persistent connection: kill any existing connection with same ID
181 andrew.betts 50 $self->deleteSubscriberWithID($self->{'subscriberID'});
182 andrew.betts 62 # Add new persistent connection to collection
183 andrew.betts 50 $PersistentConnections{$self->{'subscriberID'}}=$self;
184 andrew.betts 62 } else {
185     $::Pollers->{$self->{'subscriberID'}} = time;
186 knops.gerd 11 }
187    
188 andrew.betts 53 if(scalar(keys %{$channels})) {
189    
190     $self->{'channelinfo'} = '';
191     my $citemplate = $self->getConf('ChannelInfoTemplate');
192     foreach $channelName (keys %{$channels}) {
193     my $channel=Meteor::Channel->channelWithName($channelName);
194     $self->{'channels'}->{$channelName}=$channel;
195 andrew.betts 62 if (defined($self->{'channels'}->{$channelName}->{'startIndex'}) && $self->{'channels'}->{$channelName}->{'startIndex'} > 0) {
196     $self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate);
197     }
198 andrew.betts 53 }
199 knops.gerd 11 $self->emitOKHeader();
200 andrew.betts 53 foreach $channelName (keys %{$channels}) {
201     my $startIndex=$channels->{$channelName}->{'startIndex'};
202 andrew.betts 62 $self->{'channels'}->{$channelName}->addSubscriber($self,$startIndex,$persist,$self->{'mode'},$self->{'useragent'});
203 andrew.betts 53 }
204     delete ($self->{'channels'}) unless($persist);
205 andrew.betts 50 $self->close(1, 'responseComplete') unless($persist);
206 andrew.betts 62 $self->close(1, 'closedOnEvent') unless($self->{'messageCount'} == 0);
207     delete($self->{'headerBuffer'});
208 knops.gerd 11 return;
209     }
210     }
211 andrew.betts 32 elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/)
212     {
213     $self->deleteSubscriberWithID($1);
214     $self->emitOKHeader();
215 andrew.betts 50 $self->close(1, 'disconnectRequested');
216 andrew.betts 32 return;
217     }
218 knops.gerd 11 elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
219     {
220     Meteor::Document->serveFileToClient($1,$self);
221 andrew.betts 50 $self->close(1, 'responseComplete');
222 knops.gerd 11 return;
223     }
224    
225     #
226     # If we fall through we did not understand the request
227     #
228     $self->emitErrorHeader();
229     }
230     }
231    
232     sub emitOKHeader {
233     my $self=shift;
234    
235     $self->emitHeader('200 OK');
236     }
237    
238     sub emitErrorHeader {
239     my $self=shift;
240    
241     $self->emitHeader('404 Not Found');
242 knops.gerd 25 $::Statistics->{'errors_served'}++;
243 knops.gerd 11
244     # close up shop here!
245 andrew.betts 50 $self->close(0, 'error');
246 knops.gerd 11 }
247    
248     sub emitHeader {
249     my $self=shift;
250     my $status=shift;
251    
252 knops.gerd 49 my $header=$self->getConf('HeaderTemplate');
253 knops.gerd 11
254 knops.gerd 45 $header=~s/~([^~]*)~/
255 andrew.betts 53 if(!defined($1) || $1 eq '') {
256 knops.gerd 11 '~';
257 andrew.betts 53 } elsif($1 eq 'server') {
258 knops.gerd 11 $::PGM;
259 andrew.betts 53 } elsif($1 eq 'status') {
260 knops.gerd 11 $status;
261 andrew.betts 53 } elsif($1 eq 'servertime') {
262 knops.gerd 11 time;
263 andrew.betts 53 } elsif($1 eq 'channelinfo') {
264     $self->{'channelinfo'};
265     } else {
266 knops.gerd 11 '';
267     }
268     /gex;
269    
270 andrew.betts 39 $self->write($header);
271 knops.gerd 11 }
272    
273 knops.gerd 45 sub sendMessages {
274 knops.gerd 11 my $self=shift;
275    
276 knops.gerd 45 my $numMessages=0;
277 andrew.betts 50 my $msgTemplate=$self->getConf('MessageTemplate');
278 knops.gerd 45 my $msgData='';
279 knops.gerd 25
280 knops.gerd 45 foreach my $message (@_)
281     {
282     $msgData.=$message->messageWithTemplate($msgTemplate);
283     $numMessages++;
284     }
285 knops.gerd 11
286 knops.gerd 45 return if($numMessages<1);
287 knops.gerd 25
288 knops.gerd 45 $self->write($msgData);
289 knops.gerd 11
290 knops.gerd 45 $::Statistics->{'messages_served'}+=$numMessages;
291    
292 andrew.betts 62 my $msgCount=$self->{'messageCount'};
293 knops.gerd 45 $msgCount+=$numMessages;
294 andrew.betts 62 $self->{'messageCount'}=$msgCount;
295 knops.gerd 45
296 andrew.betts 62 # If long polling, close connection, as a message has now been sent.
297     # Don't close if still processing the header (we may be sending a backlog from multiple channels)
298     if($self->getConf('CloseOnEvent') && !exists($self->{'headerBuffer'})) {
299     $self->close(1, 'closedOnEvent');
300 knops.gerd 11 }
301     }
302    
303 knops.gerd 45 sub ping {
304     my $self=shift;
305     my $msg=$self->getConf('PingMessage');
306    
307     $self->write($msg);
308     }
309    
310 knops.gerd 13 sub closeChannel {
311     my $self=shift;
312     my $channelName=shift;
313    
314     return unless(exists($self->{'channels'}->{$channelName}));
315    
316     my $channel=$self->{'channels'}->{$channelName};
317 knops.gerd 47 $channel->removeSubscriber($self,'channelClose');
318 knops.gerd 13
319     delete($self->{'channels'}->{$channelName});
320    
321 andrew.betts 50 $self->close(0,'channelClose') if(scalar(keys %{$self->{'channels'}})==0);
322 knops.gerd 13 }
323    
324 knops.gerd 11 sub close {
325     my $self=shift;
326     my $noShutdownMsg=shift;
327 andrew.betts 50 my $reason=shift;
328 knops.gerd 11
329 knops.gerd 13 foreach my $channelName (keys %{$self->{'channels'}})
330     {
331     my $channel=$self->{'channels'}->{$channelName};
332 andrew.betts 50 $channel->removeSubscriber($self,$reason);
333 knops.gerd 13 }
334     delete($self->{'channels'});
335 knops.gerd 11
336 andrew.betts 50 # If this connection is in the PersistentConnections array, delete it, then anonymise
337     # it so that if we have to wait for the write buffer to empty before close, it's only
338     # removed once.
339     if(exists($self->{'subscriberID'})) {
340 knops.gerd 11 delete($PersistentConnections{$self->{'subscriberID'}});
341 andrew.betts 50 delete($self->{'subscriberID'});
342 knops.gerd 11 }
343    
344     # Send shutdown message unless remote closed or
345     # connection not yet established
346     unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
347     {
348 knops.gerd 45 my $msg=$self->getConf('SubscriberShutdownMsg');
349 knops.gerd 11 if(defined($msg) && $msg ne '')
350     {
351     $self->write($msg);
352     }
353     }
354 andrew.betts 62
355     my $fmsg=$self->getConf('FooterTemplate');
356     if(defined($fmsg) && $fmsg ne '')
357     {
358     $self->write($fmsg);
359     }
360    
361 knops.gerd 11 $self->SUPER::close();
362     }
363    
364 knops.gerd 37 sub didClose {
365    
366     $::Statistics->{'current_subscribers'}--;
367     }
368    
369 knops.gerd 11 sub checkForMaxTime {
370     my $self=shift;
371     my $time=shift;
372    
373 andrew.betts 62 $self->close(1,'maxTime') if(exists($self->{'connectionTimeLimit'}) && $self->{'connectionTimeLimit'}<$time);
374 knops.gerd 11 }
375    
376 knops.gerd 45 sub getConf {
377     my $self=shift;
378     my $key=shift;
379    
380     if(exists($self->{'mode'}) && $self->{'mode'} ne '')
381     {
382 andrew.betts 50 my $k=$key.$self->{'mode'};
383 knops.gerd 45
384 andrew.betts 50 if(exists($::CONF{$k})) {
385     return $::CONF{$k};
386     }
387 knops.gerd 45 }
388    
389     $::CONF{$key};
390     }
391    
392 knops.gerd 11 1;
393 andrew.betts 3 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26