/[meteor]/googlecode.com/svn/trunk/Meteor/Connection.pm
This is a repository of my old source code, which is no longer updated. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /googlecode.com/svn/trunk/Meteor/Connection.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 53 - (hide annotations)
Wed Feb 27 21:58:56 2008 UTC (13 years, 10 months ago) by andrew.betts
File size: 6883 byte(s)
Updated version number

JS client:
Added channel info handler to JS client, assumes Meteor will send <script>ch('channel', msgid);</script>
Allowed processing of messages prior to current message index
Added disconnect() to eof()
Revert to poll mode if unable to load frame (should fix IE proxy issues)

Server:
Fixed output of channel info to show only subscribed channels (and simplified)
Added logging of IP addresses

1 knops.gerd 11 #!/usr/bin/perl -w
2     ###############################################################################
3     # Meteor
4     # An HTTP server for the 2.0 web
5     # Copyright (c) 2006 contributing authors
6     #
7     # Connection.pm
8     #
9     # Description:
10     # Common super-class for controller and subscriber
11     #
12     ###############################################################################
13     #
14     # This program is free software; you can redistribute it and/or modify it
15     # under the terms of the GNU General Public License as published by the Free
16     # Software Foundation; either version 2 of the License, or (at your option)
17     # any later version.
18     #
19     # This program is distributed in the hope that it will be useful, but WITHOUT
20     # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21     # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22     # more details.
23     #
24     # You should have received a copy of the GNU General Public License along
25     # with this program; if not, write to the Free Software Foundation, Inc.,
26     # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27     #
28     # For more information visit www.meteorserver.org
29     #
30     ###############################################################################
31    
32     package Meteor::Connection;
33     ###############################################################################
34     # Configuration
35     ###############################################################################
36    
37     use strict;
38    
39     use Errno qw(EAGAIN);
40    
41     our $MAX_READ_SIZE=8192;
42     our $CONNECTION_WRITE_TIMEOUT=120;
43    
44     our @Connections=();
45    
46     ###############################################################################
47     # Class methods
48     ###############################################################################
49     sub addAllHandleBits {
50     my $class=shift;
51    
52     my $rVecRef=shift;
53     my $wVecRef=shift;
54     my $eVecRef=shift;
55    
56 knops.gerd 37 my @cons=@Connections;
57     map {$_->addHandleBits($rVecRef,$wVecRef,$eVecRef) if(defined($_)) } @cons;
58 knops.gerd 11 }
59    
60     sub checkAllHandleBits {
61     my $class=shift;
62    
63     my $rVec=shift;
64     my $wVec=shift;
65     my $eVec=shift;
66    
67 knops.gerd 37 my @cons=@Connections;
68     map {$_->checkHandleBits($rVec,$wVec,$eVec) if(defined($_)) } @cons;
69 knops.gerd 11 }
70    
71     sub connectionCount {
72     scalar(@Connections);
73     }
74    
75     sub closeAllConnections {
76     my @cons=@Connections;
77    
78     map { $_->close(); } @cons;
79     }
80    
81     ###############################################################################
82     # Factory methods
83     ###############################################################################
84     sub new {
85     #
86     # Create a new empty instance
87     #
88     my $class=shift;
89    
90     my $obj={};
91    
92     bless($obj,$class);
93     }
94    
95     sub newFromServer {
96     #
97     # new instance from new server connection
98     #
99     my $self=shift->new();
100    
101 knops.gerd 25 $::Statistics->{'total_requests'}++;
102    
103 knops.gerd 11 my $server=shift;
104     my $socket=$server->conSocket();
105    
106     $self->{'socket'}=$socket;
107     $self->{'socketFN'}=$socket->fileno();
108    
109     $socket->setNonBlocking();
110    
111     $self->{'writeBuffer'}='';
112     $self->{'readBuffer'}='';
113 knops.gerd 47 $self->{'bytesWritten'}=0;
114 andrew.betts 53 $self->{'ip'}=$socket->{'connection'}->{'remoteIP'};
115 knops.gerd 47
116 knops.gerd 11 push(@Connections,$self);
117    
118     &::syslog('debug',"New %s for %s",ref($self),$socket->{'connection'}->{'remoteIP'});
119    
120     $self;
121     }
122    
123     ###############################################################################
124     # Instance methods
125     ###############################################################################
126     sub write {
127     my $self=shift;
128    
129     $self->{'writeBuffer'}.=shift;
130     $self->{'writeBufferTimestamp'}=time unless(exists($self->{'writeBufferTimestamp'}));
131     }
132    
133     sub addHandleBits {
134     my $self=shift;
135    
136     my $rVecRef=shift;
137     my $wVecRef=shift;
138     my $eVecRef=shift;
139    
140     my $fno=$self->{'socketFN'};
141    
142     if($self->{'writeBuffer'} ne '')
143     {
144     if(exists($self->{'writeBufferTimestamp'}) && $self->{'writeBufferTimestamp'}+$CONNECTION_WRITE_TIMEOUT<time)
145     {
146     &::syslog('debug',"%s for %s: write timed out",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
147    
148     $self->{'writeBuffer'}='';
149     $self->close();
150     return;
151     }
152     vec($$wVecRef,$fno,1)=1;
153     }
154    
155     vec($$rVecRef,$fno,1)=1;
156     vec($$eVecRef,$fno,1)=1;
157     }
158    
159     sub checkHandleBits {
160     my $self=shift;
161    
162     my $rVec=shift;
163     my $wVec=shift;
164     my $eVec=shift;
165    
166     my $fno=$self->{'socketFN'};
167    
168     if(vec($eVec,$fno,1))
169     {
170     #
171     # Something went wrong!
172     #
173     $self->exceptionReceived();
174    
175     return;
176     }
177    
178     if(vec($rVec,$fno,1))
179     {
180     #
181     # Data available for read
182     #
183     my $socket=$self->{'socket'};
184    
185     my $buffer='';
186     my $bytesRead=sysread($socket->{'handle'},$buffer,$MAX_READ_SIZE);
187     if(defined($bytesRead) && $bytesRead>0)
188     {
189 knops.gerd 25 $::Statistics->{'total_inbound_bytes'}+=$bytesRead;
190 knops.gerd 11 $self->{'readBuffer'}.=$buffer;
191     while($self->{'readBuffer'}=~s/^([^\r\n]*)\r?\n//)
192     {
193     $self->processLine($1);
194     }
195     }
196     elsif(defined($bytesRead) && $bytesRead==0)
197     {
198     # Connection closed
199     $self->{'remoteClosed'}=1;
200 andrew.betts 50 $self->close(1, 'remoteClosed');
201 knops.gerd 11
202     return;
203     }
204     else
205     {
206     unless(${!}==EAGAIN)
207     {
208     &::syslog('notice',"Connection closed: $!");
209     $self->{'remoteClosed'}=1;
210 andrew.betts 50 $self->close(1, 'remoteClosed');
211 knops.gerd 11
212     return;
213     }
214     }
215     }
216    
217     if(vec($wVec,$fno,1) && $self->{'writeBuffer'} ne '')
218     {
219     #
220     # Can write
221     #
222     my $socket=$self->{'socket'};
223    
224     my $bytesWritten=syswrite($socket->{'handle'},$self->{'writeBuffer'});
225    
226     if(defined($bytesWritten) && $bytesWritten>0)
227     {
228 knops.gerd 25 $::Statistics->{'total_outbound_bytes'}+=$bytesWritten;
229 knops.gerd 47 $self->{'bytesWritten'}+=$bytesWritten;
230 knops.gerd 11 $self->{'writeBuffer'}=substr($self->{'writeBuffer'},$bytesWritten);
231     if(length($self->{'writeBuffer'})==0)
232     {
233     delete($self->{'writeBufferTimestamp'});
234 andrew.betts 53 $self->close(1) if(exists($self->{'autoClose'}));
235 knops.gerd 11 }
236     else
237     {
238     $self->{'writeBufferTimestamp'}=time;
239     }
240     }
241     else
242     {
243     unless(${!}==EAGAIN)
244     {
245     &::syslog('notice',"Connection closed: $!");
246     $self->{'remoteClosed'}=1;
247 andrew.betts 50 $self->close(1, 'remoteClosed');
248 knops.gerd 11
249     return;
250     }
251     }
252     }
253     }
254    
255     sub exceptionReceived {
256     my $self=shift;
257    
258     $self->{'writeBuffer'}='';
259    
260     $self->close();
261     }
262    
263     sub close {
264     my $self=shift;
265    
266     #&::syslog('debug',"Close called for %s for %s when write buffer empty",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
267    
268     unless($self->{'remoteClosed'})
269     {
270     if(!exists($self->{'autoClose'}) && length($self->{'writeBuffer'})>0)
271     {
272     $self->{'autoClose'}=1;
273    
274     &::syslog('debug',"Will close %s for %s when write buffer empty",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
275    
276     return;
277     }
278     }
279    
280     eval {
281     $self->{'socket'}->close();
282     };
283    
284     #
285     # Remove connection from list of connections
286     #
287     my $idx=undef;
288 andrew.betts 50 my $numcon = scalar(@Connections);
289     for(my $i=0;$i<$numcon;$i++)
290 knops.gerd 11 {
291     if($Connections[$i]==$self)
292     {
293     $idx=$i;
294     last;
295     }
296     }
297    
298     if(defined($idx))
299     {
300     splice(@Connections,$idx,1);
301     }
302    
303     &::syslog('debug',"Closed %s for %s",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
304 knops.gerd 37
305     $self->didClose();
306 knops.gerd 11 }
307    
308 knops.gerd 37 sub didClose {
309     }
310    
311 knops.gerd 11 1;
312 andrew.betts 3 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26