/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is a repository of my old source code, which is no longer updated. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory | Revision Log


Revision 50 - (hide annotations)
Wed Feb 27 13:55:35 2008 UTC (16 years, 1 month ago) by andrew.betts
File size: 9288 byte(s)
Added crossdomain.xml for flash clients
Incremented version number
Moved 'new message' debug notice to more useful location
Moved default for ChannelInfoTemplate to correct position alphabetically in code
Set simpler default HeaderTemplate
Added LogTimeFormat
Updated description of PingInterval, Persist
Corrected misspelling of Parameter
Reformatted debug output for config initialisation
Added recognition of null byte in config
Fixed problem with mode recognition
Fixed resuming from given message ID
Fixed sending of message backlog
Fixed Shlemiels
Logged connection duration on leavechannel
Fixed name support in channelinfotemplate
Added logging of reasons for connection closes
Abbreviated log output
Fixed tracking of subscriber IDs
Added logging of user agent
Fixed incorrect key for MessageTemplate in Subscriber.pm
Add some additional code comments
Fixed incorrect closure of new connection if previous connection close was waiting on write buffer

1 knops.gerd 11 #!/usr/bin/perl -w
2     ###############################################################################
3     # Meteor
4     # An HTTP server for the 2.0 web
5     # Copyright (c) 2006 contributing authors
6     #
7     # Subscriber.pm
8     #
9     # Description:
10     # A Meteor Subscriber
11     #
12     ###############################################################################
13     #
14     # This program is free software; you can redistribute it and/or modify it
15     # under the terms of the GNU General Public License as published by the Free
16     # Software Foundation; either version 2 of the License, or (at your option)
17     # any later version.
18     #
19     # This program is distributed in the hope that it will be useful, but WITHOUT
20     # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21     # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22     # more details.
23     #
24     # You should have received a copy of the GNU General Public License along
25     # with this program; if not, write to the Free Software Foundation, Inc.,
26     # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27     #
28     # For more information visit www.meteorserver.org
29     #
30     ###############################################################################
31    
package Meteor::Subscriber;
###############################################################################
# Configuration
###############################################################################

use strict;

use Meteor::Connection;
use Meteor::Channel;
use Meteor::Document;

# A subscriber is a specialised Meteor::Connection.
our @ISA=('Meteor::Connection');

# Registry of subscribers whose 'Persist' setting is true, keyed by the
# subscriber ID taken from the request URL.
our %PersistentConnections=();

