/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is a repository of my old source code, which is no longer updated. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory | Revision Log


Revision 35 - (show annotations)
Fri Jan 25 17:12:02 2008 UTC (16 years, 2 months ago) by andrew.betts
File size: 8481 byte(s)
Terminated each message with a null byte to allow Flash clients to work with XML Sockets
Cleared a warning
Allowed uppercase channel names

1 #!/usr/bin/perl -w
2 ###############################################################################
3 # Meteor
4 # An HTTP server for the 2.0 web
5 # Copyright (c) 2006 contributing authors
6 #
7 # Subscriber.pm
8 #
9 # Description:
10 # A Meteor Subscriber
11 #
12 ###############################################################################
13 #
14 # This program is free software; you can redistribute it and/or modify it
15 # under the terms of the GNU General Public License as published by the Free
16 # Software Foundation; either version 2 of the License, or (at your option)
17 # any later version.
18 #
19 # This program is distributed in the hope that it will be useful, but WITHOUT
20 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22 # more details.
23 #
24 # You should have received a copy of the GNU General Public License along
25 # with this program; if not, write to the Free Software Foundation, Inc.,
26 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27 #
28 # For more information visit www.meteorserver.org
29 #
30 ###############################################################################
31
32 package Meteor::Subscriber;
33 ###############################################################################
34 # Configuration
35 ###############################################################################
36
37 use strict;
38
39 use Meteor::Connection;
40 use Meteor::Channel;
41 use Meteor::Document;
42
# A subscriber is a specialised Meteor::Connection.
@Meteor::Subscriber::ISA = ('Meteor::Connection');

# Registry of persistent subscribers, keyed by subscriber ID
# (filled in processLine(), emptied again in close()).
our %PersistentConnections = ();

