/[meteor]/googlecode.com/svn/trunk/Meteor/Connection.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /googlecode.com/svn/trunk/Meteor/Connection.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 37 - (hide annotations)
Fri Feb 1 21:22:03 2008 UTC (16 years, 2 months ago) by knops.gerd
File size: 6683 byte(s)
• Connection: In checkAllHandleBits (and to be sure also in addAllHandleBits) iterate over copy of @Connections, as @Connections might change during iteration due to connections being closed
• Controller, Subscriber: pending data will abort a close and call it again later, causing current_subscribers/current_controllers statistics to be incorrect. Added new 'didClose' method that is only called when a connection is actually closed, and this is where the close is counted.

1 knops.gerd 11 #!/usr/bin/perl -w
2     ###############################################################################
3     # Meteor
4     # An HTTP server for the 2.0 web
5     # Copyright (c) 2006 contributing authors
6     #
7     # Subscriber.pm
8     #
9     # Description:
10     # Common super-class for controller and subscriber
11     #
12     ###############################################################################
13     #
14     # This program is free software; you can redistribute it and/or modify it
15     # under the terms of the GNU General Public License as published by the Free
16     # Software Foundation; either version 2 of the License, or (at your option)
17     # any later version.
18     #
19     # This program is distributed in the hope that it will be useful, but WITHOUT
20     # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21     # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22     # more details.
23     #
24     # You should have received a copy of the GNU General Public License along
25     # with this program; if not, write to the Free Software Foundation, Inc.,
26     # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27     #
28     # For more information visit www.meteorserver.org
29     #
30     ###############################################################################
31    
32     package Meteor::Connection;
33     ###############################################################################
34     # Configuration
35     ###############################################################################
36    
37     use strict;
38    
39     use Errno qw(EAGAIN);
40    
41     our $MAX_READ_SIZE=8192;
42     our $CONNECTION_WRITE_TIMEOUT=120;
43    
44     our @Connections=();
45    
46     ###############################################################################
47     # Class methods
48     ###############################################################################
49     sub addAllHandleBits {
50     my $class=shift;
51    
52     my $rVecRef=shift;
53     my $wVecRef=shift;
54     my $eVecRef=shift;
55    
56 knops.gerd 37 my @cons=@Connections;
57     map {$_->addHandleBits($rVecRef,$wVecRef,$eVecRef) if(defined($_)) } @cons;
58 knops.gerd 11 }
59    
60     sub checkAllHandleBits {
61     my $class=shift;
62    
63     my $rVec=shift;
64     my $wVec=shift;
65     my $eVec=shift;
66    
67 knops.gerd 37 my @cons=@Connections;
68     map {$_->checkHandleBits($rVec,$wVec,$eVec) if(defined($_)) } @cons;
69 knops.gerd 11 }
70    
71     sub connectionCount {
72     scalar(@Connections);
73     }
74    
75     sub closeAllConnections {
76     my @cons=@Connections;
77    
78     map { $_->close(); } @cons;
79     }
80    
81     ###############################################################################
82     # Factory methods
83     ###############################################################################
84     sub new {
85     #
86     # Create a new empty instance
87     #
88     my $class=shift;
89    
90     my $obj={};
91    
92     bless($obj,$class);
93     }
94    
95     sub newFromServer {
96     #
97     # new instance from new server connection
98     #
99     my $self=shift->new();
100    
101 knops.gerd 25 $::Statistics->{'total_requests'}++;
102    
103 knops.gerd 11 my $server=shift;
104     my $socket=$server->conSocket();
105    
106     $self->{'socket'}=$socket;
107     $self->{'socketFN'}=$socket->fileno();
108    
109     $socket->setNonBlocking();
110    
111     $self->{'writeBuffer'}='';
112     $self->{'readBuffer'}='';
113    
114     push(@Connections,$self);
115    
116     &::syslog('debug',"New %s for %s",ref($self),$socket->{'connection'}->{'remoteIP'});
117    
118     $self;
119     }
120    
121     ###############################################################################
122     # Instance methods
123     ###############################################################################
124     sub write {
125     my $self=shift;
126    
127     $self->{'writeBuffer'}.=shift;
128     $self->{'writeBufferTimestamp'}=time unless(exists($self->{'writeBufferTimestamp'}));
129     }
130    
131     sub addHandleBits {
132     my $self=shift;
133    
134     my $rVecRef=shift;
135     my $wVecRef=shift;
136     my $eVecRef=shift;
137    
138     my $fno=$self->{'socketFN'};
139    
140     if($self->{'writeBuffer'} ne '')
141     {
142     if(exists($self->{'writeBufferTimestamp'}) && $self->{'writeBufferTimestamp'}+$CONNECTION_WRITE_TIMEOUT<time)
143     {
144     &::syslog('debug',"%s for %s: write timed out",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
145    
146     $self->{'writeBuffer'}='';
147     $self->close();
148     return;
149     }
150     vec($$wVecRef,$fno,1)=1;
151     }
152    
153     vec($$rVecRef,$fno,1)=1;
154     vec($$eVecRef,$fno,1)=1;
155     }
156    
157     sub checkHandleBits {
158     my $self=shift;
159    
160     my $rVec=shift;
161     my $wVec=shift;
162     my $eVec=shift;
163    
164     my $fno=$self->{'socketFN'};
165    
166     if(vec($eVec,$fno,1))
167     {
168     #
169     # Something went wrong!
170     #
171     $self->exceptionReceived();
172    
173     return;
174     }
175    
176     if(vec($rVec,$fno,1))
177     {
178     #
179     # Data available for read
180     #
181     my $socket=$self->{'socket'};
182    
183     my $buffer='';
184     my $bytesRead=sysread($socket->{'handle'},$buffer,$MAX_READ_SIZE);
185     if(defined($bytesRead) && $bytesRead>0)
186     {
187 knops.gerd 25 $::Statistics->{'total_inbound_bytes'}+=$bytesRead;
188 knops.gerd 11 $self->{'readBuffer'}.=$buffer;
189     while($self->{'readBuffer'}=~s/^([^\r\n]*)\r?\n//)
190     {
191     $self->processLine($1);
192     }
193     }
194     elsif(defined($bytesRead) && $bytesRead==0)
195     {
196     # Connection closed
197     $self->{'remoteClosed'}=1;
198     $self->close();
199    
200     return;
201     }
202     else
203     {
204     unless(${!}==EAGAIN)
205     {
206     &::syslog('notice',"Connection closed: $!");
207     $self->{'remoteClosed'}=1;
208     $self->close();
209    
210     return;
211     }
212     }
213     }
214    
215     if(vec($wVec,$fno,1) && $self->{'writeBuffer'} ne '')
216     {
217     #
218     # Can write
219     #
220     my $socket=$self->{'socket'};
221    
222     my $bytesWritten=syswrite($socket->{'handle'},$self->{'writeBuffer'});
223    
224     if(defined($bytesWritten) && $bytesWritten>0)
225     {
226 knops.gerd 25 $::Statistics->{'total_outbound_bytes'}+=$bytesWritten;
227 knops.gerd 11 $self->{'writeBuffer'}=substr($self->{'writeBuffer'},$bytesWritten);
228     if(length($self->{'writeBuffer'})==0)
229     {
230     delete($self->{'writeBufferTimestamp'});
231     $self->close() if(exists($self->{'autoClose'}));
232     }
233     else
234     {
235     $self->{'writeBufferTimestamp'}=time;
236     }
237     }
238     else
239     {
240     unless(${!}==EAGAIN)
241     {
242     &::syslog('notice',"Connection closed: $!");
243     $self->{'remoteClosed'}=1;
244     $self->close();
245    
246     return;
247     }
248     }
249     }
250     }
251    
252     sub exceptionReceived {
253     my $self=shift;
254    
255     $self->{'writeBuffer'}='';
256    
257     $self->close();
258     }
259    
260     sub close {
261     my $self=shift;
262    
263     #&::syslog('debug',"Close called for %s for %s when write buffer empty",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
264    
265     unless($self->{'remoteClosed'})
266     {
267     if(!exists($self->{'autoClose'}) && length($self->{'writeBuffer'})>0)
268     {
269     $self->{'autoClose'}=1;
270    
271     &::syslog('debug',"Will close %s for %s when write buffer empty",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
272    
273     return;
274     }
275     }
276    
277     eval {
278     $self->{'socket'}->close();
279     };
280    
281     #
282     # Remove connection from list of connections
283     #
284     my $idx=undef;
285     for(my $i=0;$i<scalar(@Connections);$i++)
286     {
287     if($Connections[$i]==$self)
288     {
289     $idx=$i;
290     last;
291     }
292     }
293    
294     if(defined($idx))
295     {
296     splice(@Connections,$idx,1);
297     }
298    
299     &::syslog('debug',"Closed %s for %s",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
300 knops.gerd 37
301     $self->didClose();
302 knops.gerd 11 }
303    
304 knops.gerd 37 sub didClose {
305     }
306    
307 knops.gerd 11 1;
308 andrew.betts 3 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26