/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is a repository of my old source code, which is no longer updated. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory | Revision Log


Revision 64 - (hide annotations)
Mon Jan 19 11:19:41 2009 UTC (15 years, 2 months ago) by andrew.betts
File size: 10308 byte(s)
Release 1.06.04 as documented in Google Group

1 knops.gerd 11 #!/usr/bin/perl -w
2     ###############################################################################
3     # Meteor
4     # An HTTP server for the 2.0 web
5     # Copyright (c) 2006 contributing authors
6     #
7     # Subscriber.pm
8     #
9     # Description:
10     # A Meteor Subscriber
11     #
12     ###############################################################################
13     #
14     # This program is free software; you can redistribute it and/or modify it
15     # under the terms of the GNU General Public License as published by the Free
16     # Software Foundation; either version 2 of the License, or (at your option)
17     # any later version.
18     #
19     # This program is distributed in the hope that it will be useful, but WITHOUT
20     # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21     # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22     # more details.
23     #
24     # You should have received a copy of the GNU General Public License along
25     # with this program; if not, write to the Free Software Foundation, Inc.,
26     # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27     #
28     # For more information visit www.meteorserver.org
29     #
30     ###############################################################################
31    
32     package Meteor::Subscriber;
33     ###############################################################################
34     # Configuration
35     ###############################################################################
36    
37     use strict;
38    
39     use Meteor::Connection;
40     use Meteor::Channel;
41     use Meteor::Document;
42    
# Meteor::Subscriber inherits from Meteor::Connection.
@Meteor::Subscriber::ISA = ('Meteor::Connection');

# Registry of persistent subscriber connections, keyed by subscriber ID.
our %PersistentConnections = ();

