/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 13 - (hide annotations)
Mon Apr 30 18:16:17 2007 UTC (16 years, 11 months ago) by knops.gerd
File size: 9147 byte(s)
• Allow to subscribe to multiple channels

1 knops.gerd 11 #!/usr/bin/perl -w
2     ###############################################################################
3     # Meteor
4     # An HTTP server for the 2.0 web
5     # Copyright (c) 2006 contributing authors
6     #
7     # Subscriber.pm
8     #
9     # Description:
10     # A Meteor Subscriber
11     #
12     ###############################################################################
13     #
14     # This program is free software; you can redistribute it and/or modify it
15     # under the terms of the GNU General Public License as published by the Free
16     # Software Foundation; either version 2 of the License, or (at your option)
17     # any later version.
18     #
19     # This program is distributed in the hope that it will be useful, but WITHOUT
20     # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21     # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22     # more details.
23     #
24     # You should have received a copy of the GNU General Public License along
25     # with this program; if not, write to the Free Software Foundation, Inc.,
26     # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27     #
28     # For more information visit www.meteorserver.org
29     #
30     ###############################################################################
31    
32     package Meteor::Subscriber;
33     ###############################################################################
34     # Configuration
35     ###############################################################################
36    
37     use strict;
38    
39     use Meteor::Connection;
40     use Meteor::Channel;
41     use Meteor::Document;
42    
# A subscriber is a specialised Meteor connection
@Meteor::Subscriber::ISA=('Meteor::Connection');

