/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is a repository of my old source code, which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 47 - (hide annotations)
Mon Feb 4 21:06:42 2008 UTC (16 years, 1 month ago) by knops.gerd
File size: 9402 byte(s)
• syslog change: If `SyslogFacility` is set to `none`, meteord will not put itself into the background and print all syslog messages with a priority higher than `debug` to standard output. The output will be prefixed with a timestamp (unix time in seconds) and a tab character.

• New syslog messages: joinchannel, leavechannel, document


1 knops.gerd 11 #!/usr/bin/perl -w
2     ###############################################################################
3     # Meteor
4     # An HTTP server for the 2.0 web
5     # Copyright (c) 2006 contributing authors
6     #
7     # Subscriber.pm
8     #
9     # Description:
10     # A Meteor Subscriber
11     #
12     ###############################################################################
13     #
14     # This program is free software; you can redistribute it and/or modify it
15     # under the terms of the GNU General Public License as published by the Free
16     # Software Foundation; either version 2 of the License, or (at your option)
17     # any later version.
18     #
19     # This program is distributed in the hope that it will be useful, but WITHOUT
20     # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21     # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22     # more details.
23     #
24     # You should have received a copy of the GNU General Public License along
25     # with this program; if not, write to the Free Software Foundation, Inc.,
26     # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27     #
28     # For more information visit www.meteorserver.org
29     #
30     ###############################################################################
31    
package Meteor::Subscriber;
###############################################################################
# Configuration
###############################################################################

	use strict;

	use Meteor::Connection;
	use Meteor::Channel;
	use Meteor::Document;

	# A subscriber is a specialised Meteor::Connection.
	our @ISA=('Meteor::Connection');

	# Registry of persistent subscribers, keyed by subscriber ID. Entries are
	# added by processLine() and removed again by close().
	our %PersistentConnections=();

	# Not referenced anywhere else in this file.
	our $NumAcceptedConnections=0;