# Starts at zero; nothing else in this file reads or updates it.
our $NumAcceptedConnections = 0;
47 knops.gerd 37
48 knops.gerd 11
49     ###############################################################################
50     # Factory methods
51     ###############################################################################
# Factory: builds the connection through the parent class, then sets up the
# subscriber-specific state and bumps the subscriber statistics.
#
# Parameters: the class name, followed by one argument that is passed
#             through unchanged to SUPER::newFromServer.
# Returns:    the newly created subscriber object.
sub newFromServer {
    my ($class, $server) = @_;

    my $self = $class->SUPER::newFromServer($server);

    # Header lines are collected here until processLine() sees the blank
    # line that ends the request header.
    $self->{'headerBuffer'} = '';
    $self->{'messageCount'} = 0;

    for my $counter ('current_subscribers', 'subscriber_connections_accepted') {
        $::Statistics->{$counter}++;
    }

    return $self;
}
65    
66     ###############################################################################
67     # Class methods
68     ###############################################################################
# Class method: closes the registered persistent connection that uses the
# given subscriber ID, if there is one. The connection's close() is called
# with the reason 'newSubscriberWithSameID'.
#
# Parameters: $id - subscriber ID to look up in %PersistentConnections.
sub deleteSubscriberWithID {
    my ($class, $id) = @_;

    $PersistentConnections{$id}->close(0, 'newSubscriberWithSameID')
        if exists $PersistentConnections{$id};
}
78    
# Class method: reports whether a persistent connection is currently
# registered under the given subscriber ID.
#
# Parameters: $id - subscriber ID to test.
# Returns:    true when %PersistentConnections has an entry for $id.
sub subscriberExists {
    my ($class, $id) = @_;

    return exists $PersistentConnections{$id};
}
85    
# Class method: calls ping() on every registered persistent connection.
sub pingPersistentConnections {
    my ($class) = @_;

    # Work on a snapshot of the registry so it may change while we iterate
    # (presumably a failed write can close a connection - confirm).
    my @snapshot = values %PersistentConnections;

    $_->ping() for @snapshot;
}
93    
# Class method: asks every registered persistent connection to close itself
# if its connection time limit has passed. All connections are checked
# against the same timestamp, taken once at the start.
sub checkPersistentConnectionsForMaxTime {
    my ($class) = @_;

    my $now = time;

    # close() removes entries from %PersistentConnections, so iterate over
    # a snapshot rather than the live hash.
    my @snapshot = values %PersistentConnections;

    $_->checkForMaxTime($now) for @snapshot;
}
102    
# Returns the number of persistent connections currently registered.
sub numSubscribers {
    my $count = keys %PersistentConnections;

    return $count;
}
107    
# Class method: builds a text report with one line per registered
# persistent connection. Fields are separated by single spaces:
#
#   id ip connectionStart connectionTimeLimit secondsRemaining messageCount "useragent"
#
# Every line is terminated with $::CRLF.
#
# Returns: the report as one string (empty when nothing is registered).
sub listSubscribers {
    my ($class) = @_;

    my $report = '';
    for my $id (keys %PersistentConnections) {
        my $conn  = $PersistentConnections{$id};
        my $limit = $conn->{'connectionTimeLimit'};

        $report .= join(' ',
            $id,
            $conn->{'ip'},
            $conn->{'connectionStart'},
            $limit,
            ($limit - time),
            $conn->{'messageCount'},
            '"' . $conn->{'useragent'} . '"',
        ) . $::CRLF;
    }

    return $report;
}
118    
119 knops.gerd 11 ###############################################################################
120     # Instance methods
121     ###############################################################################
# Consumes one line of the incoming HTTP request.
#
# Non-empty lines are appended to the header buffer. The empty line that
# terminates the header triggers request handling, which takes one of these
# routes:
#
#   GET <SubscriberDynamicPageAddress>/<hostid>/<mode>/<channeldefs>
#       Subscribe to the listed channels. Each channel definition is
#       "name", "name.rN" (start index N), "name.bN" (start index -N) or
#       "name.h" (start index 0).
#   GET /disconnect/<id>
#       Close the persistent connection registered under <id>.
#   GET <path>
#       Serve a document through Meteor::Document.
#
# Anything else, including a subscribe request without one valid channel
# definition, is answered through emitErrorHeader().
#
# Parameters: $line - one request line without its line terminator.
sub processLine {
    my ($self, $line) = @_;

    # Once the header was processed we ignore any input - Meteor does not
    # accept or process request bodies.
    return unless exists $self->{'headerBuffer'};

    # Still inside the header: accumulate and wait for more.
    if ($line ne '') {
        $self->{'headerBuffer'} .= "$line\n";
        return;
    }

    #
    # Empty line signals end of header.
    # Analyze header, register with appropriate channel
    # and send pending messages.
    #
    my $header = $self->{'headerBuffer'};

    # Note: the configured page address is interpolated into the pattern
    # as-is, so it is treated as a regular expression fragment.
    if ($header =~ /GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/([a-z0-9_\-\%\.\/]+).*?/i) {
        # Copy the captures before anything else can run a pattern match.
        my $channelSpec = $3;
        $self->{'subscriberID'} = $1;
        $self->{'mode'}         = $2;

        my $persist = $self->getConf('Persist');
        my $maxTime = $self->getConf('MaxTime');
        $self->{'connectionTimeLimit'} = ($self->{'connectionStart'} + $maxTime) if ($maxTime > 0);

        # Parse the channel definitions into { name => { startIndex => ... } }.
        my $channels = {};
        foreach my $chandef (split('/', $channelSpec)) {
            # The separator is a literal dot; definitions that do not match
            # are skipped.
            next unless $chandef =~ /^([a-z0-9_\-\%]+)(\.(r|b|h)([0-9]*))?$/i;
            my ($name, $kind, $offset) = ($1, $3, $4);

            my $startIndex;
            if (defined $kind) {
                # The pattern is case-insensitive, so normalise the mode
                # letter; otherwise "chan.R5" would match but be ignored.
                $kind = lc $kind;
                if    ($kind eq 'r') { $startIndex = $offset; }
                elsif ($kind eq 'b') { $startIndex = -$offset; }
                else                 { $startIndex = 0; }        # 'h'
            }
            $channels->{$name}->{'startIndex'} = $startIndex;
        }

        $self->{'useragent'} = ($header =~ /User-Agent: (.+)/i) ? $1 : "-";

        if ($persist) {
            # New persistent connection: kill any existing connection with
            # the same ID, then add this one to the collection.
            $self->deleteSubscriberWithID($self->{'subscriberID'});
            $PersistentConnections{$self->{'subscriberID'}} = $self;
        }
        else {
            $::Pollers->{$self->{'subscriberID'}} = time;
        }

        if (scalar(keys %{$channels})) {
            $self->{'channelinfo'} = '';
            my $citemplate = $self->getConf('ChannelInfoTemplate');
            foreach my $channelName (keys %{$channels}) {
                my $channel = Meteor::Channel->channelWithName($channelName);
                $self->{'channels'}->{$channelName} = $channel;
                # NOTE(review): this reads 'startIndex' from the channel
                # object that was just stored, not from the parsed
                # $channels->{$channelName}. It looks like the parsed value
                # was intended - confirm against Meteor::Channel.
                if (defined($self->{'channels'}->{$channelName}->{'startIndex'}) && $self->{'channels'}->{$channelName}->{'startIndex'} > 0) {
                    $self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate);
                }
            }

            $self->emitOKHeader();

            foreach my $channelName (keys %{$channels}) {
                my $startIndex = $channels->{$channelName}->{'startIndex'};
                $self->{'channels'}->{$channelName}->addSubscriber($self, $startIndex, $persist, $self->{'mode'}, $self->{'useragent'});
            }

            if (!$persist) {
                # Drop the channel map first so close() does not call
                # removeSubscriber on these channels.
                delete($self->{'channels'});
                $self->close(1, 'responseComplete');
            }
            delete($self->{'headerBuffer'});

            # If long polling, close connection immediately if any messages
            # have been sent. A non-persistent connection was already closed
            # above; closing it again would write the footer a second time.
            if ($persist && $self->{'messageCount'} > 0 && $self->{'mode'} eq 'longpoll') {
                $self->close(1, 'closedOnEvent');
            }
            return;
        }
        # No valid channel definition: fall through to the error response.
    }
    elsif ($header =~ /GET\s+\/disconnect\/(\S+)/) {
        $self->deleteSubscriberWithID($1);
        $self->emitOKHeader();
        $self->close(1, 'disconnectRequested');
        return;
    }
    elsif ($header =~ /GET\s+([^\s\?]+)/) {
        Meteor::Document->serveFileToClient($1, $self);
        # Calls the parent close directly, bypassing this class's close().
        $self->SUPER::close();
        return;
    }

    #
    # If we fall through we did not understand the request
    #
    $self->emitErrorHeader();
}
231    
# Sends the response header with status '200 OK'.
#
# Returns: whatever emitHeader() returns.
sub emitOKHeader {
    my ($self) = @_;

    return $self->emitHeader('200 OK');
}
237    
# Sends a '404 Not Found' response header, counts the error in the global
# statistics and then closes the connection with reason 'error'.
sub emitErrorHeader {
    my ($self) = @_;

    $self->emitHeader('404 Not Found');
    $::Statistics->{'errors_served'}++;

    # Nothing more to say to this client: close up shop here.
    return $self->close(0, 'error');
}
247    
# Expands the configured 'HeaderTemplate' and writes it to the client.
#
# Placeholders in the template are delimited by tildes:
#   ~server~       value of $::PGM
#   ~status~       the $status argument
#   ~servertime~   current epoch time
#   ~channelinfo~  $self->{'channelinfo'}
#   ~~             a literal '~'
# Any other placeholder name expands to an empty string, and so does a
# known placeholder whose value is undefined. The latter happens for
# ~channelinfo~ on responses sent before any channel info was collected;
# without the guard an undef would be substituted and warn under -w.
#
# Parameters: $status - HTTP status text, e.g. '200 OK'.
# Returns:    whatever write() returns.
sub emitHeader {
    my ($self, $status) = @_;

    my $header = $self->getConf('HeaderTemplate');

    my %value_for = (
        'server'      => $::PGM,
        'status'      => $status,
        'servertime'  => time,
        'channelinfo' => $self->{'channelinfo'},
    );

    $header =~ s{~([^~]*)~}{
        if ($1 eq '') {
            '~';
        }
        elsif (defined $value_for{$1}) {
            $value_for{$1};
        }
        else {
            '';
        }
    }gex;

    return $self->write($header);
}
272    
# Renders the given messages with the configured 'MessageTemplate', writes
# them to the client in a single write and updates the message counters.
#
# Parameters: zero or more message objects; each must provide
#             messageWithTemplate($template).
#
# Nothing is written or counted when no messages are passed. After a
# successful send the connection is closed with reason 'closedOnEvent' when
# 'CloseOnEvent' is configured and the request header has already been
# processed.
sub sendMessages {
    my $self = shift;

    my $template = $self->getConf('MessageTemplate');
    my $count    = scalar @_;
    my $payload  = join '', map { scalar $_->messageWithTemplate($template) } @_;

    return if ($count < 1);

    $self->write($payload);

    $::Statistics->{'messages_served'} += $count;
    $self->{'messageCount'}            += $count;

    # If long polling, close connection, as a message has now been sent.
    # Don't close while the header is still being processed (we may be
    # sending a backlog from multiple channels).
    if ($self->getConf('CloseOnEvent') && !exists($self->{'headerBuffer'})) {
        $self->close(1, 'closedOnEvent');
    }
}
302    
# Writes the configured 'PingMessage' to the client.
#
# Returns: whatever write() returns.
sub ping {
    my ($self) = @_;

    return $self->write($self->getConf('PingMessage'));
}
309    
# Detaches this subscriber from one channel. When that was the last channel
# the subscriber was attached to, the connection itself is closed with
# reason 'channelClose'.
#
# Parameters: $channelName - name of the channel to leave. Names this
#             subscriber is not attached to are ignored.
sub closeChannel {
    my ($self, $channelName) = @_;

    return unless (exists($self->{'channels'}->{$channelName}));

    # Tell the channel first, then forget about it locally.
    $self->{'channels'}->{$channelName}->removeSubscriber($self, 'channelClose');
    delete($self->{'channels'}->{$channelName});

    my $remaining = scalar(keys %{ $self->{'channels'} });
    $self->close(0, 'channelClose') if ($remaining == 0);
}
323    
# Closes this subscriber connection.
#
# Steps, in order:
#   1. Remove the subscriber from every channel it is attached to.
#   2. Remove it from the persistent-connection registry, if it is the
#      connection registered under its subscriber ID.
#   3. Write the configured 'SubscriberShutdownMsg', unless suppressed.
#   4. Write the configured 'FooterTemplate', if any.
#   5. Hand over to the parent class close().
#
# Parameters:
#   $noShutdownMsg - true to suppress the shutdown message.
#   $reason        - reason string passed on to removeSubscriber().
#
# Returns: whatever the parent class close() returns.
sub close {
    my ($self, $noShutdownMsg, $reason) = @_;

    foreach my $channelName (keys %{ $self->{'channels'} }) {
        $self->{'channels'}->{$channelName}->removeSubscriber($self, $reason);
    }
    delete($self->{'channels'});

    # Anonymise the connection so that, if close is entered again while the
    # write buffer is still draining, the registry is only touched once.
    if (exists($self->{'subscriberID'})) {
        my $id = delete($self->{'subscriberID'});

        # Only unregister when the registry entry is this very object.
        # Non-persistent requests carry a subscriber ID too; removing the
        # entry unconditionally would unregister a different, still open
        # persistent connection that shares the ID.
        my $registered = $PersistentConnections{$id};
        if (defined($registered) && $registered == $self) {
            delete($PersistentConnections{$id});
        }
    }

    # Send shutdown message unless remote closed or
    # connection not yet established
    unless ($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'})) {
        my $msg = $self->getConf('SubscriberShutdownMsg');
        $self->write($msg) if (defined($msg) && $msg ne '');
    }

    my $fmsg = $self->getConf('FooterTemplate');
    $self->write($fmsg) if (defined($fmsg) && $fmsg ne '');

    $self->SUPER::close();
}
363    
# Decrements the global count of current subscribers.
sub didClose {
    $::Statistics->{'current_subscribers'}--;
}
368    
# Closes the connection with reason 'maxTime' once its time limit has
# passed. Connections without a 'connectionTimeLimit' are never closed here.
#
# Parameters: $time - the timestamp to compare the limit against.
sub checkForMaxTime {
    my ($self, $time) = @_;

    my $expired = exists($self->{'connectionTimeLimit'})
        && $self->{'connectionTimeLimit'} < $time;

    $self->close(1, 'maxTime') if ($expired);
}
375    
# Looks up a configuration value for this connection.
#
# When the connection has a non-empty 'mode', the mode-specific key
# ($key immediately followed by the mode name) takes precedence if it
# exists in %::CONF; otherwise the plain key is used.
#
# Parameters: $key - configuration key.
# Returns:    the configured value, or undef when the key is not set.
sub getConf {
    my ($self, $key) = @_;

    my @candidates = ($key);
    if (exists($self->{'mode'}) && $self->{'mode'} ne '') {
        unshift @candidates, $key . $self->{'mode'};
    }

    for my $name (@candidates) {
        return $::CONF{$name} if (exists($::CONF{$name}));
    }

    return $::CONF{$key};
}
389    
390 knops.gerd 11 1;
391 andrew.betts 3 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26