/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 25 - (hide annotations)
Sun May 20 19:40:53 2007 UTC (16 years, 10 months ago) by knops.gerd
File size: 9348 byte(s)
• Add simple statistics, available via new SHOWSTATS controller command

1 knops.gerd 11 #!/usr/bin/perl -w
2     ###############################################################################
3     # Meteor
4     # An HTTP server for the 2.0 web
5     # Copyright (c) 2006 contributing authors
6     #
7     # Subscriber.pm
8     #
9     # Description:
10     # A Meteor Subscriber
11     #
12     ###############################################################################
13     #
14     # This program is free software; you can redistribute it and/or modify it
15     # under the terms of the GNU General Public License as published by the Free
16     # Software Foundation; either version 2 of the License, or (at your option)
17     # any later version.
18     #
19     # This program is distributed in the hope that it will be useful, but WITHOUT
20     # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21     # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22     # more details.
23     #
24     # You should have received a copy of the GNU General Public License along
25     # with this program; if not, write to the Free Software Foundation, Inc.,
26     # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27     #
28     # For more information visit www.meteorserver.org
29     #
30     ###############################################################################
31    
32     package Meteor::Subscriber;
33     ###############################################################################
34     # Configuration
35     ###############################################################################
36    
37     use strict;
38    
39     use Meteor::Connection;
40     use Meteor::Channel;
41     use Meteor::Document;
42    
43     @Meteor::Subscriber::ISA=qw(Meteor::Connection);
44    
45     our %PersistentConnections=();
46 knops.gerd 25 our $NumAcceptedConnections=0;
47 knops.gerd 11
48     ###############################################################################
49     # Factory methods
50     ###############################################################################
51     sub newFromServer {
52     my $class=shift;
53    
54     my $self=$class->SUPER::newFromServer(shift);
55    
56     $self->{'headerBuffer'}='';
57     $self->{'MessageCount'}=0;
58     $self->{'MaxMessageCount'}=0;
59    
60     $self->{'ConnectionStart'}=time;
61     my $maxTime=$::CONF{'MaxTime'};
62     if($maxTime>0)
63     {
64     $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
65     }
66    
67 knops.gerd 25 $::Statistics->{'current_subscribers'}++;
68     $::Statistics->{'subscriber_connections_accepted'}++;
69    
70 knops.gerd 11 $self;
71     }
72    
73     ###############################################################################
74     # Class methods
75     ###############################################################################
76     sub deleteSubscriberWithID {
77     my $class=shift;
78     my $id=shift;
79    
80     if(exists($PersistentConnections{$id}))
81     {
82     $PersistentConnections{$id}->close(1);
83     }
84     }
85    
86     sub pingPersistentConnections {
87     my $class=shift;
88    
89     my $msg=$::CONF{'PingMessage'};
90     my @cons=values %PersistentConnections;
91    
92     map { $_->write($msg) } @cons;
93     }
94    
95     sub checkPersistentConnectionsForMaxTime {
96     my $class=shift;
97    
98     my $time=time;
99     my @cons=values %PersistentConnections;
100    
101     map { $_->checkForMaxTime($time) } @cons;
102     }
103    
104 knops.gerd 25 sub numSubscribers {
105    
106     return scalar(keys %PersistentConnections);
107     }
108    
109 knops.gerd 11 ###############################################################################
110     # Instance methods
111     ###############################################################################
112     sub processLine {
113     my $self=shift;
114     my $line=shift;
115    
116     # Once the header was processed we ignore any input
117     return unless(exists($self->{'headerBuffer'}));
118    
119     if($line ne '')
120     {
121     #
122     # Accumulate header
123     #
124     $self->{'headerBuffer'}.="$line\n";
125     }
126     else
127     {
128     #
129     # Empty line signals end of header.
130     # Analyze header, register with appropiate channel
131     # and send pending messages.
132     #
133     # GET $::CONF{'SubscriberDynamicPageAddress'}?channel=ml123&restartfrom=1 HTTP/1.1
134     #
135     # Find the 'GET' line
136     #
137     if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\?(\S+)/)
138     {
139     my @formData=split('&',$1);
140     my $channelName=undef;
141     my $startIndex=undef;
142     my $backtrack=undef;
143     my $persist=1;
144     my $subscriberID=undef;
145 knops.gerd 13 my $channels={};
146 knops.gerd 11 foreach my $formElement (@formData)
147     {
148     if($formElement=~/^channel=(.+)$/)
149     {
150 knops.gerd 13 if(defined($channelName))
151     {
152     if(defined($startIndex) && defined($backtrack))
153     {
154     $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");
155     $self->close();
156    
157     return;
158     }
159    
160     $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack));
161     $channels->{$channelName}->{'startIndex'}=$startIndex;
162    
163     $startIndex=undef;
164     $backtrack=undef;
165     }
166 knops.gerd 11 $channelName=$1;
167     }
168     elsif($formElement=~/^restartfrom=(\d*)$/)
169     {
170     $startIndex=$1;
171     $startIndex='' unless(defined($startIndex));
172     }
173     elsif($formElement=~/^backtrack=(\d+)$/)
174     {
175     $backtrack=$1;
176     $backtrack=0 unless(defined($backtrack));
177     }
178     elsif($formElement=~/^persist=(?i)(yes|true|1|no|false|0)$/)
179     {
180     $persist=0 if($1=~/(no|false|0)/i);
181     }
182     elsif($formElement=~/^id=(.+)$/)
183     {
184     $subscriberID=$1;
185     }
186     elsif($formElement=~/^maxmessages=(\d+)$/i)
187     {
188     $self->{'MaxMessageCount'}=$1;
189     }
190     elsif($formElement=~/^template=(\d+)$/i)
191     {
192     $self->{'HeaderTemplateNumber'}=$1;
193     }
194     elsif($formElement=~/^maxtime=(\d+)$/i)
195     {
196     my $clientRequest=$1;
197     my $serverDefault=$::CONF{'MaxTime'};
198    
199     if($serverDefault==0 || $serverDefault>$clientRequest)
200     {
201     $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$clientRequest;
202     }
203     }
204     }
205    
206 knops.gerd 13 if(defined($channelName))
207 knops.gerd 11 {
208 knops.gerd 13 if(defined($startIndex) && defined($backtrack))
209     {
210     $self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");
211     $self->close();
212    
213     return;
214     }
215 knops.gerd 11
216 knops.gerd 13 $startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack));
217     $channels->{$channelName}->{'startIndex'}=$startIndex;
218 knops.gerd 11 }
219    
220 knops.gerd 13 delete($self->{'headerBuffer'});
221    
222 knops.gerd 17 if(defined($subscriberID) && $persist)
223 knops.gerd 11 {
224     $self->{'subscriberID'}=$subscriberID;
225     $self->deleteSubscriberWithID($subscriberID);
226     $PersistentConnections{$subscriberID}=$self;
227     }
228    
229 knops.gerd 13 if(scalar(keys %{$channels}))
230 knops.gerd 11 {
231     $self->emitOKHeader();
232    
233 knops.gerd 17 $self->setChannels($channels,$persist);
234 knops.gerd 11
235 knops.gerd 17 $self->close(1) unless($persist);
236 knops.gerd 11
237     return;
238     }
239     }
240     elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
241     {
242     Meteor::Document->serveFileToClient($1,$self);
243    
244     $self->close(1);
245    
246     return;
247     }
248    
249     #
250     # If we fall through we did not understand the request
251     #
252     $self->emitErrorHeader();
253     }
254     }
255    
256 knops.gerd 13 sub setChannels {
257 knops.gerd 11 my $self=shift;
258 knops.gerd 13 my $channels=shift;
259 knops.gerd 17 my $persist=shift;
260 knops.gerd 11
261 knops.gerd 13 foreach my $channelName (keys %{$channels})
262     {
263     my $startIndex=$channels->{$channelName}->{'startIndex'};
264    
265     my $channel=Meteor::Channel->channelWithName($channelName);
266    
267     $self->{'channels'}->{$channelName}=$channel if($persist);
268    
269     $channel->addSubscriber($self,$startIndex,$persist);
270     }
271 knops.gerd 11 }
272    
273     sub emitOKHeader {
274     my $self=shift;
275    
276     $self->emitHeader('200 OK');
277     }
278    
279     sub emitErrorHeader {
280     my $self=shift;
281    
282     $self->emitHeader('404 Not Found');
283 knops.gerd 25 $::Statistics->{'errors_served'}++;
284 knops.gerd 11
285     # close up shop here!
286     $self->close();
287     }
288    
289     sub emitHeader {
290     my $self=shift;
291     my $status=shift;
292    
293     my $header=undef;
294     if(exists($self->{'HeaderTemplateNumber'}))
295     {
296     my $hn='HeaderTemplate'.$self->{'HeaderTemplateNumber'};
297    
298     $header=$::CONF{$hn};
299     }
300     $header=$::CONF{'HeaderTemplate'} unless(defined($header));
301    
302     $header=~s/~([^~]+)~/
303     if(!defined($1) || $1 eq '')
304     {
305     '~';
306     }
307     elsif($1 eq 'server')
308     {
309     $::PGM;
310     }
311     elsif($1 eq 'status')
312     {
313     $status;
314     }
315     elsif($1 eq 'servertime')
316     {
317     time;
318     }
319     else
320     {
321     '';
322     }
323     /gex;
324    
325     $self->write($header);
326     }
327    
328     sub sendMessage {
329     my $self=shift;
330     my $msg=shift;
331 knops.gerd 25 my $numMsgInThisBatch=shift;
332 knops.gerd 11
333 knops.gerd 25 $numMsgInThisBatch=1 unless(defined($numMsgInThisBatch));
334    
335 knops.gerd 11 $self->write($msg);
336    
337 knops.gerd 25 $::Statistics->{'messages_served'}+=$numMsgInThisBatch;
338    
339 knops.gerd 11 my $msgCount=++$self->{'MessageCount'};
340    
341     my $maxMsg=$::CONF{'MaxMessages'};
342     if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)
343     {
344     $self->close(1);
345     }
346    
347     if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})
348     {
349     $self->close(1);
350     }
351     }
352    
353 knops.gerd 13 sub closeChannel {
354     my $self=shift;
355     my $channelName=shift;
356    
357     return unless(exists($self->{'channels'}->{$channelName}));
358    
359     my $channel=$self->{'channels'}->{$channelName};
360     $channel->removeSubscriber($self);
361    
362     delete($self->{'channels'}->{$channelName});
363    
364     $self->close() if(scalar(keys %{$self->{'channels'}})==0);
365     }
366    
367 knops.gerd 11 sub close {
368     my $self=shift;
369     my $noShutdownMsg=shift;
370    
371 knops.gerd 13 foreach my $channelName (keys %{$self->{'channels'}})
372     {
373     my $channel=$self->{'channels'}->{$channelName};
374     $channel->removeSubscriber($self);
375     }
376     delete($self->{'channels'});
377 knops.gerd 11
378     if(exists($self->{'subscriberID'}))
379     {
380     delete($PersistentConnections{$self->{'subscriberID'}});
381     }
382    
383     #
384     # Send shutdown message unless remote closed or
385     # connection not yet established
386     #
387     unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
388     {
389     my $msg=$::CONF{'SubscriberShutdownMsg'};
390     if(defined($msg) && $msg ne '')
391     {
392     $self->write($msg);
393     }
394     }
395    
396 knops.gerd 25 $::Statistics->{'current_subscribers'}--;
397    
398 knops.gerd 11 $self->SUPER::close();
399     }
400    
401     sub checkForMaxTime {
402     my $self=shift;
403     my $time=shift;
404    
405     $self->close(1) if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);
406     }
407    
408     1;
409 andrew.betts 3 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26