This is a repository of my old source code, which is no longer updated. Go to git.rot13.org for current projects!
Annotation of /googlecode.com/svn/trunk/Meteor/Connection.pm

Revision 62
Thu Nov 27 00:33:21 2008 UTC (15 years, 5 months ago) by andrew.betts
File size: 9213 byte(s)
1 Fixed: Added SIGPIPE handler.  We noticed that under heavy load
Meteor receives SIGPIPEs from the OS, suspected to relate to clients
that have just disconnected the moment Meteor attempts to write to the
socket.  This caused Meteor to crash.
2 Fixed: Long polling multiple channels no longer causes the loop to
die and restart when some channels have messages queued for delivery.
3 Fixed: Over time, Meteor 'collected' connections from clients that
never got disconnected even if MaxTime was set.  This happened if the
client concerned sent a header with no terminating blank line.  Meteor
kept waiting for the rest of the header, which never arrived, and
therefore the client remained in limbo, never subjected to the MaxTime
time limit because it had not yet become a subscriber.  Clients are
now allowed 30 seconds to send a valid request header.
4 Fixed: If only one message existed on the server, the JS client
would continue to request it again and again, because it had message
ID 0, and the JS client considered this an invalid message ID.
5 Fixed: Corrected some comments in file headers
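
Fix 1 can be illustrated with a minimal Perl sketch. It assumes the simplest possible handling, ignoring SIGPIPE so that a write to a vanished peer fails with EPIPE instead of terminating the process; the handler Meteor actually installs is not part of this file and may differ. A pipe with a closed read end stands in for the disconnected client socket.

```perl
use strict;
use warnings;
use Errno qw(EPIPE);

# Assumption: ignore SIGPIPE process-wide. Without this, the default
# action for SIGPIPE terminates the process on the failed write below.
$SIG{PIPE} = 'IGNORE';

# A pipe whose read end is already closed stands in for a client
# that disconnected just before the server tried to write.
pipe(my $reader, my $writer) or die "pipe failed: $!";
close($reader);

my $written = syswrite($writer, "message for a client that has gone\n");
my $errno   = $! + 0;    # capture errno before anything else can change it

if (!defined($written) && $errno == EPIPE) {
    # In a server, this is the point at which to drop the connection.
    print "write failed with EPIPE; process still running\n";
}
```

With the signal ignored, the failure surfaces as an ordinary error return from syswrite(), which is the same path Connection.pm already uses to close a connection when a write fails.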

6 Changed: MaxMessages has been renamed to CloseOnEvent and functions
in a similar, but not quite identical way.  Thanks to Matthew Haak,
who pointed out the extreme confusingness of MaxMessages and a bug
that has resulted in Fix 2 above.  Setting CloseOnEvent to any value
that evaluates to true will cause Meteor to close subscriber
connections after at least one message has been sent and there are no
further messages pending.  This is identical to MaxMessages for values
of 0 and 1, but where MaxMessages is set to a value higher than one,
replacing it with CloseOnEvent with the same value will act as though
it were set to one.  The intent of MaxMessages was to enable
long-polling (and it is used by the JS client in that way), and
CloseOnEvent is a drop-in replacement for that behaviour.
7 Changed: Meteor JS client now uses dynamic <SCRIPT> tags for all
polling behaviours, rather than XHR.  This enables it to make poll
requests cross-domain (see 13)
8 Changed: Meteor JS client now abstracts timestamp lookups to a
dedicated method.
9 Changed: Default HeaderTemplates no longer include cache busting
headers, since all meteor requests contain a millisecond timestamp and
so no client makes the same request twice.  These were therefore
simply chewing up bandwidth.
10 Changed: Date strings used for logging debug messages are cached to
avoid numerous expensive lookups to localtime().
11 Changed: Channel info is only sent in a response if the client does
not request a restart from a specified ID.  The logic is that if
the client knows the ID they want to start from, they have already
made previous requests and have the channel information they need.
This is a bandwidth-saving measure.
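
The CloseOnEvent rule in change 6 reduces to a small predicate. The sketch below is illustrative only: shouldCloseSubscriber and its three arguments are hypothetical names, not part of Meteor's code, and it encodes only the behaviour stated in the log entry.

```perl
use strict;
use warnings;

# Hypothetical helper: decide whether a subscriber connection should be
# closed under the CloseOnEvent rule described in change 6.
#   $closeOnEvent  - configured value; any true value enables the rule
#   $messagesSent  - messages already delivered on this connection
#   $pending       - messages still queued for this subscriber
sub shouldCloseSubscriber {
    my ($closeOnEvent, $messagesSent, $pending) = @_;

    return 0 unless $closeOnEvent;      # rule disabled: keep streaming
    return 0 if $messagesSent < 1;      # nothing delivered yet: keep waiting
    return $pending == 0 ? 1 : 0;       # close once the queue has drained
}

# A value of 5 behaves exactly like 1, unlike the old MaxMessages.
print shouldCloseSubscriber(5, 1, 0), "\n";   # 1
print shouldCloseSubscriber(5, 1, 2), "\n";   # 0
print shouldCloseSubscriber(0, 9, 0), "\n";   # 0
```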

12 Added: JS client now has a Meteor.isSupportedBrowser() method,
which you can call to determine whether Meteor will run in the user's
browser version.
13 Added: JS client can now use different hosts for polling and
streaming.  This is only really useful if your website is on a domain
that has a lot of cookies, and you don't want to send them in every
poll request.  Removing cookies from request headers can reduce the
size of the request significantly.  We find that with cookies included
Meteor poll requests are usually larger than the responses.  To use,
set Meteor.pollhost.  Meteor.pollhost can be any domain, while
Meteor.host must be a subdomain of your website hostname.
14 Added: Config file now supports new 'FooterTemplate' parameter, for
a string to send just before the connection to the subscriber is
closed.  This is in support of change 7.
15 Added: Better inline documentation for ChannelInfoTemplate config
parameter
16 Added: Log output includes connection IDs corresponding to the file
inode for each connection
17 Added: New controller command LISTCONNECTIONS, produces a newline
delimited list of all currently connected clients, and for each one
displaying "ConnectionID IPAddress ClientType [SubscriberID]"
18 Added: New controller command DESCRIBE, takes a ConnectionID as a
parameter, and outputs numerous statistics about that particular
client, including number of messages sent/received, user agent, IP
address, time connected, time remaining until MaxTime etc.
19 Added: New controller command LISTSUBSCRIBERS, produces a newline
delimited list of all currently connected streaming subscribers, and
for each one displaying "SubscriberID IPAddress Starttime TimeLimit
TimeRemaining MessageCount UserAgent"
20 Added: SHOWSTATS command produces the following additional stats:
connection_count: total current connections; real_subscribers: the
number of currently connected streaming subscribers plus the number
of unique polling connections seen in the last 60 seconds.
21 Added: STDERR outputs prior to every exit() for debugging purposes
22 Added: The UDP server is now considered stable, and is the best way
of broadcasting messages to lots of Meteor nodes simultaneously and
efficiently. 
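
The real_subscribers figure from item 20 combines a live count with a sliding 60-second window. The following is a rough sketch of that arithmetic, not Meteor's implementation: notePoll, realSubscribers, and the %lastPollSeen table are hypothetical, and it assumes each polling client can be identified by some stable ID.

```perl
use strict;
use warnings;

my $POLL_WINDOW = 60;    # seconds, as stated in item 20

# Hypothetical bookkeeping: poller ID => time of its most recent poll.
my %lastPollSeen;

sub notePoll {
    my ($pollerID, $now) = @_;
    $lastPollSeen{$pollerID} = $now;
}

sub realSubscribers {
    my ($streamingCount, $now) = @_;

    # Forget pollers that have not been seen within the window.
    for my $id (keys %lastPollSeen) {
        delete $lastPollSeen{$id} if $now - $lastPollSeen{$id} > $POLL_WINDOW;
    }
    return $streamingCount + scalar(keys %lastPollSeen);
}

notePoll('a', 100);
notePoll('b', 130);
notePoll('a', 150);                     # same poller again: still counted once
print realSubscribers(3, 155), "\n";    # 3 streaming + 2 pollers = 5
print realSubscribers(3, 200), "\n";    # 'b' (last seen 130) aged out: 3 + 1 = 4
```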


1 knops.gerd 11 #!/usr/bin/perl -w
2     ###############################################################################
3     # Meteor
4     # An HTTP server for the 2.0 web
5     # Copyright (c) 2006 contributing authors
6     #
7     # Connection.pm
8     #
9     # Description:
10     # Common super-class for controller and subscriber
11     #
12     ###############################################################################
13     #
14     # This program is free software; you can redistribute it and/or modify it
15     # under the terms of the GNU General Public License as published by the Free
16     # Software Foundation; either version 2 of the License, or (at your option)
17     # any later version.
18     #
19     # This program is distributed in the hope that it will be useful, but WITHOUT
20     # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21     # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22     # more details.
23     #
24     # You should have received a copy of the GNU General Public License along
25     # with this program; if not, write to the Free Software Foundation, Inc.,
26     # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27     #
28     # For more information visit www.meteorserver.org
29     #
30     ###############################################################################
31    
32     package Meteor::Connection;
33     ###############################################################################
34     # Configuration
35     ###############################################################################
36    
37     use strict;
38    
39     use Errno qw(EAGAIN);
40    
41     our $MAX_READ_SIZE=8192;
42     our $CONNECTION_WRITE_TIMEOUT=120;
43    
44     our @Connections=();
45    
46     ###############################################################################
47     # Class methods
48     ###############################################################################
49     sub addAllHandleBits {
50     my $class=shift;
51    
52     my $rVecRef=shift;
53     my $wVecRef=shift;
54     my $eVecRef=shift;
55 knops.gerd 37 my @cons=@Connections;
56     map {$_->addHandleBits($rVecRef,$wVecRef,$eVecRef) if(defined($_)) } @cons;
57 knops.gerd 11 }
58    
59     sub checkAllHandleBits {
60     my $class=shift;
61    
62     my $rVec=shift;
63     my $wVec=shift;
64     my $eVec=shift;
65    
66 knops.gerd 37 my @cons=@Connections;
67     map {$_->checkHandleBits($rVec,$wVec,$eVec) if(defined($_)) } @cons;
68 knops.gerd 11 }
69    
70     sub connectionCount {
71     scalar(@Connections);
72     }
73    
74     sub closeAllConnections {
75     my @cons=@Connections;
76    
77     map { $_->close(); } @cons;
78     }
79    
80 andrew.betts 62 sub listConnections {
81     my $class=shift;
82     my $list='';
83     foreach my $conn (@Connections) {
84     $list .= $conn->{'socketFN'}.' '.$conn->{'ip'}.' '.$conn->{'type'};
85     if (exists($conn->{'subscriberID'})) {
86     $list .= ' '.$conn->{'subscriberID'};
87     }
88     $list .= "$::CRLF";
89     }
90     $list;
91     }
92    
93     sub describeConnWithFileNum {
94     my $class=shift;
95     my $filenum=shift;
96     foreach my $conn (@Connections) {
97     if ($conn->{'socketFN'}==$filenum) {
98     my $ret = "";
99     if (exists($conn->{'socketFN'})) {
100     $ret .= "socketFN: ".$conn->{'socketFN'}."$::CRLF";
101     }
102     if (exists($conn->{'connectionStart'})) {
103     $ret .= "connectionStart: ".$conn->{'connectionStart'}."$::CRLF";
104     }
105     if (exists($conn->{'writeBuffer'})) {
106     $ret .= "writeBuffer: ".$conn->{'writeBuffer'}."$::CRLF";
107     }
108     if (exists($conn->{'readBuffer'})) {
109     $ret .= "readBuffer: ".$conn->{'readBuffer'}."$::CRLF";
110     }
111     if (exists($conn->{'bytesWritten'})) {
112     $ret .= "bytesWritten: ".$conn->{'bytesWritten'}."$::CRLF";
113     }
114     if (exists($conn->{'type'})) {
115     $ret .= "type: ".$conn->{'type'}."$::CRLF";
116     }
117     if (exists($conn->{'mode'})) {
118     $ret .= "mode: ".$conn->{'mode'}."$::CRLF";
119     }
120     if (exists($conn->{'ip'})) {
121     $ret .= "ip: ".$conn->{'ip'}."$::CRLF";
122     }
123     if (exists($conn->{'headerBuffer'})) {
124     $ret .= "headerBuffer: ".$conn->{'headerBuffer'}."$::CRLF";
125     }
126     if (exists($conn->{'messageCount'})) {
127     $ret .= "messageCount: ".$conn->{'messageCount'}."$::CRLF";
128     }
129     if (exists($conn->{'connectionStart'})) {
130     $ret .= "age: ".(time-$conn->{'connectionStart'})."$::CRLF";
131     }
132     if (exists($conn->{'connectionTimeLimit'})) {
133     $ret .= "connectionTimeLimit: ".$conn->{'connectionTimeLimit'}."$::CRLF";
134     }
135     if (exists($conn->{'useragent'})) {
136     $ret .= "useragent: ".$conn->{'useragent'}."$::CRLF";
137     }
138     if (exists($conn->{'subscriberID'})) {
139     $ret .= "subscriberID: ".$conn->{'subscriberID'}."$::CRLF";
140     }
141     return $ret;
142     }
143     }
144     return -1;
145     }
146    
147     sub destroyBadRequests {
148     foreach my $conn (my @cons=@Connections) { # iterate over a copy: close() splices @Connections
149     if (time-$conn->{'connectionStart'} > 30 && !exists($conn->{'subscriberID'}) && $conn->{'type'} eq 'Meteor::Subscriber') {
150     &::syslog('debug',"Closing misbehaving subscriber %s",$conn->{'socketFN'});
151     $conn->close();
152     }
153     }
154     }
155    
156 knops.gerd 11 ###############################################################################
157     # Factory methods
158     ###############################################################################
159     sub new {
160     #
161     # Create a new empty instance
162     #
163     my $class=shift;
164    
165     my $obj={};
166    
167     bless($obj,$class);
168     }
169    
170     sub newFromServer {
171     #
172     # new instance from new server connection
173     #
174     my $self=shift->new();
175    
176 knops.gerd 25 $::Statistics->{'total_requests'}++;
177    
178 knops.gerd 11 my $server=shift;
179     my $socket=$server->conSocket();
180    
181     $self->{'socket'}=$socket;
182     $self->{'socketFN'}=$socket->fileno();
183 andrew.betts 62 $self->{'connectionStart'}=time;
184 knops.gerd 11
185     $socket->setNonBlocking();
186    
187     $self->{'writeBuffer'}='';
188     $self->{'readBuffer'}='';
189 knops.gerd 47 $self->{'bytesWritten'}=0;
190 andrew.betts 62 $self->{'type'}=ref($self);
191 andrew.betts 53 $self->{'ip'}=$socket->{'connection'}->{'remoteIP'};
192 knops.gerd 47
193 knops.gerd 11 push(@Connections,$self);
194    
195 andrew.betts 62 &::syslog('debug',"New %s for %s using file number %s",ref($self),$self->{'ip'},$self->{'socketFN'});
196 knops.gerd 11
197     $self;
198     }
199    
200     ###############################################################################
201     # Instance methods
202     ###############################################################################
203     sub write {
204     my $self=shift;
205    
206     $self->{'writeBuffer'}.=shift;
207     $self->{'writeBufferTimestamp'}=time unless(exists($self->{'writeBufferTimestamp'}));
208     }
209    
210     sub addHandleBits {
211     my $self=shift;
212    
213     my $rVecRef=shift;
214     my $wVecRef=shift;
215     my $eVecRef=shift;
216    
217     my $fno=$self->{'socketFN'};
218    
219     if($self->{'writeBuffer'} ne '')
220     {
221     if(exists($self->{'writeBufferTimestamp'}) && $self->{'writeBufferTimestamp'}+$CONNECTION_WRITE_TIMEOUT<time)
222     {
223     &::syslog('debug',"%s for %s: write timed out",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
224    
225     $self->{'writeBuffer'}='';
226     $self->close();
227     return;
228     }
229     vec($$wVecRef,$fno,1)=1;
230     }
231    
232     vec($$rVecRef,$fno,1)=1;
233     vec($$eVecRef,$fno,1)=1;
234     }
235    
236     sub checkHandleBits {
237     my $self=shift;
238    
239     my $rVec=shift;
240     my $wVec=shift;
241     my $eVec=shift;
242    
243     my $fno=$self->{'socketFN'};
244    
245     if(vec($eVec,$fno,1))
246     {
247     #
248     # Something went wrong!
249     #
250     $self->exceptionReceived();
251    
252     return;
253     }
254    
255     if(vec($rVec,$fno,1))
256     {
257     #
258     # Data available for read
259     #
260     my $socket=$self->{'socket'};
261    
262     my $buffer='';
263     my $bytesRead=sysread($socket->{'handle'},$buffer,$MAX_READ_SIZE);
264     if(defined($bytesRead) && $bytesRead>0)
265     {
266 knops.gerd 25 $::Statistics->{'total_inbound_bytes'}+=$bytesRead;
267 knops.gerd 11 $self->{'readBuffer'}.=$buffer;
268     while($self->{'readBuffer'}=~s/^([^\r\n]*)\r?\n//)
269     {
270     $self->processLine($1);
271     }
272     }
273     elsif(defined($bytesRead) && $bytesRead==0)
274     {
275     # Connection closed
276     $self->{'remoteClosed'}=1;
277 andrew.betts 50 $self->close(1, 'remoteClosed');
278 knops.gerd 11
279     return;
280     }
281     else
282     {
283     unless(${!}==EAGAIN)
284     {
285     &::syslog('notice',"Connection closed: $!");
286     $self->{'remoteClosed'}=1;
287 andrew.betts 50 $self->close(1, 'remoteClosed');
288 knops.gerd 11
289     return;
290     }
291     }
292     }
293    
294     if(vec($wVec,$fno,1) && $self->{'writeBuffer'} ne '')
295     {
296     #
297     # Can write
298     #
299     my $socket=$self->{'socket'};
300    
301     my $bytesWritten=syswrite($socket->{'handle'},$self->{'writeBuffer'});
302    
303     if(defined($bytesWritten) && $bytesWritten>0)
304     {
305 knops.gerd 25 $::Statistics->{'total_outbound_bytes'}+=$bytesWritten;
306 knops.gerd 47 $self->{'bytesWritten'}+=$bytesWritten;
307 knops.gerd 11 $self->{'writeBuffer'}=substr($self->{'writeBuffer'},$bytesWritten);
308     if(length($self->{'writeBuffer'})==0)
309     {
310     delete($self->{'writeBufferTimestamp'});
311 andrew.betts 53 $self->close(1) if(exists($self->{'autoClose'}));
312 knops.gerd 11 }
313     else
314     {
315     $self->{'writeBufferTimestamp'}=time;
316     }
317     }
318     else
319     {
320     unless(${!}==EAGAIN)
321     {
322     &::syslog('notice',"Connection closed: $!");
323     $self->{'remoteClosed'}=1;
324 andrew.betts 50 $self->close(1, 'remoteClosed');
325 knops.gerd 11
326     return;
327     }
328     }
329     }
330     }
331    
332     sub exceptionReceived {
333     my $self=shift;
334    
335     $self->{'writeBuffer'}='';
336    
337     $self->close();
338     }
339    
340     sub close {
341     my $self=shift;
342    
343     #&::syslog('debug',"Close called for %s for %s when write buffer empty",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
344    
345     unless($self->{'remoteClosed'})
346     {
347     if(!exists($self->{'autoClose'}) && length($self->{'writeBuffer'})>0)
348     {
349     $self->{'autoClose'}=1;
350    
351     &::syslog('debug',"Will close %s for %s when write buffer empty",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
352    
353     return;
354     }
355     }
356    
357     eval {
358     $self->{'socket'}->close();
359     };
360    
361     #
362     # Remove connection from list of connections
363     #
364     my $idx=undef;
365 andrew.betts 50 my $numcon = scalar(@Connections);
366     for(my $i=0;$i<$numcon;$i++)
367 knops.gerd 11 {
368     if($Connections[$i]==$self)
369     {
370     $idx=$i;
371     last;
372     }
373     }
374    
375     if(defined($idx))
376     {
377     splice(@Connections,$idx,1);
378     }
379    
380     &::syslog('debug',"Closed %s for %s",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
381 knops.gerd 37
382     $self->didClose();
383 knops.gerd 11 }
384    
385 knops.gerd 37 sub didClose {
386     }
387    
388 knops.gerd 11 1;
389 andrew.betts 62 ############################################################################EOF
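
addAllHandleBits and checkAllHandleBits are written to bracket a select() call. The self-contained sketch below shows that pattern with a single pipe in place of Meteor's sockets. The main loop that drives these methods is not in this file, so the three-step shape here is an assumption based on how the bit vectors are built and tested above.

```perl
use strict;
use warnings;

# A pipe with one queued line stands in for a client socket with data waiting.
pipe(my $reader, my $writer) or die "pipe failed: $!";
syswrite($writer, "hello\n");

my $fno = fileno($reader);

# Step 1 (what addHandleBits does): set one bit per file number of interest.
my ($rVec, $wVec, $eVec) = ('', '', '');
vec($rVec, $fno, 1) = 1;
vec($eVec, $fno, 1) = 1;

# Step 2: select() rewrites the vectors in place, leaving only ready bits set.
my $ready = select($rVec, $wVec, $eVec, 1);

# Step 3 (what checkHandleBits does): test the bit, then read without blocking.
my $line = '';
if ($ready > 0 && vec($rVec, $fno, 1)) {
    my $bytesRead = sysread($reader, $line, 8192);
    print "read $bytesRead bytes: $line";
}
```

Because select() overwrites its arguments, the vectors have to be rebuilt on every pass, which is why addHandleBits takes references and checkHandleBits takes the resulting values.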