47 knops.gerd 37
48 knops.gerd 11
49     ###############################################################################
50     # Factory methods
51     ###############################################################################
#
# Factory: create a subscriber for a connection handed over by the server.
#
# The object itself is built by the superclass; this method only adds the
# subscriber specific state (empty header buffer, message counters,
# connection start time and - if the global 'MaxTime' setting is positive -
# the time at which the connection expires) and updates the statistics.
#
# Parameters: the server object, passed through to the superclass.
# Returns:    the new subscriber.
#
sub newFromServer {
	my ($class,$server)=@_;

	my $self=$class->SUPER::newFromServer($server);

	$self->{'headerBuffer'}='';
	$self->{'MessageCount'}=0;
	$self->{'MaxMessageCount'}=0;

	my $now=time;
	$self->{'ConnectionStart'}=$now;

	# Only the global setting is known here; processLine() re-evaluates the
	# limit once the connection mode has been read from the request.
	my $limit=$::CONF{'MaxTime'};
	$self->{'ConnectionTimeLimit'}=$now+$limit if($limit>0);

	$::Statistics->{'current_subscribers'}++;
	$::Statistics->{'subscriber_connections_accepted'}++;

	return $self;
}
73    
74     ###############################################################################
75     # Class methods
76     ###############################################################################
#
# Close the persistent subscriber registered under the given ID.
# Does nothing when no subscriber is registered under that ID.
#
sub deleteSubscriberWithID {
	my ($class,$id)=@_;

	$PersistentConnections{$id}->close() if(exists($PersistentConnections{$id}));
}
86    
#
# Send the ping message to every persistent subscriber.
#
sub pingPersistentConnections {
	my $class=shift;

	# Work on a snapshot of the registry: close() removes entries from
	# %PersistentConnections, so the hash may change while we iterate
	# should a subscriber get closed during the loop.
	my @cons=values %PersistentConnections;

	# Plain loop instead of map: the calls are made for their side effect
	# only, no result list is wanted.
	foreach my $con (@cons)
	{
		$con->ping();
	}
}
94    
#
# Let every persistent subscriber check whether its time limit has passed.
# All subscribers are checked against the same timestamp.
#
sub checkPersistentConnectionsForMaxTime {
	my $class=shift;

	my $time=time;

	# Work on a snapshot of the registry: checkForMaxTime() closes expired
	# subscribers and close() removes them from %PersistentConnections.
	my @cons=values %PersistentConnections;

	# Plain loop instead of map: the calls are made for their side effect
	# only, no result list is wanted.
	foreach my $con (@cons)
	{
		$con->checkForMaxTime($time);
	}
}
103    
#
# Number of subscribers currently held in the persistent connection registry.
#
sub numSubscribers {

	my $count=keys %PersistentConnections;

	return $count;
}
108    
109 knops.gerd 11 ###############################################################################
110     # Instance methods
111     ###############################################################################
#
# Handle one line of the HTTP request sent by the client.
#
# Non-empty lines are collected in the header buffer. The first empty line
# ends the header; the collected header is then matched against three kinds
# of request:
#
#   GET <SubscriberDynamicPageAddress>/<subscriberID>/<mode>/<channeldefs>
#       subscribe to the listed channels
#   GET /disconnect/<subscriberID>
#       close the subscriber registered under that ID
#   GET <anything else>
#       hand the path to Meteor::Document
#
# A request that matches none of these, or a subscription without a single
# valid channel definition, is answered with the error header.
#
# Parameters: the request line, without line terminator.
#
sub processLine {
	my $self=shift;
	my $line=shift;

	# Once the header was processed we ignore any input
	return unless(exists($self->{'headerBuffer'}));

	if($line ne '')
	{
		#
		# Accumulate header
		#
		$self->{'headerBuffer'}.="$line\n";
		return;
	}

	#
	# Empty line signals end of header.
	# Analyze header, register with appropriate channel
	# and send pending messages.
	#
	# GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
	#
	# Find the 'GET' line
	#
	my $header=$self->{'headerBuffer'};
	my $pageAddress=$::CONF{'SubscriberDynamicPageAddress'};

	# \Q...\E: the configured address is a literal path, its characters
	# must not be interpreted as regular expression syntax.
	if($header=~m{GET\s+\Q$pageAddress\E/([0-9a-z]+)/([0-9a-z]+)/(\S+)}i)
	{
		# Copy the captures right away, before anything else can run
		# another match and overwrite them.
		my ($subscriberID,$mode,$channelSpec)=($1,$2,$3);

		$self->{'mode'}=$mode;
		my $persist=$self->getConf('Persist');

		# These modes are always persistent, whatever the configuration says
		if($mode eq "xhrinteractive" || $mode eq "iframe" || $mode eq "serversent" || $mode eq "longpoll")
		{
			$persist=1;
			# Per-connection message limit of one for longpoll,
			# enforced by sendMessages()
			$self->{'MaxMessageCount'}=1 if($mode eq "longpoll");
		}
		$self->{'HeaderTemplateNumber'}=($mode eq "iframe") ? 1 : 2;

		# The time limit may be configured per mode, so evaluate it again
		# now that the mode is known.
		my $maxTime=$self->getConf('MaxTime');
		if($maxTime>0)
		{
			$self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
		}

		#
		# Channel definitions are separated by '/'. Each one is a channel
		# name, optionally followed by '.' and a start option:
		#   r<n>  start index <n>
		#   b<n>  start index -<n>
		#   h     start index 0
		# Without an option the start index is left undefined. Definitions
		# that do not have this form are skipped.
		#
		my $channels={};
		foreach my $chandef (split('/',$channelSpec))
		{
			# The separator is a literal dot, not "any character"
			next unless($chandef=~/^([a-z0-9]+)(?:\.([rbh])([0-9]*))?$/i);

			my ($channelName,$option,$offset)=($1,$2,$3);

			$channels->{$channelName}->{'startIndex'}=undef;
			next unless(defined($option));

			if($option eq 'r') { $channels->{$channelName}->{'startIndex'}=$offset; }
			if($option eq 'b') { $channels->{$channelName}->{'startIndex'}=-$offset; }
			if($option eq 'h') { $channels->{$channelName}->{'startIndex'}=0; }
		}

		# From here on the header counts as processed
		delete($self->{'headerBuffer'});

		if($persist)
		{
			# A new connection replaces an existing subscriber with the
			# same ID: close the old one, then register this one.
			$self->{'subscriberID'}=$subscriberID;
			$self->deleteSubscriberWithID($subscriberID);
			$PersistentConnections{$subscriberID}=$self;
		}

		if(scalar(keys %{$channels}))
		{
			$self->emitOKHeader();
			$self->setChannels($channels,$persist,$mode,'');
			$self->close(1) unless($persist);
			return;
		}
	}
	elsif($header=~/GET\s+\/disconnect\/(\S+)/)
	{
		$self->deleteSubscriberWithID($1);
		$self->emitOKHeader();
		$self->close(1);
		return;
	}
	elsif($header=~/GET\s+([^\s\?]+)/)
	{
		# NOTE(review): the path comes straight from the client; this
		# presumably relies on Meteor::Document to validate it - confirm.
		Meteor::Document->serveFileToClient($1,$self);
		$self->close(1);
		return;
	}

	#
	# If we fall through we did not understand the request
	#
	$self->emitErrorHeader();
}
213    
#
# Subscribe to a set of channels.
#
# Parameters:
#   $channels   hash ref: channel name => { 'startIndex' => index or undef }
#   $persist    true for a persistent subscriber; only then the channels
#               are remembered in $self->{'channels'}
#   $mode       connection mode, passed on to the channel (default '')
#   $userAgent  passed on to the channel (default '')
#
sub setChannels {
	my ($self,$channels,$persist,$mode,$userAgent)=@_;

	$mode||='';
	$userAgent||='';

	foreach my $name (keys %{$channels})
	{
		my $channel=Meteor::Channel->channelWithName($name);

		if($persist)
		{
			$self->{'channels'}->{$name}=$channel;
		}

		$channel->addSubscriber($self,$channels->{$name}->{'startIndex'},$persist,$mode,$userAgent);
	}
}
232    
#
# Send the response header with status '200 OK'.
#
sub emitOKHeader {
	my ($self)=@_;

	my $status='200 OK';

	$self->emitHeader($status);
}
238    
#
# Answer a request we could not make sense of: send the response header
# with status '404 Not Found', count the error and close the connection.
#
sub emitErrorHeader {
	my ($self)=@_;

	my $status='404 Not Found';

	$self->emitHeader($status);
	$::Statistics->{'errors_served'}++;

	# close up shop here!
	$self->close();
}
248    
#
# Build the response header from the configured template and send it.
#
# If the subscriber has a 'HeaderTemplateNumber' the template is taken from
# the setting 'HeaderTemplate<number>'; without a number, or if that setting
# is not defined, the setting 'HeaderTemplate' is used.
#
# Placeholders of the form ~name~ in the template are replaced:
#   ~~             a single '~'
#   ~server~       $::PGM
#   ~status~       the status passed in
#   ~servertime~   the current time
#   ~channelinfo~  channel list rendered with the 'ChannelInfoTemplate' setting
# Any other placeholder is replaced by the empty string.
#
# Parameters: the HTTP status, e.g. '200 OK'.
#
sub emitHeader {
	my ($self,$status)=@_;

	my $template;
	if(exists($self->{'HeaderTemplateNumber'}))
	{
		$template=$self->getConf('HeaderTemplate'.$self->{'HeaderTemplateNumber'});
	}
	$template=$self->getConf('HeaderTemplate') unless(defined($template));

	# Placeholder name => code computing its replacement. The code only
	# runs when the placeholder actually occurs in the template.
	my %replacementFor=(
		''            => sub { '~' },
		'server'      => sub { $::PGM },
		'status'      => sub { $status },
		'servertime'  => sub { time },
		'channelinfo' => sub { Meteor::Channel->listChannelsUsingTemplate($self->getConf('ChannelInfoTemplate')) },
	);

	$template=~s/~([^~]*)~/exists($replacementFor{$1}) ? $replacementFor{$1}->() : ''/ge;

	$self->write($template);
}
291    
#
# Send the given messages to the client.
#
# Every message is rendered with the configured message template, the result
# is written in one go, and the message counters are updated. Afterwards the
# connection is closed (without shutdown message) if a message limit has
# been reached: the configured 'MaxMessages', or the per-connection
# 'MaxMessageCount'.
#
# Parameters: the message objects to send. An empty list is a no-op.
#
sub sendMessages {
	my $self=shift;

	my $numMessages=0;
	# NOTE(review): lower case 't' in 'Messagetemplate', unlike the other
	# setting names used in this file - confirm it matches the config key.
	my $msgTemplate=$self->getConf('Messagetemplate');
	my $msgData='';

	foreach my $message (@_)
	{
		$msgData.=$message->messageWithTemplate($msgTemplate);
		$numMessages++;
	}

	return if($numMessages<1);

	$self->write($msgData);

	$::Statistics->{'messages_served'}+=$numMessages;

	my $msgCount=$self->{'MessageCount'}+$numMessages;
	$self->{'MessageCount'}=$msgCount;

	my $maxMsg=$self->getConf('MaxMessages');
	my $confLimitReached=(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg);
	my $ownLimitReached=($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'});

	# Both limits may be reached by the same message: close only once
	$self->close(1) if($confLimitReached || $ownLimitReached);
}
327    
#
# Write the configured 'PingMessage' to the client.
#
sub ping {
	my ($self)=@_;

	$self->write($self->getConf('PingMessage'));
}
334    
#
# Leave a single channel.
#
# The subscriber is removed from the channel and the channel is forgotten.
# Once no channel is left the connection itself is closed. Channels this
# subscriber does not remember are ignored.
#
# Parameters: the name of the channel.
#
sub closeChannel {
	my ($self,$channelName)=@_;

	return unless(exists($self->{'channels'}->{$channelName}));

	# Tell the channel first, forget it afterwards
	$self->{'channels'}->{$channelName}->removeSubscriber($self,'channelClose');

	delete($self->{'channels'}->{$channelName});

	unless(scalar(keys %{$self->{'channels'}}))
	{
		$self->close(0,'channelsClosed');
	}
}
348    
#
# Close the subscriber.
#
# The subscriber is removed from all channels it remembers and from the
# registry of persistent connections; unless suppressed, the configured
# shutdown message is sent; finally the connection is closed by the
# superclass.
#
# Parameters:
#   $noShutdownMsg  true to suppress the shutdown message
#
sub close {
	my $self=shift;
	my $noShutdownMsg=shift;

	foreach my $channelName (keys %{$self->{'channels'}})
	{
		my $channel=$self->{'channels'}->{$channelName};
		$channel->removeSubscriber($self,'subscriberClose');
	}
	delete($self->{'channels'});

	if(exists($self->{'subscriberID'}))
	{
		#
		# Only remove our own registration. A newer connection may have
		# taken over this ID in the meantime (see processLine); closing
		# this subscriber again must not unregister its successor.
		#
		my $id=$self->{'subscriberID'};
		my $registered=$PersistentConnections{$id};

		delete($PersistentConnections{$id}) if(defined($registered) && $registered==$self);
	}

	#
	# Send shutdown message unless remote closed or
	# connection not yet established
	#
	unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
	{
		my $msg=$self->getConf('SubscriberShutdownMsg');
		if(defined($msg) && $msg ne '')
		{
			$self->write($msg);
		}
	}

	$self->SUPER::close();
}
380    
#
# Book keeping once the connection is gone: one subscriber less in the
# statistics.
#
sub didClose {
	my ($self)=@_;

	$::Statistics->{'current_subscribers'}--;
}
385    
#
# Close the connection (without shutdown message) if it has a time limit
# and that limit lies before the given time.
#
# Parameters: the time to compare the limit with.
#
sub checkForMaxTime {
	my ($self,$time)=@_;

	# No limit, nothing to check
	return unless(exists($self->{'ConnectionTimeLimit'}));

	$self->close(1) if($time>$self->{'ConnectionTimeLimit'});
}
392    
#
# Look up a configuration setting for this subscriber.
#
# If the subscriber has a non-empty mode and the configuration contains an
# entry '<key>.<mode>', that entry is returned, otherwise the entry '<key>'.
#
# Parameters: the name of the setting.
# Returns:    the value found in %::CONF, undef if there is none.
#
sub getConf {
	my ($self,$key)=@_;

	my $mode=exists($self->{'mode'}) ? $self->{'mode'} : '';

	if($mode ne '')
	{
		my $modeKey=join('.',$key,$mode);

		return $::CONF{$modeKey} if(exists($::CONF{$modeKey}));
	}

	return $::CONF{$key};
}
406    
407 knops.gerd 11 1;
408 andrew.betts 3 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26