/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is a repository of my old source code, which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory | Revision Log


Revision 35 - (hide annotations)
Fri Jan 25 17:12:02 2008 UTC (16 years, 2 months ago) by andrew.betts
File size: 8481 byte(s)
Terminated each message with a null byte to allow Flash clients to work with XML Sockets
Cleared a warning
Allowed uppercase channel names

1 knops.gerd 11 #!/usr/bin/perl -w
2     ###############################################################################
3     # Meteor
4     # An HTTP server for the 2.0 web
5     # Copyright (c) 2006 contributing authors
6     #
7     # Subscriber.pm
8     #
9     # Description:
10     # A Meteor Subscriber
11     #
12     ###############################################################################
13     #
14     # This program is free software; you can redistribute it and/or modify it
15     # under the terms of the GNU General Public License as published by the Free
16     # Software Foundation; either version 2 of the License, or (at your option)
17     # any later version.
18     #
19     # This program is distributed in the hope that it will be useful, but WITHOUT
20     # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21     # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22     # more details.
23     #
24     # You should have received a copy of the GNU General Public License along
25     # with this program; if not, write to the Free Software Foundation, Inc.,
26     # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27     #
28     # For more information visit www.meteorserver.org
29     #
30     ###############################################################################
31    
32     package Meteor::Subscriber;
33     ###############################################################################
34     # Configuration
35     ###############################################################################
36    
37     use strict;
38    
39     use Meteor::Connection;
40     use Meteor::Channel;
41     use Meteor::Document;
42    
43     @Meteor::Subscriber::ISA=qw(Meteor::Connection);
44    
45     our %PersistentConnections=();
46 knops.gerd 25 our $NumAcceptedConnections=0;
47 knops.gerd 11
48     ###############################################################################
49     # Factory methods
50     ###############################################################################
#
# Factory method: create a subscriber for a freshly accepted connection.
#
# The single argument is handed unchanged to the parent class'
# newFromServer. The returned object starts with an empty header buffer,
# zeroed message counters and, when the 'MaxTime' configuration value is a
# positive number, an absolute 'ConnectionTimeLimit' timestamp.
# The subscriber statistics counters are incremented as a side effect.
#
sub newFromServer {
    my $class=shift;

    my $self=$class->SUPER::newFromServer(shift);

    # The mere existence of 'headerBuffer' marks the request header as
    # still being read; processLine deletes the key once it was handled.
    $self->{'headerBuffer'}='';
    $self->{'MessageCount'}=0;
    $self->{'MaxMessageCount'}=0;

    $self->{'ConnectionStart'}=time;

    # 'MaxTime' may be absent from the configuration. Treat that the same
    # way as 0 (no time limit) rather than comparing undef numerically,
    # mirroring how sendMessage guards 'MaxMessages'.
    my $maxTime=$::CONF{'MaxTime'};
    if(defined($maxTime) && $maxTime>0)
    {
        $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
    }

    $::Statistics->{'current_subscribers'}++;
    $::Statistics->{'subscriber_connections_accepted'}++;

    $self;
}
72    
73     ###############################################################################
74     # Class methods
75     ###############################################################################
#
# Class method: close the persistent connection registered under the given
# subscriber ID, if any. The connection is closed with the "no shutdown
# message" flag set.
#
sub deleteSubscriberWithID {
    my ($class,$id)=@_;

    exists($PersistentConnections{$id})
        and $PersistentConnections{$id}->close(1);
}
85    
#
# Class method: write the configured 'PingMessage' to every persistent
# connection.
#
sub pingPersistentConnections {
    my $class=shift;

    # Every message is terminated with a null byte so that Flash clients
    # using XML sockets can detect the message boundary.
    my $ping=$::CONF{'PingMessage'}.chr(0);

    # Iterate over a copy of the registry values rather than the hash itself
    my @cons=values %PersistentConnections;

    foreach my $con (@cons)
    {
        $con->write($ping);
    }

    return;
}
94    
#
# Class method: ask every persistent connection to check itself against
# its time limit, using one common "now" timestamp.
#
sub checkPersistentConnectionsForMaxTime {
    my $class=shift;

    my $time=time;

    # Work on a snapshot: a connection that exceeds its limit is closed,
    # and close removes it from %PersistentConnections.
    my @cons=values %PersistentConnections;

    foreach my $con (@cons)
    {
        $con->checkForMaxTime($time);
    }

    return;
}
103    
#
# Return the number of entries in the persistent connection registry.
#
sub numSubscribers {
    my $count=keys %PersistentConnections;

    return $count;
}
108    
109 knops.gerd 11 ###############################################################################
110     # Instance methods
111     ###############################################################################
#
# Consume one line of the client's request.
#
# Non-empty lines are appended to the header buffer. The first empty line
# ends the header; the buffered request is then dispatched on its GET line:
#
#   GET <SubscriberDynamicPageAddress>/hostid/streamtype/channeldefs
#       Subscribe to the listed channels.
#   GET /disconnect/<id>
#       Close the persistent subscriber registered under <id>.
#   GET <path>
#       Hand <path> to Meteor::Document->serveFileToClient.
#
# Anything else, including a subscribe request without a single valid
# channel definition, is answered through emitErrorHeader.
#
# channeldefs is a '/' separated list. Each definition is a channel name
# ([a-z0-9]+), optionally followed by a dot and one of
#   r<n>    start index n
#   b<n>    start index -n
#   h       start index 0
# Definitions that do not match this syntax are ignored.
#
# Input arriving after the header was processed is ignored.
#
sub processLine {
    my $self=shift;
    my $line=shift;

    # Once the header was processed we ignore any input
    return unless(exists($self->{'headerBuffer'}));

    if($line ne '')
    {
        # Accumulate header
        $self->{'headerBuffer'}.="$line\n";
        return;
    }

    #
    # Empty line signals end of header.
    # Analyze header, register with appropriate channel
    # and send pending messages.
    #
    my $header=$self->{'headerBuffer'};

    if($header=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i)
    {
        # Copy the captures right away; later matches overwrite them
        my ($subscriberID,$mode,$channelSpec)=($1,$2,$3);

        $self->{'mode'}=$mode;

        # These stream types keep the connection registered as persistent
        my %persistentMode=map { $_ => 1 } qw(xhrinteractive iframe serversent longpoll);
        my $persist=$persistentMode{$mode} ? 1 : 0;

        # With a message limit of 1, sendMessage closes a long poll
        # connection right after its first message
        $self->{'MaxMessageCount'}=1 if($mode eq 'longpoll');

        $self->{'HeaderTemplateNumber'}=($mode eq 'iframe') ? 1 : 2;

        my $channels={};
        foreach my $chandef (split('/',$channelSpec))
        {
            # The separator between name and start marker is a literal dot
            next unless($chandef=~/^([a-z0-9]+)(?:\.(r|b|h)([0-9]*))?$/i);

            my ($channelName,$marker,$offset)=($1,$2,$3);

            my $startIndex=undef;
            if(defined($marker))
            {
                $startIndex=$offset if($marker eq 'r');
                $startIndex=-$offset if($marker eq 'b');
                $startIndex=0 if($marker eq 'h');
            }
            $channels->{$channelName}->{'startIndex'}=$startIndex;
        }

        delete($self->{'headerBuffer'});

        if($persist)
        {
            # Closes any earlier connection registered under the same ID
            # before this one takes its place in the registry
            $self->{'subscriberID'}=$subscriberID;
            $self->deleteSubscriberWithID($subscriberID);
            $PersistentConnections{$subscriberID}=$self;
        }

        if(scalar(keys %{$channels}))
        {
            $self->emitOKHeader();
            $self->setChannels($channels,$persist);
            $self->close(1) unless($persist);
            return;
        }
    }
    elsif($header=~/GET\s+\/disconnect\/(\S+)/)
    {
        $self->deleteSubscriberWithID($1);
        $self->emitOKHeader();
        $self->close(1);
        return;
    }
    elsif($header=~/GET\s+([^\s\?]+)/)
    {
        Meteor::Document->serveFileToClient($1,$self);
        $self->close(1);
        return;
    }

    #
    # If we fall through we did not understand the request
    #
    $self->emitErrorHeader();
}
205    
#
# Subscribe this connection to a set of channels.
#
# $channels   hash reference: channel name => { 'startIndex' => ... }
# $persist    true for a persistent connection; only then is the channel
#             object remembered in $self->{'channels'}
#
# For every name the channel object is obtained from
# Meteor::Channel->channelWithName and addSubscriber is called on it with
# this subscriber, the start index and the persist flag.
#
sub setChannels {
    my ($self,$channels,$persist)=@_;

    foreach my $name (keys %{$channels})
    {
        my $channel=Meteor::Channel->channelWithName($name);

        if($persist)
        {
            $self->{'channels'}->{$name}=$channel;
        }

        $channel->addSubscriber($self,$channels->{$name}->{'startIndex'},$persist);
    }
}
222    
#
# Send the response header with status '200 OK'.
#
sub emitOKHeader {
    my ($self)=@_;

    return $self->emitHeader('200 OK');
}
228    
#
# Answer a request that was not understood: send the response header with
# status '404 Not Found', count the error in the statistics and close the
# connection.
#
sub emitErrorHeader {
    my ($self)=@_;

    $self->emitHeader('404 Not Found');

    $::Statistics->{'errors_served'}++;

    # Nothing more to do for this client
    $self->close();
}
238    
#
# Build the response header from a configured template and write it to
# the client, terminated with a null byte.
#
# $status    status text substituted for ~status~ (e.g. '200 OK')
#
# Template selection: $::CONF{'HeaderTemplate<n>'} when this connection has
# a 'HeaderTemplateNumber' <n> and that template is defined, otherwise
# $::CONF{'HeaderTemplate'}. A missing template results in an empty header.
#
# Placeholders in the template:
#   ~server~       replaced by $::PGM
#   ~status~       replaced by $status
#   ~servertime~   replaced by the current time (seconds since the epoch)
#   ~~             replaced by a literal '~'
#   ~<other>~      removed
#
sub emitHeader {
    my $self=shift;
    my $status=shift;

    my $header=undef;
    if(exists($self->{'HeaderTemplateNumber'}))
    {
        $header=$::CONF{'HeaderTemplate'.$self->{'HeaderTemplateNumber'}};
    }
    $header=$::CONF{'HeaderTemplate'} unless(defined($header));
    $header='' unless(defined($header));

    my %valueFor=(
        'server'     => $::PGM,
        'status'     => $status,
        'servertime' => time,
    );

    # [^~]* (not +) so that the empty placeholder '~~' is matched as well;
    # otherwise it could never be turned into '~' and would shift the
    # pairing of all following delimiters.
    $header=~s{~([^~]*)~}{
        $1 eq ''               ? '~'
        : exists($valueFor{$1}) ? $valueFor{$1}
        :                         ''
    }gex;

    $self->write($header.chr(0));
}
277    
#
# Write one message to the client, terminated with a null byte, and
# enforce the message limits.
#
# $msg                 the message text
# $numMsgInThisBatch   number of messages $msg stands for in the
#                      'messages_served' statistic; defaults to 1
#
# The connection is closed (without shutdown message) once the number of
# sendMessage calls on it reaches either the global $::CONF{'MaxMessages'}
# or this connection's own 'MaxMessageCount'. A limit of 0 or an undefined
# limit means "no limit".
#
sub sendMessage {
    my $self=shift;
    my $msg=shift;
    my $numMsgInThisBatch=shift;

    $numMsgInThisBatch=1 unless(defined($numMsgInThisBatch));

    $self->write($msg.chr(0));

    $::Statistics->{'messages_served'}+=$numMsgInThisBatch;

    my $msgCount=++$self->{'MessageCount'};

    my $maxMsg=$::CONF{'MaxMessages'};
    my $globalLimitReached=(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg);

    my $ownMax=$self->{'MaxMessageCount'};
    my $ownLimitReached=(defined($ownMax) && $ownMax>0 && $msgCount>=$ownMax);

    # Close only once, even when both limits are reached by the same message
    $self->close(1) if($globalLimitReached || $ownLimitReached);
}
302    
#
# Unsubscribe this connection from one channel.
#
# Does nothing when the connection is not registered with $channelName.
# Otherwise the subscriber is removed from the channel, the channel is
# dropped from $self->{'channels'}, and the connection is closed when that
# was its last channel.
#
sub closeChannel {
    my ($self,$channelName)=@_;

    exists($self->{'channels'}->{$channelName}) or return;

    # Detach from the channel first, then forget it locally
    $self->{'channels'}->{$channelName}->removeSubscriber($self);
    delete($self->{'channels'}->{$channelName});

    my $remaining=keys %{$self->{'channels'}};
    $self->close() if($remaining==0);
}
316    
#
# Close this subscriber connection.
#
# $noShutdownMsg   when true, the configured 'SubscriberShutdownMsg' is
#                  not sent
#
# Steps: remove the subscriber from all of its channels, drop it from the
# persistent connection registry, send the shutdown message (unless
# suppressed, the remote side already closed, or the request header was
# never processed), decrement the 'current_subscribers' statistic and
# finally call the parent class' close.
#
# Only the first call on an object does any work. close can be reached
# more than once for the same connection; repeating the steps would
# decrement the subscriber counter again and resend the shutdown message.
#
sub close {
    my $self=shift;
    my $noShutdownMsg=shift;

    # Set the flag before touching the channels so that a nested close
    # triggered from there is a no-op as well
    return if($self->{'subscriberClosed'});
    $self->{'subscriberClosed'}=1;

    foreach my $channelName (keys %{$self->{'channels'}})
    {
        my $channel=$self->{'channels'}->{$channelName};
        $channel->removeSubscriber($self);
    }
    delete($self->{'channels'});

    if(exists($self->{'subscriberID'}))
    {
        # The ID may meanwhile belong to a newer connection; only remove
        # the registry entry when it still refers to this object
        my $id=$self->{'subscriberID'};
        my $registered=$PersistentConnections{$id};
        if(defined($registered) && $registered==$self)
        {
            delete($PersistentConnections{$id});
        }
    }

    #
    # Send shutdown message unless remote closed or
    # connection not yet established
    #
    unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
    {
        my $msg=$::CONF{'SubscriberShutdownMsg'};
        if(defined($msg) && $msg ne '')
        {
            $self->write($msg);
        }
    }

    $::Statistics->{'current_subscribers'}--;

    $self->SUPER::close();
}
350    
#
# Close the connection (without shutdown message) when it has a
# 'ConnectionTimeLimit' and that limit lies before $time.
#
sub checkForMaxTime {
    my ($self,$time)=@_;

    exists($self->{'ConnectionTimeLimit'})
        && $self->{'ConnectionTimeLimit'}<$time
        and $self->close(1);
}
357    
358     1;
359 andrew.betts 3 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26