/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 37 - (show annotations)
Fri Feb 1 21:22:03 2008 UTC (16 years, 1 month ago) by knops.gerd
File size: 8504 byte(s)
• Connection: In checkAllHandleBits (and to be sure also in addAllHandleBits) iterate over copy of @Connections, as @Connections might change during iteration due to connections being closed
• Controller, Subscriber: pending data will abort a close and call it again later, causing current_subscribers/current_controllers statistics to be incorrect. Added new 'didClose' method that is only called when a connection is actually closed, and this is where the close is counted.

1 #!/usr/bin/perl -w
2 ###############################################################################
3 # Meteor
4 # An HTTP server for the 2.0 web
5 # Copyright (c) 2006 contributing authors
6 #
7 # Subscriber.pm
8 #
9 # Description:
10 # A Meteor Subscriber
11 #
12 ###############################################################################
13 #
14 # This program is free software; you can redistribute it and/or modify it
15 # under the terms of the GNU General Public License as published by the Free
16 # Software Foundation; either version 2 of the License, or (at your option)
17 # any later version.
18 #
19 # This program is distributed in the hope that it will be useful, but WITHOUT
20 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22 # more details.
23 #
24 # You should have received a copy of the GNU General Public License along
25 # with this program; if not, write to the Free Software Foundation, Inc.,
26 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27 #
28 # For more information visit www.meteorserver.org
29 #
30 ###############################################################################
31
32 package Meteor::Subscriber;
33 ###############################################################################
34 # Configuration
35 ###############################################################################
36
37 use strict;
38
39 use Meteor::Connection;
40 use Meteor::Channel;
41 use Meteor::Document;
42
43 @Meteor::Subscriber::ISA=qw(Meteor::Connection);
44
45 our %PersistentConnections=();
46 our $NumAcceptedConnections=0;
47
48
49 ###############################################################################
50 # Factory methods
51 ###############################################################################
52 sub newFromServer {
53 my $class=shift;
54
55 my $self=$class->SUPER::newFromServer(shift);
56
57 $self->{'headerBuffer'}='';
58 $self->{'MessageCount'}=0;
59 $self->{'MaxMessageCount'}=0;
60
61 $self->{'ConnectionStart'}=time;
62 my $maxTime=$::CONF{'MaxTime'};
63 if($maxTime>0)
64 {
65 $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
66 }
67
68 $::Statistics->{'current_subscribers'}++;
69 $::Statistics->{'subscriber_connections_accepted'}++;
70
71 $self;
72 }
73
74 ###############################################################################
75 # Class methods
76 ###############################################################################
77 sub deleteSubscriberWithID {
78 my $class=shift;
79 my $id=shift;
80
81 if(exists($PersistentConnections{$id}))
82 {
83 $PersistentConnections{$id}->close(1);
84 }
85 }
86
87 sub pingPersistentConnections {
88 my $class=shift;
89
90 my $msg=$::CONF{'PingMessage'};
91 my @cons=values %PersistentConnections;
92
93 map { $_->write($msg.chr(0)) } @cons;
94 }
95
96 sub checkPersistentConnectionsForMaxTime {
97 my $class=shift;
98
99 my $time=time;
100 my @cons=values %PersistentConnections;
101
102 map { $_->checkForMaxTime($time) } @cons;
103 }
104
105 sub numSubscribers {
106
107 return scalar(keys %PersistentConnections);
108 }
109
110 ###############################################################################
111 # Instance methods
112 ###############################################################################
113 sub processLine {
114 my $self=shift;
115 my $line=shift;
116
117 # Once the header was processed we ignore any input
118 return unless(exists($self->{'headerBuffer'}));
119
120 if($line ne '')
121 {
122 #
123 # Accumulate header
124 #
125 $self->{'headerBuffer'}.="$line\n";
126 }
127 else
128 {
129 #
130 # Empty line signals end of header.
131 # Analyze header, register with appropiate channel
132 # and send pending messages.
133 #
134 # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
135 #
136 # Find the 'GET' line
137 #
138 if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i)
139 {
140 my $subscriberID=$1;
141 my $persist=0;
142 $self->{'mode'}=$2;
143 if ($self->{'mode'} eq "xhrinteractive" || $self->{'mode'} eq "iframe" || $self->{'mode'} eq "serversent" || $self->{'mode'} eq "longpoll") {
144 $persist=1;
145 $self->{'MaxMessageCount'}=1 unless(!($self->{'mode'} eq "longpoll"));
146 }
147 if ($self->{'mode'} eq "iframe") {
148 $self->{'HeaderTemplateNumber'}=1;
149 } else {
150 $self->{'HeaderTemplateNumber'}=2;
151 }
152 my @channelData=split('/',$3);
153 my $channels={};
154 my $channelName;
155 my $offset;
156 foreach my $chandef (@channelData) {
157 if($chandef=~/^([a-z0-9]+)(.(r|b|h)([0-9]*))?$/i) {
158 $channelName = $1;
159 $channels->{$channelName}->{'startIndex'} = undef;
160 if ($3) {
161 $offset = $4;
162 if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; }
163 if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; }
164 if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; }
165 }
166 }
167 }
168
169 delete($self->{'headerBuffer'});
170
171 if($persist)
172 {
173 $self->{'subscriberID'}=$subscriberID;
174 $self->deleteSubscriberWithID($subscriberID);
175 $PersistentConnections{$subscriberID}=$self;
176 }
177
178 if(scalar(keys %{$channels}))
179 {
180 $self->emitOKHeader();
181 $self->setChannels($channels,$persist);
182 $self->close(1) unless($persist);
183 return;
184 }
185 }
186 elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/)
187 {
188 $self->deleteSubscriberWithID($1);
189 $self->emitOKHeader();
190 $self->close(1);
191 return;
192 }
193 elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
194 {
195 Meteor::Document->serveFileToClient($1,$self);
196 $self->close(1);
197 return;
198 }
199
200 #
201 # If we fall through we did not understand the request
202 #
203 $self->emitErrorHeader();
204 }
205 }
206
207 sub setChannels {
208 my $self=shift;
209 my $channels=shift;
210 my $persist=shift;
211
212 foreach my $channelName (keys %{$channels})
213 {
214 my $startIndex=$channels->{$channelName}->{'startIndex'};
215
216 my $channel=Meteor::Channel->channelWithName($channelName);
217
218 $self->{'channels'}->{$channelName}=$channel if($persist);
219
220 $channel->addSubscriber($self,$startIndex,$persist);
221 }
222 }
223
224 sub emitOKHeader {
225 my $self=shift;
226
227 $self->emitHeader('200 OK');
228 }
229
230 sub emitErrorHeader {
231 my $self=shift;
232
233 $self->emitHeader('404 Not Found');
234 $::Statistics->{'errors_served'}++;
235
236 # close up shop here!
237 $self->close();
238 }
239
240 sub emitHeader {
241 my $self=shift;
242 my $status=shift;
243
244 my $header=undef;
245 if(exists($self->{'HeaderTemplateNumber'}))
246 {
247 my $hn='HeaderTemplate'.$self->{'HeaderTemplateNumber'};
248
249 $header=$::CONF{$hn};
250 }
251 $header=$::CONF{'HeaderTemplate'} unless(defined($header));
252
253 $header=~s/~([^~]+)~/
254 if(!defined($1) || $1 eq '')
255 {
256 '~';
257 }
258 elsif($1 eq 'server')
259 {
260 $::PGM;
261 }
262 elsif($1 eq 'status')
263 {
264 $status;
265 }
266 elsif($1 eq 'servertime')
267 {
268 time;
269 }
270 else
271 {
272 '';
273 }
274 /gex;
275
276 $self->write($header.chr(0));
277 }
278
279 sub sendMessage {
280 my $self=shift;
281 my $msg=shift;
282 my $numMsgInThisBatch=shift;
283
284 $numMsgInThisBatch=1 unless(defined($numMsgInThisBatch));
285
286 $self->write($msg.chr(0));
287
288 $::Statistics->{'messages_served'}+=$numMsgInThisBatch;
289
290 my $msgCount=++$self->{'MessageCount'};
291
292 my $maxMsg=$::CONF{'MaxMessages'};
293 if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)
294 {
295 $self->close(1);
296 }
297
298 if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})
299 {
300 $self->close(1);
301 }
302 }
303
304 sub closeChannel {
305 my $self=shift;
306 my $channelName=shift;
307
308 return unless(exists($self->{'channels'}->{$channelName}));
309
310 my $channel=$self->{'channels'}->{$channelName};
311 $channel->removeSubscriber($self);
312
313 delete($self->{'channels'}->{$channelName});
314
315 $self->close() if(scalar(keys %{$self->{'channels'}})==0);
316 }
317
318 sub close {
319 my $self=shift;
320 my $noShutdownMsg=shift;
321
322 foreach my $channelName (keys %{$self->{'channels'}})
323 {
324 my $channel=$self->{'channels'}->{$channelName};
325 $channel->removeSubscriber($self);
326 }
327 delete($self->{'channels'});
328
329 if(exists($self->{'subscriberID'}))
330 {
331 delete($PersistentConnections{$self->{'subscriberID'}});
332 }
333
334 #
335 # Send shutdown message unless remote closed or
336 # connection not yet established
337 #
338 unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
339 {
340 my $msg=$::CONF{'SubscriberShutdownMsg'};
341 if(defined($msg) && $msg ne '')
342 {
343 $self->write($msg);
344 }
345 }
346
347 $self->SUPER::close();
348 }
349
350 sub didClose {
351
352 $::Statistics->{'current_subscribers'}--;
353 }
354
355 sub checkForMaxTime {
356 my $self=shift;
357 my $time=shift;
358
359 $self->close(1) if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);
360 }
361
362 1;
363 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26