/[meteor]/googlecode.com/svn/trunk/Meteor/Connection.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /googlecode.com/svn/trunk/Meteor/Connection.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 64 - (show annotations)
Mon Jan 19 11:19:41 2009 UTC (15 years, 2 months ago) by andrew.betts
File size: 9238 byte(s)
Release 1.06.04 as documented in Google Group

1 #!/usr/bin/perl -w
2 ###############################################################################
3 # Meteor
4 # An HTTP server for the 2.0 web
5 # Copyright (c) 2006 contributing authors
6 #
7 # Subscriber.pm
8 #
9 # Description:
10 # Common super-class for controller and subscriber
11 #
12 ###############################################################################
13 #
14 # This program is free software; you can redistribute it and/or modify it
15 # under the terms of the GNU General Public License as published by the Free
16 # Software Foundation; either version 2 of the License, or (at your option)
17 # any later version.
18 #
19 # This program is distributed in the hope that it will be useful, but WITHOUT
20 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22 # more details.
23 #
24 # You should have received a copy of the GNU General Public License along
25 # with this program; if not, write to the Free Software Foundation, Inc.,
26 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27 #
28 # For more information visit www.meteorserver.org
29 #
30 ###############################################################################
31
32 package Meteor::Connection;
33 ###############################################################################
34 # Configuration
35 ###############################################################################
36
37 use strict;
38
39 use Errno qw(EAGAIN);
40
41 our $MAX_READ_SIZE=8192;
42 our $CONNECTION_WRITE_TIMEOUT=120;
43
44 our @Connections=();
45
46 ###############################################################################
47 # Class methods
48 ###############################################################################
49 sub addAllHandleBits {
50 my $class=shift;
51
52 my $rVecRef=shift;
53 my $wVecRef=shift;
54 my $eVecRef=shift;
55 my @cons=@Connections;
56 map {$_->addHandleBits($rVecRef,$wVecRef,$eVecRef) if(defined($_)) } @cons;
57 }
58
59 sub checkAllHandleBits {
60 my $class=shift;
61
62 my $rVec=shift;
63 my $wVec=shift;
64 my $eVec=shift;
65
66 my @cons=@Connections;
67 map {$_->checkHandleBits($rVec,$wVec,$eVec) if(defined($_)) } @cons;
68 }
69
70 sub connectionCount {
71 scalar(@Connections);
72 }
73
74 sub closeAllConnections {
75 my @cons=@Connections;
76
77 map { $_->close(); } @cons;
78 }
79
80 sub listConnections {
81 my $class=shift;
82 my $list='';
83 foreach my $conn (@Connections) {
84 $list .= $conn->{'socketFN'}.' '.$conn->{'ip'}.' '.$conn->{'type'};
85 if (exists($conn->{'subscriberID'})) {
86 $list .= ' '.$conn->{'subscriberID'};
87 }
88 $list .= "$::CRLF";
89 }
90 $list;
91 }
92
93 sub describeConnWithFileNum {
94 my $class=shift;
95 my $filenum=shift;
96 foreach my $conn (@Connections) {
97 if ($conn->{'socketFN'}==$filenum) {
98 my $ret = "";
99 if (exists($conn->{'socketFN'})) {
100 $ret .= "socketFN: ".$conn->{'socketFN'}."$::CRLF";
101 }
102 if (exists($conn->{'connectionStart'})) {
103 $ret .= "connectionStart: ".$conn->{'connectionStart'}."$::CRLF";
104 }
105 if (exists($conn->{'writeBuffer'})) {
106 $ret .= "writeBuffer: ".$conn->{'writeBuffer'}."$::CRLF";
107 }
108 if (exists($conn->{'readBuffer'})) {
109 $ret .= "readBuffer: ".$conn->{'readBuffer'}."$::CRLF";
110 }
111 if (exists($conn->{'bytesWritten'})) {
112 $ret .= "bytesWritten: ".$conn->{'bytesWritten'}."$::CRLF";
113 }
114 if (exists($conn->{'type'})) {
115 $ret .= "type: ".$conn->{'type'}."$::CRLF";
116 }
117 if (exists($conn->{'mode'})) {
118 $ret .= "mode: ".$conn->{'mode'}."$::CRLF";
119 }
120 if (exists($conn->{'ip'})) {
121 $ret .= "ip: ".$conn->{'ip'}."$::CRLF";
122 }
123 if (exists($conn->{'headerBuffer'})) {
124 $ret .= "headerBuffer: ".$conn->{'headerBuffer'}."$::CRLF";
125 }
126 if (exists($conn->{'messageCount'})) {
127 $ret .= "messageCount: ".$conn->{'messageCount'}."$::CRLF";
128 }
129 if (exists($conn->{'connectionStart'})) {
130 $ret .= "age: ".(time-$conn->{'connectionStart'})."$::CRLF";
131 }
132 if (exists($conn->{'connectionTimeLimit'})) {
133 $ret .= "connectionTimeLimit: ".$conn->{'connectionTimeLimit'}."$::CRLF";
134 }
135 if (exists($conn->{'useragent'})) {
136 $ret .= "useragent: ".$conn->{'useragent'}."$::CRLF";
137 }
138 if (exists($conn->{'subscriberID'})) {
139 $ret .= "subscriberID: ".$conn->{'subscriberID'}."$::CRLF";
140 }
141 return $ret;
142 }
143 }
144 return -1;
145 }
146
147 sub destroyBadRequests {
148 foreach my $conn (@Connections) {
149 if (time-$conn->{'connectionStart'} > 30 && !$conn->{'autoClose'} && !exists($conn->{'subscriberID'}) && $conn->{'type'} eq 'Meteor::Subscriber') {
150 &::syslog('debug',"Closing misbehaving subscriber %s",$conn->{'socketFN'});
151 $conn->close();
152 }
153 }
154 }
155
156 ###############################################################################
157 # Factory methods
158 ###############################################################################
159 sub new {
160 #
161 # Create a new empty instance
162 #
163 my $class=shift;
164
165 my $obj={};
166
167 bless($obj,$class);
168 }
169
170 sub newFromServer {
171 #
172 # new instance from new server connection
173 #
174 my $self=shift->new();
175
176 $::Statistics->{'total_requests'}++;
177
178 my $server=shift;
179 my $socket=$server->conSocket();
180
181 $self->{'socket'}=$socket;
182 $self->{'socketFN'}=$socket->fileno();
183 $self->{'connectionStart'}=time;
184
185 $socket->setNonBlocking();
186
187 $self->{'writeBuffer'}='';
188 $self->{'readBuffer'}='';
189 $self->{'bytesWritten'}=0;
190 $self->{'type'}=ref($self);
191 $self->{'ip'}=$socket->{'connection'}->{'remoteIP'};
192
193 push(@Connections,$self);
194
195 &::syslog('debug',"New %s for %s using file number %s",ref($self),$self->{'ip'},$self->{'socketFN'});
196
197 $self;
198 }
199
200 ###############################################################################
201 # Instance methods
202 ###############################################################################
203 sub write {
204 my $self=shift;
205
206 $self->{'writeBuffer'}.=shift;
207 $self->{'writeBufferTimestamp'}=time unless(exists($self->{'writeBufferTimestamp'}));
208 }
209
210 sub addHandleBits {
211 my $self=shift;
212
213 my $rVecRef=shift;
214 my $wVecRef=shift;
215 my $eVecRef=shift;
216
217 my $fno=$self->{'socketFN'};
218
219 if($self->{'writeBuffer'} ne '')
220 {
221 if(exists($self->{'writeBufferTimestamp'}) && $self->{'writeBufferTimestamp'}+$CONNECTION_WRITE_TIMEOUT<time)
222 {
223 &::syslog('debug',"%s for %s: write timed out",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
224
225 $self->{'writeBuffer'}='';
226 $self->close();
227 return;
228 }
229 vec($$wVecRef,$fno,1)=1;
230 }
231
232 vec($$rVecRef,$fno,1)=1;
233 vec($$eVecRef,$fno,1)=1;
234 }
235
236 sub checkHandleBits {
237 my $self=shift;
238
239 my $rVec=shift;
240 my $wVec=shift;
241 my $eVec=shift;
242
243 my $fno=$self->{'socketFN'};
244
245 if(vec($eVec,$fno,1))
246 {
247 #
248 # Something went wrong!
249 #
250 $self->exceptionReceived();
251
252 return;
253 }
254
255 if(vec($rVec,$fno,1))
256 {
257 #
258 # Data available for read
259 #
260 my $socket=$self->{'socket'};
261
262 my $buffer='';
263 my $bytesRead=sysread($socket->{'handle'},$buffer,$MAX_READ_SIZE);
264 if(defined($bytesRead) && $bytesRead>0)
265 {
266 $::Statistics->{'total_inbound_bytes'}+=$bytesRead;
267 $self->{'readBuffer'}.=$buffer;
268 while($self->{'readBuffer'}=~s/^([^\r\n]*)\r?\n//)
269 {
270 $self->processLine($1);
271 }
272 }
273 elsif(defined($bytesRead) && $bytesRead==0)
274 {
275 # Connection closed
276 $self->{'remoteClosed'}=1;
277 $self->close(1, 'remoteClosed');
278
279 return;
280 }
281 else
282 {
283 unless(${!}==EAGAIN)
284 {
285 &::syslog('notice',"Connection closed: $!");
286 $self->{'remoteClosed'}=1;
287 $self->close(1, 'remoteClosed');
288
289 return;
290 }
291 }
292 }
293
294 if(vec($wVec,$fno,1) && $self->{'writeBuffer'} ne '')
295 {
296 #
297 # Can write
298 #
299 my $socket=$self->{'socket'};
300
301 my $bytesWritten=syswrite($socket->{'handle'},$self->{'writeBuffer'});
302
303 if(defined($bytesWritten) && $bytesWritten>0)
304 {
305 $::Statistics->{'total_outbound_bytes'}+=$bytesWritten;
306 $self->{'bytesWritten'}+=$bytesWritten;
307 $self->{'writeBuffer'}=substr($self->{'writeBuffer'},$bytesWritten);
308 if(length($self->{'writeBuffer'})==0)
309 {
310 delete($self->{'writeBufferTimestamp'});
311 $self->close(1) if(exists($self->{'autoClose'}));
312 }
313 else
314 {
315 $self->{'writeBufferTimestamp'}=time;
316 }
317 }
318 else
319 {
320 unless(${!}==EAGAIN)
321 {
322 &::syslog('notice',"Connection closed: $!");
323 $self->{'remoteClosed'}=1;
324 $self->close(1, 'remoteClosed');
325
326 return;
327 }
328 }
329 }
330 }
331
332 sub exceptionReceived {
333 my $self=shift;
334
335 $self->{'writeBuffer'}='';
336
337 $self->close();
338 }
339
340 sub close {
341 my $self=shift;
342
343 #&::syslog('debug',"Close called for %s for %s when write buffer empty",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
344
345 unless($self->{'remoteClosed'})
346 {
347 if(!exists($self->{'autoClose'}) && length($self->{'writeBuffer'})>0)
348 {
349 $self->{'autoClose'}=1;
350
351 &::syslog('debug',"Will close %s for %s when write buffer empty",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
352
353 return;
354 }
355 }
356
357 eval {
358 $self->{'socket'}->close();
359 };
360
361 #
362 # Remove connection from list of connections
363 #
364 my $idx=undef;
365 my $numcon = scalar(@Connections);
366 for(my $i=0;$i<$numcon;$i++)
367 {
368 if($Connections[$i]==$self)
369 {
370 $idx=$i;
371 last;
372 }
373 }
374
375 if(defined($idx))
376 {
377 splice(@Connections,$idx,1);
378 }
379
380 &::syslog('debug',"Closed %s for %s",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
381
382 $self->didClose();
383 }
384
385 sub didClose {
386 }
387
388 1;
389 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26