# Package-level counter; it is initialised here but not referenced by any
# other code in this file.
our $NumAcceptedConnections = 0;
47
48 ###############################################################################
49 # Factory methods
50 ###############################################################################
# Factory method: build a subscriber object for a newly accepted connection.
#
# Arguments:
#   $class - class the method is invoked on
#   (next) - handed unchanged to SUPER::newFromServer()
#
# Returns the new subscriber object. As a side effect the global subscriber
# statistics counters are incremented.
sub newFromServer {
    my $class = shift;

    my $self = $class->SUPER::newFromServer(shift);

    # Request header lines are accumulated here; processLine() removes the
    # key once a subscription request has been recognised.
    $self->{'headerBuffer'}    = '';
    $self->{'MessageCount'}    = 0;
    $self->{'MaxMessageCount'} = 0;    # 0 disables the per-connection limit

    $self->{'ConnectionStart'} = time;

    # MaxTime may be absent from the configuration. Treat a missing value
    # like 0 (no time limit) instead of comparing undef numerically, which
    # triggers an "uninitialized value" warning under -w.
    my $maxTime = $::CONF{'MaxTime'};
    if (defined($maxTime) && $maxTime > 0) {
        $self->{'ConnectionTimeLimit'} = $self->{'ConnectionStart'} + $maxTime;
    }

    $::Statistics->{'current_subscribers'}++;
    $::Statistics->{'subscriber_connections_accepted'}++;

    return $self;
}
72
73 ###############################################################################
74 # Class methods
75 ###############################################################################
# Class method: close the persistent connection registered under $id.
# Does nothing when no connection is registered under that ID. The
# connection is closed with the "no shutdown message" flag set.
sub deleteSubscriberWithID {
    my ($class, $id) = @_;

    $PersistentConnections{$id}->close(1)
        if exists $PersistentConnections{$id};
}
85
# Class method: send the configured ping message, terminated by a null byte,
# to every registered persistent connection.
sub pingPersistentConnections {
    my $class = shift;

    my $msg = $::CONF{'PingMessage'};

    # Iterate over a snapshot of the registry rather than the live hash.
    # NOTE(review): presumably a failing write() can close the connection,
    # which would remove its registry entry mid-iteration -- confirm against
    # Meteor::Connection.
    my @cons = values %PersistentConnections;

    # Plain loop: the writes are wanted only for their side effect, so there
    # is no reason to build and discard a result list with map.
    for my $con (@cons) {
        $con->write($msg . chr(0));
    }

    return;
}
94
# Class method: ask every registered persistent connection to close itself
# if its connection time limit has passed. All connections are checked
# against the same timestamp, taken once up front.
sub checkPersistentConnectionsForMaxTime {
    my $class = shift;

    my $time = time;

    # Snapshot the registry first: checkForMaxTime() may call close(), and
    # close() deletes the connection's entry from %PersistentConnections.
    my @cons = values %PersistentConnections;

    # Plain loop instead of map in void context; only the side effect matters.
    for my $con (@cons) {
        $con->checkForMaxTime($time);
    }

    return;
}
103
# Returns the number of connections currently held in the
# persistent-connection registry.
sub numSubscribers {
    my $count = keys %PersistentConnections;

    return $count;
}
108
109 ###############################################################################
110 # Instance methods
111 ###############################################################################
# Feed one line of client input to the subscriber.
#
# Non-empty lines are appended to the header buffer. The first empty line
# marks the end of the request header, which is then dispatched:
#
#   GET <SubscriberDynamicPageAddress>/<id>/<mode>/<channeldefs>
#       subscribe to one or more channels
#   GET /disconnect/<id>
#       close the persistent connection registered under <id>
#   GET <path>
#       hand the path to Meteor::Document->serveFileToClient()
#
# Anything else (including a subscribe request without a single valid
# channel definition) is answered with the error header.
#
# <channeldefs> is a '/'-separated list. Each entry is an alphanumeric
# channel name, optionally followed by ".r<N>", ".b<N>" or ".h":
#   .r<N>  -> startIndex is <N> as given
#   .b<N>  -> startIndex is -<N>
#   .h     -> startIndex is 0
#   (none) -> startIndex is undef
# Entries that do not match this syntax are skipped.
#
# Arguments:
#   $line - one input line, without its line terminator
sub processLine {
    my $self = shift;
    my $line = shift;

    # Once the header was processed we ignore any input
    return unless exists $self->{'headerBuffer'};

    #
    # Accumulate header
    #
    if ($line ne '') {
        $self->{'headerBuffer'} .= "$line\n";
        return;
    }

    #
    # Empty line signals end of header.
    # Analyze header, register with appropriate channel
    # and send pending messages.
    #
    my $header = $self->{'headerBuffer'};

    # NOTE(review): SubscriberDynamicPageAddress is interpolated as a regular
    # expression, so metacharacters in the configured value are live. Left
    # unchanged because existing configurations may rely on it.
    if ($header =~ /GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i) {
        # Copy the captures right away; later matches would clobber them.
        my ($subscriberID, $mode, $channelSpec) = ($1, $2, $3);

        $self->{'mode'} = $mode;

        # These modes keep the connection open after the initial reply.
        my %isPersistentMode = map { $_ => 1 }
            qw(xhrinteractive iframe serversent longpoll);
        my $persist = $isPersistentMode{$mode} ? 1 : 0;

        # A long-poll connection is closed after its first message.
        $self->{'MaxMessageCount'} = 1 if $mode eq 'longpoll';

        $self->{'HeaderTemplateNumber'} = ($mode eq 'iframe') ? 1 : 2;

        my $channels = {};
        foreach my $chandef (split('/', $channelSpec)) {
            # The separator must be a literal dot. The previous pattern used
            # a bare '.', which accepted any character in that position.
            next unless $chandef =~ /^([a-z0-9]+)(?:\.(r|b|h)([0-9]*))?$/i;

            my ($channelName, $marker, $offset) = ($1, $2, $3);

            # The marker comparison is case-sensitive: an upper-case marker
            # passes the /i pattern but leaves startIndex undefined.
            my $startIndex;
            if (defined $marker) {
                if    ($marker eq 'r') { $startIndex = $offset; }
                elsif ($marker eq 'b') { $startIndex = -$offset; }
                elsif ($marker eq 'h') { $startIndex = 0; }
            }

            $channels->{$channelName}->{'startIndex'} = $startIndex;
        }

        # From here on the header counts as processed; close() uses the
        # absence of this key to decide whether to send a shutdown message.
        delete $self->{'headerBuffer'};

        if ($persist) {
            # Replace any earlier connection that used the same ID.
            $self->{'subscriberID'} = $subscriberID;
            $self->deleteSubscriberWithID($subscriberID);
            $PersistentConnections{$subscriberID} = $self;
        }

        if (scalar(keys %{$channels})) {
            $self->emitOKHeader();
            $self->setChannels($channels, $persist);
            $self->close(1) unless $persist;
            return;
        }
    }
    elsif ($header =~ /GET\s+\/disconnect\/(\S+)/) {
        $self->deleteSubscriberWithID($1);
        $self->emitOKHeader();
        $self->close(1);
        return;
    }
    elsif ($header =~ /GET\s+([^\s\?]+)/) {
        Meteor::Document->serveFileToClient($1, $self);
        $self->close(1);
        return;
    }

    #
    # If we fall through we did not understand the request
    #
    $self->emitErrorHeader();
}
205
# Subscribe this connection to every channel named in $channels.
#
# Arguments:
#   $channels - hash ref: channel name => { startIndex => ... }
#   $persist  - true when the connection stays open; only then is the
#               channel object remembered in $self->{'channels'}
sub setChannels {
    my ($self, $channels, $persist) = @_;

    for my $name (keys %$channels) {
        my $channel = Meteor::Channel->channelWithName($name);

        if ($persist) {
            $self->{'channels'}->{$name} = $channel;
        }

        $channel->addSubscriber(
            $self,
            $channels->{$name}->{'startIndex'},
            $persist,
        );
    }
}
222
# Send the response header with status "200 OK".
sub emitOKHeader {
    my ($self) = @_;

    $self->emitHeader('200 OK');
}
228
# Send the response header with status "404 Not Found", count the error in
# the global statistics and close the connection.
sub emitErrorHeader {
    my ($self) = @_;

    $self->emitHeader('404 Not Found');
    ++$::Statistics->{'errors_served'};

    # Nothing more to say to this client.
    $self->close();
}
238
# Expand the configured header template and write it to the client, followed
# by a null byte.
#
# Template selection: if $self->{'HeaderTemplateNumber'} is set, the template
# $::CONF{'HeaderTemplate<N>'} is used; when that is not defined (or no
# number is set) $::CONF{'HeaderTemplate'} is used instead.
#
# Placeholders in the template:
#   ~server~      replaced by $::PGM
#   ~status~      replaced by $status
#   ~servertime~  replaced by the current epoch time
#   ~~            replaced by a literal '~'
#   ~other~       any other name is replaced by the empty string
#
# Arguments:
#   $status - HTTP status text, e.g. '200 OK'
sub emitHeader {
    my $self   = shift;
    my $status = shift;

    my $header;
    if (exists $self->{'HeaderTemplateNumber'}) {
        $header = $::CONF{ 'HeaderTemplate' . $self->{'HeaderTemplateNumber'} };
    }
    $header = $::CONF{'HeaderTemplate'} unless defined $header;

    # The name part must be allowed to be empty ([^~]*), otherwise '~~' can
    # never match and the literal-tilde escape below is unreachable.
    $header =~ s/~([^~]*)~/
        if($1 eq '')
        {
            '~';
        }
        elsif($1 eq 'server')
        {
            $::PGM;
        }
        elsif($1 eq 'status')
        {
            $status;
        }
        elsif($1 eq 'servertime')
        {
            time;
        }
        else
        {
            '';
        }
    /gex;

    $self->write($header . chr(0));
}
277
# Write one message (terminated by a null byte) to the client, update the
# message statistics and close the connection once a message limit is hit.
#
# Two limits are checked after the write:
#   $::CONF{'MaxMessages'}      - global limit, ignored when undefined or <= 0
#   $self->{'MaxMessageCount'}  - per-connection limit, ignored when <= 0
#
# Arguments:
#   $msg               - message text to send
#   $numMsgInThisBatch - number of messages to add to the 'messages_served'
#                        statistic; defaults to 1
sub sendMessage {
    my $self              = shift;
    my $msg               = shift;
    my $numMsgInThisBatch = shift;

    $numMsgInThisBatch = 1 unless defined $numMsgInThisBatch;

    $self->write($msg . chr(0));

    $::Statistics->{'messages_served'} += $numMsgInThisBatch;

    my $msgCount = ++$self->{'MessageCount'};

    my $maxMsg = $::CONF{'MaxMessages'};
    my $globalLimitReached =
        defined($maxMsg) && $maxMsg > 0 && $msgCount >= $maxMsg;
    my $ownLimitReached =
        $self->{'MaxMessageCount'} > 0 && $msgCount >= $self->{'MaxMessageCount'};

    # Close exactly once even when both limits are reached by the same
    # message. close() decrements the current_subscribers statistic, so a
    # second call would count this subscriber twice.
    $self->close(1) if $globalLimitReached || $ownLimitReached;
}
302
# Unsubscribe from a single channel. When that was the last channel this
# connection was subscribed to, the connection itself is closed.
#
# Arguments:
#   $channelName - name of the channel to leave; unknown names are ignored
sub closeChannel {
    my ($self, $channelName) = @_;

    # Not subscribed to that channel: nothing to do.
    exists $self->{'channels'}->{$channelName} or return;

    # Tell the channel first, then forget about it.
    $self->{'channels'}->{$channelName}->removeSubscriber($self);
    delete $self->{'channels'}->{$channelName};

    # No subscriptions left, so there is no reason to keep the connection.
    $self->close() if keys(%{ $self->{'channels'} }) < 1;
}
316
# Tear down this subscriber: leave all channels, drop the registry entry,
# optionally send the configured shutdown message, update statistics and
# finally delegate to the parent class' close().
#
# Arguments:
#   $noShutdownMsg - true to suppress the shutdown message
sub close {
    my ($self, $noShutdownMsg) = @_;

    # Unsubscribe from every channel we are still attached to.
    for my $name (keys %{ $self->{'channels'} }) {
        $self->{'channels'}->{$name}->removeSubscriber($self);
    }
    delete $self->{'channels'};

    # Persistent connections are listed in the registry under their ID.
    delete $PersistentConnections{ $self->{'subscriberID'} }
        if exists $self->{'subscriberID'};

    #
    # The shutdown message is skipped when the caller asked for that, when
    # the remote side already closed, or when the request header has not
    # been processed yet (headerBuffer still present).
    #
    my $skipFarewell = $noShutdownMsg
        || $self->{'remoteClosed'}
        || exists $self->{'headerBuffer'};

    if (!$skipFarewell) {
        my $farewell = $::CONF{'SubscriberShutdownMsg'};
        $self->write($farewell) if defined($farewell) && $farewell ne '';
    }

    --$::Statistics->{'current_subscribers'};

    $self->SUPER::close();
}
350
# Close the connection (without shutdown message) when it has a time limit
# and that limit lies strictly before $time.
#
# Arguments:
#   $time - timestamp to compare the connection's time limit against
sub checkForMaxTime {
    my ($self, $time) = @_;

    my $expired = exists($self->{'ConnectionTimeLimit'})
        && $time > $self->{'ConnectionTimeLimit'};

    $self->close(1) if $expired;
}
357
358 1;
359 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26