# Not referenced anywhere else in this file; kept as a package variable in
# case other code reads it.
our $NumAcceptedConnections=0;
47 knops.gerd 37
48 knops.gerd 11
49     ###############################################################################
50     # Factory methods
51     ###############################################################################
#
# newFromServer($server)
#
# Factory method. Builds the connection through the superclass
# newFromServer (the single argument is passed through unchanged), then
# initialises the subscriber bookkeeping:
#   headerBuffer         empty request header accumulator
#   MessageCount         messages sent so far
#   MaxMessageCount      per-connection message limit (0 = none)
#   ConnectionStart      creation time
#   ConnectionTimeLimit  only set when the global MaxTime setting is > 0
# Also bumps the 'current_subscribers' and
# 'subscriber_connections_accepted' statistics. Returns the new object.
#
sub newFromServer {
    my ($class,$server)=@_;

    my $self=$class->SUPER::newFromServer($server);

    my $now=time;
    my $maxTime=$::CONF{'MaxTime'};

    $self->{'headerBuffer'}='';
    $self->{'MessageCount'}=0;
    $self->{'MaxMessageCount'}=0;
    $self->{'ConnectionStart'}=$now;
    $self->{'ConnectionTimeLimit'}=$now+$maxTime if($maxTime>0);

    $::Statistics->{'current_subscribers'}++;
    $::Statistics->{'subscriber_connections_accepted'}++;

    return $self;
}
73    
74     ###############################################################################
75     # Class methods
76     ###############################################################################
#
# deleteSubscriberWithID($id)
#
# Class method. If a persistent connection is registered under $id, close
# it with reason 'newSubscriberWithSameID'. The shutdown message is not
# suppressed (first argument to close is 0). Does nothing when no such
# connection is registered.
#
sub deleteSubscriberWithID {
    my ($class,$id)=@_;

    $PersistentConnections{$id}->close(0,'newSubscriberWithSameID')
        if(exists($PersistentConnections{$id}));
}
86    
#
# pingPersistentConnections()
#
# Class method. Calls ping() on every registered persistent connection.
#
sub pingPersistentConnections {
    my $class=shift;

    # Iterate over a snapshot of the registry rather than the hash itself.
    # NOTE(review): presumably a ping can end in close(), which deletes
    # entries from %PersistentConnections - confirm in Meteor::Connection.
    my @cons=values %PersistentConnections;

    # A plain loop instead of map: the calls are made for their side
    # effects only and the results are not collected.
    foreach my $con (@cons)
    {
        $con->ping();
    }

    return;
}
94    
#
# checkPersistentConnectionsForMaxTime()
#
# Class method. Asks every registered persistent connection to check
# whether its time limit has passed, using one shared timestamp for the
# whole sweep.
#
sub checkPersistentConnectionsForMaxTime {
    my $class=shift;

    my $time=time;

    # Snapshot first: checkForMaxTime may close a connection, and close()
    # deletes entries from %PersistentConnections.
    my @cons=values %PersistentConnections;

    # A plain loop instead of map: the calls are made for their side
    # effects only and the results are not collected.
    foreach my $con (@cons)
    {
        $con->checkForMaxTime($time);
    }

    return;
}
103    
#
# numSubscribers()
#
# Returns the number of entries in the persistent connection registry.
#
sub numSubscribers {
    my $count=keys %PersistentConnections;

    return $count;
}
108    
109 knops.gerd 11 ###############################################################################
110     # Instance methods
111     ###############################################################################
#
# processLine($line)
#
# Feeds one line of the incoming request to the subscriber. Non-empty
# lines are accumulated in 'headerBuffer'; an empty line marks the end of
# the header and triggers request handling. Three request forms are
# recognised, in this order:
#
#   GET <SubscriberDynamicPageAddress>/hostid/streamtype/channeldefs
#       Subscribe to one or more channels.
#   GET /disconnect/<id>
#       Close the persistent connection registered under <id>.
#   GET <path>
#       Hand the path to Meteor::Document->serveFileToClient.
#
# Anything else (including a subscribe request without a single valid
# channel definition) is answered through emitErrorHeader.
#
# Input arriving after the header was processed is ignored.
#
sub processLine {
    my $self=shift;
    my $line=shift;

    # Once the header was processed we ignore any input
    return unless(exists($self->{'headerBuffer'}));

    if($line ne '')
    {
        #
        # Accumulate header
        #
        $self->{'headerBuffer'}.="$line\n";
        return;
    }

    #
    # Empty line signals end of header.
    # Analyze header, register with appropriate channel
    # and send pending messages.
    #
    my $header=$self->{'headerBuffer'};

    # The configured address is a literal path, not a pattern: quote it so
    # characters such as '.' or '+' in it cannot act as regex syntax.
    my $address=quotemeta($::CONF{'SubscriberDynamicPageAddress'});

    if($header=~m{GET\s+$address/([0-9a-z]+)/([0-9a-z]+)/(\S+)}i)
    {
        # Copy the captures right away; later matches would overwrite them.
        my ($subscriberID,$mode,$channelSpec)=($1,$2,$3);

        $self->{'subscriberID'}=$subscriberID;
        $self->{'mode'}=$mode;

        # From here on getConf resolves mode-specific settings first.
        my $persist=$self->getConf('Persist');
        my $maxTime=$self->getConf('MaxTime');
        if(defined($maxTime) && $maxTime>0)
        {
            $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
        }

        my $channels=_parseChannelDefs($channelSpec);

        my $useragent=($header=~/User-Agent: (.+)/i) ? $1 : "-";

        delete($self->{'headerBuffer'});

        if($persist)
        {
            # Only one persistent connection per subscriber ID: close any
            # previous one before registering this one.
            $self->deleteSubscriberWithID($self->{'subscriberID'});
            $PersistentConnections{$self->{'subscriberID'}}=$self;
        }

        if(scalar(keys %{$channels}))
        {
            $self->emitOKHeader();
            $self->setChannels($channels,$persist,$self->{'mode'},$useragent);
            $self->close(1,'responseComplete') unless($persist);
            return;
        }
    }
    elsif($header=~/GET\s+\/disconnect\/(\S+)/)
    {
        $self->deleteSubscriberWithID($1);
        $self->emitOKHeader();
        $self->close(1,'disconnectRequested');
        return;
    }
    elsif($header=~/GET\s+([^\s\?]+)/)
    {
        Meteor::Document->serveFileToClient($1,$self);
        $self->close(1,'responseComplete');
        return;
    }

    #
    # If we fall through we did not understand the request
    #
    $self->emitErrorHeader();
}

