/[meteor]/googlecode.com/svn/trunk/Meteor/Connection.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /googlecode.com/svn/trunk/Meteor/Connection.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 9 by andrew.betts, Fri Dec 8 16:52:58 2006 UTC revision 53 by andrew.betts, Wed Feb 27 21:58:56 2008 UTC
# Line 1  Line 1 
1  #!/usr/bin/perl -w  #!/usr/bin/perl -w
2  ###############################################################################  ###############################################################################
3  #   Meteor  #   Meteor
4  #   An HTTP server for the 2.0 web  #   An HTTP server for the 2.0 web
5  #   Copyright (c) 2006 contributing authors  #   Copyright (c) 2006 contributing authors
6  #  #
7  #   Subscriber.pm  #   Subscriber.pm
8  #  #
9  #       Description:  #       Description:
10  #       Common super-class for controller and subscriber  #       Common super-class for controller and subscriber
11  #  #
12  ###############################################################################  ###############################################################################
13  #  #
14  #   This program is free software; you can redistribute it and/or modify it  #   This program is free software; you can redistribute it and/or modify it
15  #   under the terms of the GNU General Public License as published by the Free  #   under the terms of the GNU General Public License as published by the Free
16  #   Software Foundation; either version 2 of the License, or (at your option)  #   Software Foundation; either version 2 of the License, or (at your option)
17  #   any later version.  #   any later version.
18  #  #
19  #   This program is distributed in the hope that it will be useful, but WITHOUT  #   This program is distributed in the hope that it will be useful, but WITHOUT
20  #   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or  #   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  #   FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for  #   FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22  #   more details.  #   more details.
23  #  #
24  #   You should have received a copy of the GNU General Public License along  #   You should have received a copy of the GNU General Public License along
25  #   with this program; if not, write to the Free Software Foundation, Inc.,  #   with this program; if not, write to the Free Software Foundation, Inc.,
26  #   59 Temple Place, Suite 330, Boston, MA 02111-1307 USA  #   59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27  #  #
28  #   For more information visit www.meteorserver.org  #   For more information visit www.meteorserver.org
29  #  #
30  ###############################################################################  ###############################################################################
31    
32  package Meteor::Connection;  package Meteor::Connection;
33  ###############################################################################  ###############################################################################
34  # Configuration  # Configuration
35  ###############################################################################  ###############################################################################
36                    
37          use strict;          use strict;
38                    
39          use Errno qw(EAGAIN);          use Errno qw(EAGAIN);
40                    
41          our $MAX_READ_SIZE=8192;          our $MAX_READ_SIZE=8192;
42          our $CONNECTION_WRITE_TIMEOUT=120;          our $CONNECTION_WRITE_TIMEOUT=120;
43                    
44          our @Connections=();          our @Connections=();
45    
46  ###############################################################################  ###############################################################################
47  # Class methods  # Class methods
48  ###############################################################################  ###############################################################################
49  sub addAllHandleBits {  sub addAllHandleBits {
50          my $class=shift;          my $class=shift;
51                    
52          my $rVecRef=shift;          my $rVecRef=shift;
53          my $wVecRef=shift;          my $wVecRef=shift;
54          my $eVecRef=shift;          my $eVecRef=shift;
55                    
56          map {$_->addHandleBits($rVecRef,$wVecRef,$eVecRef)} @Connections;          my @cons=@Connections;
57  }          map {$_->addHandleBits($rVecRef,$wVecRef,$eVecRef) if(defined($_)) } @cons;
58    }
59  sub checkAllHandleBits {  
60          my $class=shift;  sub checkAllHandleBits {
61                    my $class=shift;
62          my $rVec=shift;          
63          my $wVec=shift;          my $rVec=shift;
64          my $eVec=shift;          my $wVec=shift;
65                    my $eVec=shift;
66          map {$_->checkHandleBits($rVec,$wVec,$eVec)} @Connections;          
67  }          my @cons=@Connections;
68            map {$_->checkHandleBits($rVec,$wVec,$eVec) if(defined($_)) } @cons;
69  sub connectionCount {  }
70          scalar(@Connections);  
71  }  sub connectionCount {
72            scalar(@Connections);
73  sub closeAllConnections {  }
74          my @cons=@Connections;  
75            sub closeAllConnections {
76          map { $_->close(); } @cons;          my @cons=@Connections;
77  }          
78            map { $_->close(); } @cons;
79  ###############################################################################  }
80  # Factory methods  
81  ###############################################################################  ###############################################################################
82  sub new {  # Factory methods
83          #  ###############################################################################
84          # Create a new empty instance  sub new {
85          #          #
86          my $class=shift;          # Create a new empty instance
87                    #
88          my $obj={};          my $class=shift;
89                    
90          bless($obj,$class);          my $obj={};
91  }          
92            bless($obj,$class);
93  sub newFromServer {  }
94          #  
95          # new instance from new server connection  sub newFromServer {
96          #          #
97          my $self=shift->new();          # new instance from new server connection
98                    #
99          my $server=shift;          my $self=shift->new();
100          my $socket=$server->conSocket();          
101                    $::Statistics->{'total_requests'}++;
102          $self->{'socket'}=$socket;                
103          $self->{'socketFN'}=$socket->fileno();          my $server=shift;
104                    my $socket=$server->conSocket();
105          $socket->setNonBlocking();          
106                    $self->{'socket'}=$socket;      
107          $self->{'writeBuffer'}='';          $self->{'socketFN'}=$socket->fileno();
108          $self->{'readBuffer'}='';          
109                    $socket->setNonBlocking();
110          push(@Connections,$self);          
111                    $self->{'writeBuffer'}='';
112          &::syslog('debug',"New %s for %s",ref($self),$socket->{'connection'}->{'remoteIP'});          $self->{'readBuffer'}='';
113                    $self->{'bytesWritten'}=0;
114          $self;          $self->{'ip'}=$socket->{'connection'}->{'remoteIP'};
115  }          
116            push(@Connections,$self);
117  ###############################################################################          
118  # Instance methods          &::syslog('debug',"New %s for %s",ref($self),$socket->{'connection'}->{'remoteIP'});
119  ###############################################################################          
120  sub write {          $self;
121          my $self=shift;  }
122            
123          $self->{'writeBuffer'}.=shift;  ###############################################################################
124          $self->{'writeBufferTimestamp'}=time unless(exists($self->{'writeBufferTimestamp'}));  # Instance methods
125  }  ###############################################################################
126    sub write {
127  sub addHandleBits {          my $self=shift;
128          my $self=shift;          
129                    $self->{'writeBuffer'}.=shift;
130          my $rVecRef=shift;          $self->{'writeBufferTimestamp'}=time unless(exists($self->{'writeBufferTimestamp'}));
131          my $wVecRef=shift;  }
132          my $eVecRef=shift;  
133            sub addHandleBits {
134          my $fno=$self->{'socketFN'};          my $self=shift;
135                    
136          if($self->{'writeBuffer'} ne '')          my $rVecRef=shift;
137          {          my $wVecRef=shift;
138                  if(exists($self->{'writeBufferTimestamp'}) && $self->{'writeBufferTimestamp'}+$CONNECTION_WRITE_TIMEOUT<time)          my $eVecRef=shift;
139                  {          
140                          &::syslog('debug',"%s for %s: write timed out",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});          my $fno=$self->{'socketFN'};
141                                    
142                          $self->{'writeBuffer'}='';          if($self->{'writeBuffer'} ne '')
143                          $self->close();          {
144                          return;                  if(exists($self->{'writeBufferTimestamp'}) && $self->{'writeBufferTimestamp'}+$CONNECTION_WRITE_TIMEOUT<time)
145                  }                  {
146                  vec($$wVecRef,$fno,1)=1;                          &::syslog('debug',"%s for %s: write timed out",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
147          }                          
148                            $self->{'writeBuffer'}='';
149          vec($$rVecRef,$fno,1)=1;                          $self->close();
150          vec($$eVecRef,$fno,1)=1;                          return;
151  }                  }
152                    vec($$wVecRef,$fno,1)=1;
153  sub checkHandleBits {          }
154          my $self=shift;  
155                    vec($$rVecRef,$fno,1)=1;
156          my $rVec=shift;          vec($$eVecRef,$fno,1)=1;
157          my $wVec=shift;  }
158          my $eVec=shift;  
159            sub checkHandleBits {
160          my $fno=$self->{'socketFN'};          my $self=shift;
161                    
162          if(vec($eVec,$fno,1))          my $rVec=shift;
163          {          my $wVec=shift;
164                  #          my $eVec=shift;
165                  # Something went wrong!          
166                  #          my $fno=$self->{'socketFN'};
167                  $self->exceptionReceived();          
168                            if(vec($eVec,$fno,1))
169                  return;          {
170          }                  #
171                            # Something went wrong!
172          if(vec($rVec,$fno,1))                  #
173          {                  $self->exceptionReceived();
174                  #                  
175                  # Data available for read                  return;
176                  #          }
177                  my $socket=$self->{'socket'};          
178                            if(vec($rVec,$fno,1))
179                  my $buffer='';          {
180                  my $bytesRead=sysread($socket->{'handle'},$buffer,$MAX_READ_SIZE);                  #
181                  if(defined($bytesRead) && $bytesRead>0)                  # Data available for read
182                  {                  #
183                          $self->{'readBuffer'}.=$buffer;                  my $socket=$self->{'socket'};
184                          while($self->{'readBuffer'}=~s/^([^\r\n]*)\r?\n//)                  
185                          {                  my $buffer='';
186                                  $self->processLine($1);                  my $bytesRead=sysread($socket->{'handle'},$buffer,$MAX_READ_SIZE);
187                          }                  if(defined($bytesRead) && $bytesRead>0)
188                  }                  {
189                  elsif(defined($bytesRead) && $bytesRead==0)                          $::Statistics->{'total_inbound_bytes'}+=$bytesRead;
190                  {                          $self->{'readBuffer'}.=$buffer;
191                          # Connection closed                          while($self->{'readBuffer'}=~s/^([^\r\n]*)\r?\n//)
192                          $self->{'remoteClosed'}=1;                          {
193                          $self->close();                                  $self->processLine($1);
194                                                    }
195                          return;                  }
196                  }                  elsif(defined($bytesRead) && $bytesRead==0)
197                  else                  {
198                  {                          # Connection closed
199                          unless(${!}==EAGAIN)                          $self->{'remoteClosed'}=1;
200                          {                          $self->close(1, 'remoteClosed');
201                                  &::syslog('notice',"Connection closed: $!");                          
202                                  $self->{'remoteClosed'}=1;                          return;
203                                  $self->close();                  }
204                                                    else
205                                  return;                  {
206                          }                          unless(${!}==EAGAIN)
207                  }                          {
208          }                                  &::syslog('notice',"Connection closed: $!");
209                                            $self->{'remoteClosed'}=1;
210          if(vec($wVec,$fno,1) && $self->{'writeBuffer'} ne '')                                  $self->close(1, 'remoteClosed');
211          {                                  
212                  #                                  return;
213                  # Can write                          }
214                  #                  }
215                  my $socket=$self->{'socket'};          }
216                            
217                  my $bytesWritten=syswrite($socket->{'handle'},$self->{'writeBuffer'});          if(vec($wVec,$fno,1) && $self->{'writeBuffer'} ne '')
218                            {
219                  if(defined($bytesWritten) && $bytesWritten>0)                  #
220                  {                  # Can write
221                          $self->{'writeBuffer'}=substr($self->{'writeBuffer'},$bytesWritten);                  #
222                          if(length($self->{'writeBuffer'})==0)                  my $socket=$self->{'socket'};
223                          {                  
224                                  delete($self->{'writeBufferTimestamp'});                  my $bytesWritten=syswrite($socket->{'handle'},$self->{'writeBuffer'});
225                                  $self->close() if(exists($self->{'autoClose'}));                  
226                          }                  if(defined($bytesWritten) && $bytesWritten>0)
227                          else                  {
228                          {                          $::Statistics->{'total_outbound_bytes'}+=$bytesWritten;
229                                  $self->{'writeBufferTimestamp'}=time;                          $self->{'bytesWritten'}+=$bytesWritten;
230                          }                          $self->{'writeBuffer'}=substr($self->{'writeBuffer'},$bytesWritten);
231                  }                          if(length($self->{'writeBuffer'})==0)
232                  else                          {
233                  {                                  delete($self->{'writeBufferTimestamp'});
234                          unless(${!}==EAGAIN)                                  $self->close(1) if(exists($self->{'autoClose'}));
235                          {                          }
236                                  &::syslog('notice',"Connection closed: $!");                          else
237                                  $self->{'remoteClosed'}=1;                          {
238                                  $self->close();                                  $self->{'writeBufferTimestamp'}=time;
239                                                            }
240                                  return;                  }
241                          }                  else
242                  }                  {
243          }                          unless(${!}==EAGAIN)
244  }                          {
245                                    &::syslog('notice',"Connection closed: $!");
246  sub exceptionReceived {                                  $self->{'remoteClosed'}=1;
247          my $self=shift;                                  $self->close(1, 'remoteClosed');
248                                            
249          $self->{'writeBuffer'}='';                                  return;
250                                    }
251          $self->close();                  }
252  }          }
253    }
254  sub close {  
255          my $self=shift;  sub exceptionReceived {
256                    my $self=shift;
257          #&::syslog('debug',"Close called for %s for %s when write buffer empty",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});          
258                    $self->{'writeBuffer'}='';
259          unless($self->{'remoteClosed'})          
260          {          $self->close();
261                  if(!exists($self->{'autoClose'}) && length($self->{'writeBuffer'})>0)  }
262                  {  
263                          $self->{'autoClose'}=1;  sub close {
264                            my $self=shift;
265                          &::syslog('debug',"Will close %s for %s when write buffer empty",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});          
266                            #&::syslog('debug',"Close called for %s for %s when write buffer empty",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
267                          return;          
268                  }          unless($self->{'remoteClosed'})
269          }          {
270                            if(!exists($self->{'autoClose'}) && length($self->{'writeBuffer'})>0)
271          eval {                  {
272                  $self->{'socket'}->close();                          $self->{'autoClose'}=1;
273          };                  
274                                    &::syslog('debug',"Will close %s for %s when write buffer empty",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
275          #                  
276          # Remove connection from list of connections                          return;
277          #                  }
278          my $idx=undef;          }
279          for(my $i=0;$i<scalar(@Connections);$i++)          
280          {          eval {
281                  if($Connections[$i]==$self)                  $self->{'socket'}->close();
282                  {          };
283                          $idx=$i;          
284                          last;          #
285                  }          # Remove connection from list of connections
286          }          #
287                    my $idx=undef;
288          if(defined($idx))          my $numcon = scalar(@Connections);
289          {          for(my $i=0;$i<$numcon;$i++)
290                  splice(@Connections,$idx,1);          {
291          }                  if($Connections[$i]==$self)
292                            {
293          &::syslog('debug',"Closed %s for %s",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});                          $idx=$i;
294  }                          last;
295                    }
296  1;          }
297            
298            if(defined($idx))
299            {
300                    splice(@Connections,$idx,1);
301            }
302            
303            &::syslog('debug',"Closed %s for %s",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
304            
305            $self->didClose();
306    }
307    
308    sub didClose {
309    }
310    
311    1;
312  ############################################################################EOF  ############################################################################EOF

Legend:
Removed from v.9  
changed lines
  Added in v.53

  ViewVC Help
Powered by ViewVC 1.1.26