# Registry of subscribers that supplied an 'id' and hold at least one
# persistent channel subscription, keyed by that subscriber ID
our %PersistentConnections=();
46    
47     ###############################################################################
48     # Factory methods
49     ###############################################################################
#
# newFromServer($server)
#
# Creates the connection through the parent class and initialises the
# subscriber specific state: an empty header buffer, the message counters
# and the connection start time. When $::CONF{'MaxTime'} is positive the
# absolute time limit of the connection is recorded as well.
#
# Returns the new subscriber object.
#
sub newFromServer {
	my($class,$server)=@_;
	
	my $self=$class->SUPER::newFromServer($server);
	
	my $now=time;
	
	$self->{'headerBuffer'}='';
	$self->{'MessageCount'}=0;
	$self->{'MaxMessageCount'}=0;
	$self->{'ConnectionStart'}=$now;
	
	# A MaxTime of zero (or less) leaves the connection without a time limit
	my $limit=$::CONF{'MaxTime'};
	$self->{'ConnectionTimeLimit'}=$now+$limit if($limit>0);
	
	$self;
}
68    
69     ###############################################################################
70     # Class methods
71     ###############################################################################
#
# deleteSubscriberWithID($id)
#
# Closes the registered persistent subscriber with the given ID, if there
# is one. The connection is closed without sending the shutdown message.
#
sub deleteSubscriberWithID {
	my($class,$id)=@_;
	
	# Nothing registered under this ID: nothing to do
	return unless(exists($PersistentConnections{$id}));
	
	my $subscriber=$PersistentConnections{$id};
	$subscriber->close(1);
}
81    
#
# pingPersistentConnections()
#
# Writes $::CONF{'PingMessage'} to every registered persistent subscriber.
#
sub pingPersistentConnections {
	my $class=shift;
	
	my $msg=$::CONF{'PingMessage'};
	
	# Work on a snapshot of the registry: close() removes entries from
	# %PersistentConnections, and write() (defined in the parent class)
	# presumably may end up closing a broken connection - TODO confirm
	my @cons=values %PersistentConnections;
	
	# Plain loop instead of map: only the side effect is wanted, there
	# is no result list to build
	foreach my $con (@cons)
	{
		$con->write($msg);
	}
}
90    
#
# checkPersistentConnectionsForMaxTime()
#
# Asks every registered persistent subscriber to close itself if its
# connection time limit has passed. All subscribers are checked against
# the same timestamp.
#
sub checkPersistentConnectionsForMaxTime {
	my $class=shift;
	
	my $time=time;
	
	# Work on a snapshot of the registry: an expired subscriber closes
	# itself, which removes its entry from %PersistentConnections
	my @cons=values %PersistentConnections;
	
	# Plain loop instead of map: only the side effect is wanted, there
	# is no result list to build
	foreach my $con (@cons)
	{
		$con->checkForMaxTime($time);
	}
}
99    
100     ###############################################################################
101     # Instance methods
102     ###############################################################################
#
# processLine($line)
#
# Receives the request header one line at a time. Non-empty lines are
# accumulated in 'headerBuffer'; the first empty line ends the header and
# the request is handled:
#
#  - GET <SubscriberDynamicPageAddress>?<query>
#      subscribes to every channel named in the query string
#  - any other GET
#      the path is handed to Meteor::Document->serveFileToClient and the
#      connection is closed
#  - anything else (or a subscription request without a channel)
#      is answered through emitErrorHeader
#
# Recognised query elements, separated by '&':
#   channel=NAME      starts the description of a channel subscription
#   restartfrom=N     start index for the current channel (may be empty)
#   backtrack=N       start index -N for the current channel; cannot be
#                     combined with restartfrom for the same channel
#   persist=yes|true|1|no|false|0
#                     whether the current channel subscription persists
#                     (default: persistent)
#   id=ID             subscriber ID, used to register the connection when
#                     at least one subscription is persistent; a previously
#                     registered subscriber with the same ID is closed
#   maxmessages=N     per connection message limit
#   template=N        number of the header template to use
#   maxtime=N         connection time limit in seconds; only honoured when
#                     the server has no limit or a larger one
#
# Once the header has been processed all further input is ignored.
#
sub processLine {
	my $self=shift;
	my $line=shift;
	
	# Once the header was processed we ignore any input
	return unless(exists($self->{'headerBuffer'}));
	
	if($line ne '')
	{
		#
		# Accumulate header
		#
		$self->{'headerBuffer'}.="$line\n";
	}
	else
	{
		#
		# Empty line signals end of header.
		# Analyze header, register with appropriate channel
		# and send pending messages.
		#
		# GET $::CONF{'SubscriberDynamicPageAddress'}?channel=ml123&restartfrom=1 HTTP/1.1
		#
		# Find the 'GET' line
		#
		# NOTE(review): the configured address is interpolated as a
		# regular expression, so characters such as '.' are not matched
		# literally - confirm whether \Q...\E is wanted here.
		#
		if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\?(\S+)/)
		{
			my @formData=split('&',$1);
			my $channelName=undef;
			my $startIndex=undef;
			my $backtrack=undef;
			my $persist=1;
			my $anyPersist=0;
			my $subscriberID=undef;
			my $channels={};
			
			#
			# Stores the settings collected for the current channel and
			# resets them for the next one. Does nothing when no channel
			# was named yet. Returns false after answering the request
			# with an error because both 'restartfrom' and 'backtrack'
			# were given, true otherwise.
			#
			my $commitChannel=sub {
				return 1 unless(defined($channelName));
				
				if(defined($startIndex) && defined($backtrack))
				{
					$self->emitHeader("404 Cannot use both 'restartfrom' and 'backtrack'");
					$self->close();
					
					return 0;
				}
				
				# 'backtrack' is passed on as a negative start index
				$startIndex=-$backtrack if(!defined($startIndex) && defined($backtrack));
				$channels->{$channelName}->{'startIndex'}=$startIndex;
				$channels->{$channelName}->{'persist'}=$persist;
				$anyPersist|=$persist;
				
				$startIndex=undef;
				$backtrack=undef;
				$persist=1;
				
				return 1;
			};
			
			foreach my $formElement (@formData)
			{
				if($formElement=~/^channel=(.+)$/)
				{
					# Keep the capture before calling other code
					my $nextChannelName=$1;
					
					# A new channel ends the description of the previous one
					return unless($commitChannel->());
					
					$channelName=$nextChannelName;
				}
				elsif($formElement=~/^restartfrom=(\d*)$/)
				{
					$startIndex=$1;
				}
				elsif($formElement=~/^backtrack=(\d+)$/)
				{
					$backtrack=$1;
				}
				elsif($formElement=~/^persist=(?i)(yes|true|1|no|false|0)$/)
				{
					$persist=0 if($1=~/(no|false|0)/i);
				}
				elsif($formElement=~/^id=(.+)$/)
				{
					$subscriberID=$1;
				}
				elsif($formElement=~/^maxmessages=(\d+)$/i)
				{
					$self->{'MaxMessageCount'}=$1;
				}
				elsif($formElement=~/^template=(\d+)$/i)
				{
					$self->{'HeaderTemplateNumber'}=$1;
				}
				elsif($formElement=~/^maxtime=(\d+)$/i)
				{
					my $clientRequest=$1;
					my $serverDefault=$::CONF{'MaxTime'};
					
					# The client may only shorten the server's limit
					if($serverDefault==0 || $serverDefault>$clientRequest)
					{
						$self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$clientRequest;
					}
				}
			}
			
			# The last channel of the query is still pending
			return unless($commitChannel->());
			
			# From here on the header counts as processed
			delete($self->{'headerBuffer'});
			
			if(defined($subscriberID) && $anyPersist)
			{
				$self->{'subscriberID'}=$subscriberID;
				$self->deleteSubscriberWithID($subscriberID);
				$PersistentConnections{$subscriberID}=$self;
			}
			
			if(scalar(keys %{$channels}))
			{
				$self->emitOKHeader();
				
				$self->setChannels($channels);
				
				$self->close(1) unless($anyPersist);
				
				return;
			}
		}
		elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
		{
			Meteor::Document->serveFileToClient($1,$self);
			
			$self->close(1);
			
			return;
		}
		
		#
		# If we fall through we did not understand the request
		#
		$self->emitErrorHeader();
	}
}
252    
#
# setChannels($channels)
#
# Subscribes this connection to every channel described in $channels, a
# hash reference mapping channel names to hashes with the entries
# 'startIndex' and 'persist'. Channels subscribed persistently are
# remembered in $self->{'channels'} so they can be released later.
#
sub setChannels {
	my($self,$channels)=@_;
	
	for my $name (keys(%{$channels}))
	{
		my $startIndex=$channels->{$name}->{'startIndex'};
		my $persist=$channels->{$name}->{'persist'};
		
		my $channel=Meteor::Channel->channelWithName($name);
		
		# Only persistent subscriptions are tracked on the connection
		if($persist)
		{
			$self->{'channels'}->{$name}=$channel;
		}
		
		$channel->addSubscriber($self,$startIndex,$persist);
	}
}
269    
#
# emitOKHeader()
#
# Sends the response header with the status '200 OK'.
#
sub emitOKHeader {
	my($self)=@_;
	
	my $status='200 OK';
	
	$self->emitHeader($status);
}
275    
#
# emitErrorHeader()
#
# Sends the response header with the status '404 Not Found' and then
# closes the connection.
#
sub emitErrorHeader {
	my($self)=@_;
	
	my $status='404 Not Found';
	
	$self->emitHeader($status);
	
	# Nothing more can be done for this request: drop the connection
	$self->close();
}
284    
#
# emitHeader($status)
#
# Writes the response header to the client. The header text is taken from
# $::CONF{'HeaderTemplate<N>'} when the request selected template number N
# and that entry is defined, otherwise from $::CONF{'HeaderTemplate'}.
#
# Placeholders in the template are replaced before writing:
#   ~server~      $::PGM
#   ~status~      the $status argument
#   ~servertime~  the current time (seconds since the epoch)
#   ~~            a literal '~'
# Any other ~name~ placeholder is replaced by the empty string.
#
sub emitHeader {
	my $self=shift;
	my $status=shift;
	
	my $header=undef;
	if(exists($self->{'HeaderTemplateNumber'}))
	{
		my $hn='HeaderTemplate'.$self->{'HeaderTemplateNumber'};
		
		$header=$::CONF{$hn};
	}
	$header=$::CONF{'HeaderTemplate'} unless(defined($header));
	
	my %replacement=(
		''=>'~',
		'server'=>$::PGM,
		'status'=>$status,
		'servertime'=>time,
	);
	
	# The name is matched with '*', not '+': it must be allowed to be
	# empty, otherwise '~~' never matches and the literal tilde escape
	# cannot be used
	$header=~s/~([^~]*)~/exists($replacement{$1}) ? $replacement{$1} : ''/ge;
	
	$self->write($header);
}
323    
#
# sendMessage($msg)
#
# Writes a message to the client and counts it. The connection is closed
# (without shutdown message) as soon as the number of messages sent
# reaches either the server wide limit $::CONF{'MaxMessages'} or the
# limit requested for this connection ('MaxMessageCount'). A limit of
# zero, or an undefined one, means "no limit".
#
sub sendMessage {
	my $self=shift;
	my $msg=shift;
	
	$self->write($msg);
	
	my $msgCount=++$self->{'MessageCount'};
	
	my $serverLimit=$::CONF{'MaxMessages'};
	my $clientLimit=$self->{'MaxMessageCount'};
	
	my $limitReached=
		(defined($serverLimit) && $serverLimit>0 && $msgCount>=$serverLimit)
		|| (defined($clientLimit) && $clientLimit>0 && $msgCount>=$clientLimit);
	
	# Both limits may be reached by the same message; the connection
	# must still be closed only once
	$self->close(1) if($limitReached);
}
343    
#
# closeChannel($channelName)
#
# Ends the subscription to one of the channels tracked on this connection:
# the subscriber is removed from the channel and the channel is forgotten.
# When no tracked channel is left afterwards the connection is closed.
# Names of channels that are not tracked are ignored.
#
sub closeChannel {
	my($self,$channelName)=@_;
	
	return unless(exists($self->{'channels'}->{$channelName}));
	
	my $tracked=$self->{'channels'};
	
	# Unregister from the channel, then drop our reference to it
	$tracked->{$channelName}->removeSubscriber($self);
	delete($tracked->{$channelName});
	
	my $remaining=scalar(keys(%{$self->{'channels'}}));
	if($remaining==0)
	{
		$self->close();
	}
}
357    
#
# close($noShutdownMsg)
#
# Closes the subscriber connection:
#  - removes the subscriber from all channels tracked on the connection
#  - removes the subscriber from the registry of persistent connections
#  - writes $::CONF{'SubscriberShutdownMsg'} to the client, unless
#    $noShutdownMsg is true, the remote side closed the connection, the
#    request header was not processed yet, or no message is configured
#  - closes the connection through the parent class
#
sub close {
	my $self=shift;
	my $noShutdownMsg=shift;
	
	foreach my $channelName (keys %{$self->{'channels'}})
	{
		my $channel=$self->{'channels'}->{$channelName};
		$channel->removeSubscriber($self);
	}
	delete($self->{'channels'});
	
	if(exists($self->{'subscriberID'}))
	{
		my $id=$self->{'subscriberID'};
		
		# Only remove our own registration: the ID may meanwhile belong
		# to a newer connection that replaced this one, and that entry
		# must survive a repeated or late close of this object
		if(exists($PersistentConnections{$id}) && $PersistentConnections{$id}==$self)
		{
			delete($PersistentConnections{$id});
		}
	}
	
	#
	# Send shutdown message unless remote closed or
	# connection not yet established
	#
	unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
	{
		my $msg=$::CONF{'SubscriberShutdownMsg'};
		if(defined($msg) && $msg ne '')
		{
			$self->write($msg);
		}
	}
	
	$self->SUPER::close();
}
389    
#
# checkForMaxTime($time)
#
# Closes the connection (without shutdown message) when it has a time
# limit and that limit lies before $time.
#
sub checkForMaxTime {
	my($self,$time)=@_;
	
	# Connections without a limit never expire
	return unless(exists($self->{'ConnectionTimeLimit'}));
	
	# Still within the allowed time
	return unless($self->{'ConnectionTimeLimit'}<$time);
	
	$self->close(1);
}
396    
397     1;
398 andrew.betts 3 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26