#
# _parseChannelDefs($channelSpec)
#
# Private helper for processLine. Splits the '/'-separated channel
# definitions of a subscribe request and returns a hash reference
#   { channelName => { 'startIndex' => ... }, ... }
# Each definition is a channel name optionally followed by a separator
# character, a flag letter and digits:
#   name        startIndex undef
#   name.rN     startIndex N
#   name.bN     startIndex -N
#   name.h      startIndex 0
# Definitions that do not match are skipped.
#
sub _parseChannelDefs {
    my $channelSpec=shift;

    my $channels={};

    foreach my $chandef (split('/',$channelSpec))
    {
        # NOTE(review): the separator is an unescaped '.', so any single
        # character is accepted, not only a literal dot. Left as is to
        # avoid rejecting requests that are accepted today.
        next unless($chandef=~/^([a-z0-9]+)(?:.([rbh])([0-9]*))?$/i);

        my ($channelName,$flag,$offset)=($1,$2,$3);
        my $startIndex=undef;

        if(defined($flag))
        {
            # The pattern is case-insensitive, so normalise the flag;
            # otherwise an upper-case flag would match but be ignored.
            $flag=lc($flag);

            if($flag eq 'r')
            {
                $startIndex=$offset;
            }
            elsif($flag eq 'b')
            {
                $startIndex=-$offset;
            }
            else
            {
                $startIndex=0;
            }
        }

        $channels->{$channelName}->{'startIndex'}=$startIndex;
    }

    return $channels;
}
198    
#
# setChannels($channels,$persist,$mode,$userAgent)
#
# Subscribes this connection to every channel named in the $channels hash
# reference (channelName => { 'startIndex' => ... }). The channel object
# is looked up through Meteor::Channel->channelWithName and receives
# addSubscriber with this connection, the start index, the persist flag,
# the mode and the user agent. For persistent subscribers the channel is
# also remembered in $self->{'channels'}. A false $mode or $userAgent is
# replaced by the empty string.
#
sub setChannels {
    my ($self,$channels,$persist,$mode,$userAgent)=@_;

    $mode||='';
    $userAgent||='';

    foreach my $channelName (keys %{$channels})
    {
        my $startIndex=$channels->{$channelName}->{'startIndex'};
        my $channel=Meteor::Channel->channelWithName($channelName);

        if($persist)
        {
            $self->{'channels'}->{$channelName}=$channel;
        }

        $channel->addSubscriber($self,$startIndex,$persist,$mode,$userAgent);
    }
}
217    
#
# emitOKHeader()
#
# Sends the response header with status '200 OK'.
#
sub emitOKHeader {
    my ($self)=@_;

    return $self->emitHeader('200 OK');
}
223    
#
# emitErrorHeader()
#
# Answers a request that could not be handled: sends the response header
# with status '404 Not Found', counts it in the 'errors_served' statistic
# and closes the connection with reason 'error'.
#
sub emitErrorHeader {
    my ($self)=@_;

    $self->emitHeader('404 Not Found');

    $::Statistics->{'errors_served'}++;

    # Nothing more to do for this client: close up shop here!
    $self->close(0,'error');
}
233    
#
# emitHeader($status)
#
# Expands the 'HeaderTemplate' setting and writes the result to the
# client. Placeholders are written as ~name~:
#   ~~             a literal '~'
#   ~server~       $::PGM
#   ~status~       the $status argument
#   ~servertime~   the current time
#   ~channelinfo~  Meteor::Channel->listChannelsUsingTemplate applied to
#                  the 'ChannelInfoTemplate' setting
# Any other placeholder expands to the empty string.
#
sub emitHeader {
    my ($self,$status)=@_;

    # Maps one placeholder name to its replacement text. Called once per
    # placeholder, so time and channel info are computed per occurrence.
    my $expand=sub {
        my $token=shift;

        return '~' if(!defined($token) || $token eq '');
        return $::PGM if($token eq 'server');
        return $status if($token eq 'status');
        return time() if($token eq 'servertime');
        return Meteor::Channel->listChannelsUsingTemplate($self->getConf('ChannelInfoTemplate'))
            if($token eq 'channelinfo');

        return '';
    };

    my $header=$self->getConf('HeaderTemplate');
    $header=~s/~([^~]*)~/$expand->($1)/ge;

    $self->write($header);
}
269    
#
# sendMessages(@messages)
#
# Renders every message with the 'MessageTemplate' setting, writes the
# concatenated result to the client in one go and updates the
# 'messages_served' statistic and this connection's 'MessageCount'.
# Does nothing when called without messages.
#
# Afterwards the connection is closed with reason
# 'maxMessageCountReached' if either the 'MaxMessages' setting or the
# per-connection 'MaxMessageCount' has been reached (a limit of 0 or
# undef means no limit).
#
sub sendMessages {
    my $self=shift;

    my $numMessages=0;
    my $msgTemplate=$self->getConf('MessageTemplate');
    my $msgData='';

    foreach my $message (@_)
    {
        $msgData.=$message->messageWithTemplate($msgTemplate);
        $numMessages++;
    }

    return if($numMessages<1);

    $self->write($msgData);

    $::Statistics->{'messages_served'}+=$numMessages;

    my $msgCount=$self->{'MessageCount'}+$numMessages;
    $self->{'MessageCount'}=$msgCount;

    my $maxMsg=$self->getConf('MaxMessages');
    my $ownMax=$self->{'MaxMessageCount'};

    my $limitReached=
        (defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)
        || (defined($ownMax) && $ownMax>0 && $msgCount>=$ownMax);

    # Close exactly once even when both limits are hit at the same time;
    # nothing in this file shows that a second close() would be harmless.
    $self->close(1,'maxMessageCountReached') if($limitReached);

    return;
}
305    
#
# ping()
#
# Writes the 'PingMessage' setting to the client.
#
sub ping {
    my ($self)=@_;

    $self->write($self->getConf('PingMessage'));
}
312    
#
# closeChannel($channelName)
#
# Unsubscribes this connection from one channel: the channel object gets
# removeSubscriber with reason 'channelClose' and is then forgotten. When
# that was the last remembered channel the connection itself is closed
# (shutdown message not suppressed). Channels this connection does not
# remember are ignored.
#
sub closeChannel {
    my ($self,$channelName)=@_;

    return unless(exists($self->{'channels'}->{$channelName}));

    # Notify the channel first, then drop our own reference to it.
    $self->{'channels'}->{$channelName}->removeSubscriber($self,'channelClose');
    delete($self->{'channels'}->{$channelName});

    my $remaining=scalar(keys %{$self->{'channels'}});
    if($remaining==0)
    {
        $self->close(0,'channelClose');
    }
}
326    
#
# close($noShutdownMsg,$reason)
#
# Closes this subscriber:
#   1. removes it from every channel it remembers, passing $reason on to
#      removeSubscriber;
#   2. removes it from the persistent connection registry;
#   3. writes the 'SubscriberShutdownMsg' setting, unless $noShutdownMsg
#      is true, the remote end already closed, or the request header was
#      never processed;
#   4. hands over to the superclass close.
#
sub close {
    my $self=shift;
    my $noShutdownMsg=shift;
    my $reason=shift;

    foreach my $channelName (keys %{$self->{'channels'}})
    {
        my $channel=$self->{'channels'}->{$channelName};
        $channel->removeSubscriber($self,$reason);
    }
    delete($self->{'channels'});

    # Forget the subscriber ID so that a later call (for example once a
    # pending write buffer has drained) cannot touch the registry again.
    if(exists($self->{'subscriberID'}))
    {
        my $id=delete($self->{'subscriberID'});

        # The ID is set on every subscriber, but only persistent ones are
        # registered. Remove the registry entry only if it really is this
        # object, so a different live connection with the same ID stays
        # registered.
        my $registered=$PersistentConnections{$id};
        if(defined($registered) && $registered==$self)
        {
            delete($PersistentConnections{$id});
        }
    }

    # Send shutdown message unless remote closed or
    # connection not yet established
    unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
    {
        my $msg=$self->getConf('SubscriberShutdownMsg');
        if(defined($msg) && $msg ne '')
        {
            $self->write($msg);
        }
    }

    $self->SUPER::close();
}
360    
#
# didClose()
#
# Decrements the 'current_subscribers' statistic.
# NOTE(review): presumably invoked by the superclass once the connection
# is actually closed - it is not called anywhere in this file.
#
sub didClose {
    my $self=shift;

    $::Statistics->{'current_subscribers'}--;
}
365    
#
# checkForMaxTime($time)
#
# Closes the connection with reason 'maxTime' (shutdown message
# suppressed) when it has a 'ConnectionTimeLimit' that lies before $time.
# Connections without a limit are left alone.
#
sub checkForMaxTime {
    my ($self,$time)=@_;

    my $limit=$self->{'ConnectionTimeLimit'};

    if(exists($self->{'ConnectionTimeLimit'}) && $limit<$time)
    {
        $self->close(1,'maxTime');
    }
}
372    
#
# getConf($key)
#
# Returns a configuration value for this subscriber. When the subscriber
# has a non-empty 'mode', the setting named $key followed directly by the
# mode (for example 'Persist' + 'stream' = 'Persiststream') wins if it
# exists in %::CONF; otherwise the plain $::CONF{$key} is returned.
#
sub getConf {
    my ($self,$key)=@_;

    my $mode=exists($self->{'mode'}) ? $self->{'mode'} : '';

    if($mode ne '')
    {
        my $modeKey=$key.$mode;

        return $::CONF{$modeKey} if(exists($::CONF{$modeKey}));
    }

    return $::CONF{$key};
}
388    
389 knops.gerd 11 1;
390 andrew.betts 3 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26