/[meteor]/googlecode.com/svn/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /googlecode.com/svn/trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 37 - (hide annotations)
Fri Feb 1 21:22:03 2008 UTC (16 years, 1 month ago) by knops.gerd
File size: 8504 byte(s)
• Connection: In checkAllHandleBits (and to be sure also in addAllHandleBits) iterate over copy of @Connections, as @Connections might change during iteration due to connections being closed
• Controller, Subscriber: pending data will abort a close and call it again later, causing current_subscribers/current_controllers statistics to be incorrect. Added new 'didClose' method that is only called when a connection is actually closed, and this is where the close is counted.

1 knops.gerd 11 #!/usr/bin/perl -w
2     ###############################################################################
3     # Meteor
4     # An HTTP server for the 2.0 web
5     # Copyright (c) 2006 contributing authors
6     #
7     # Subscriber.pm
8     #
9     # Description:
10     # A Meteor Subscriber
11     #
12     ###############################################################################
13     #
14     # This program is free software; you can redistribute it and/or modify it
15     # under the terms of the GNU General Public License as published by the Free
16     # Software Foundation; either version 2 of the License, or (at your option)
17     # any later version.
18     #
19     # This program is distributed in the hope that it will be useful, but WITHOUT
20     # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21     # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22     # more details.
23     #
24     # You should have received a copy of the GNU General Public License along
25     # with this program; if not, write to the Free Software Foundation, Inc.,
26     # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27     #
28     # For more information visit www.meteorserver.org
29     #
30     ###############################################################################
31    
32     package Meteor::Subscriber;
33     ###############################################################################
34     # Configuration
35     ###############################################################################
36    
37     use strict;
38    
39     use Meteor::Connection;
40     use Meteor::Channel;
41     use Meteor::Document;
42    
43     @Meteor::Subscriber::ISA=qw(Meteor::Connection);
44    
45     our %PersistentConnections=();
46 knops.gerd 25 our $NumAcceptedConnections=0;
47 knops.gerd 37
48 knops.gerd 11
49     ###############################################################################
50     # Factory methods
51     ###############################################################################
52     sub newFromServer {
53     my $class=shift;
54    
55     my $self=$class->SUPER::newFromServer(shift);
56    
57     $self->{'headerBuffer'}='';
58     $self->{'MessageCount'}=0;
59     $self->{'MaxMessageCount'}=0;
60    
61     $self->{'ConnectionStart'}=time;
62     my $maxTime=$::CONF{'MaxTime'};
63     if($maxTime>0)
64     {
65     $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
66     }
67    
68 knops.gerd 25 $::Statistics->{'current_subscribers'}++;
69     $::Statistics->{'subscriber_connections_accepted'}++;
70    
71 knops.gerd 11 $self;
72     }
73    
74     ###############################################################################
75     # Class methods
76     ###############################################################################
77     sub deleteSubscriberWithID {
78     my $class=shift;
79     my $id=shift;
80    
81     if(exists($PersistentConnections{$id}))
82     {
83     $PersistentConnections{$id}->close(1);
84     }
85     }
86    
87     sub pingPersistentConnections {
88     my $class=shift;
89    
90     my $msg=$::CONF{'PingMessage'};
91     my @cons=values %PersistentConnections;
92    
93 andrew.betts 35 map { $_->write($msg.chr(0)) } @cons;
94 knops.gerd 11 }
95    
96     sub checkPersistentConnectionsForMaxTime {
97     my $class=shift;
98    
99     my $time=time;
100     my @cons=values %PersistentConnections;
101    
102     map { $_->checkForMaxTime($time) } @cons;
103     }
104    
105 knops.gerd 25 sub numSubscribers {
106    
107     return scalar(keys %PersistentConnections);
108     }
109    
110 knops.gerd 11 ###############################################################################
111     # Instance methods
112     ###############################################################################
113     sub processLine {
114     my $self=shift;
115     my $line=shift;
116    
117     # Once the header was processed we ignore any input
118     return unless(exists($self->{'headerBuffer'}));
119    
120     if($line ne '')
121     {
122     #
123     # Accumulate header
124     #
125     $self->{'headerBuffer'}.="$line\n";
126     }
127     else
128     {
129     #
130     # Empty line signals end of header.
131     # Analyze header, register with appropiate channel
132     # and send pending messages.
133     #
134 andrew.betts 32 # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
135 knops.gerd 11 #
136     # Find the 'GET' line
137     #
138 andrew.betts 32 if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/(\S+)/i)
139 knops.gerd 11 {
140 andrew.betts 32 my $subscriberID=$1;
141     my $persist=0;
142     $self->{'mode'}=$2;
143     if ($self->{'mode'} eq "xhrinteractive" || $self->{'mode'} eq "iframe" || $self->{'mode'} eq "serversent" || $self->{'mode'} eq "longpoll") {
144     $persist=1;
145     $self->{'MaxMessageCount'}=1 unless(!($self->{'mode'} eq "longpoll"));
146     }
147     if ($self->{'mode'} eq "iframe") {
148     $self->{'HeaderTemplateNumber'}=1;
149     } else {
150     $self->{'HeaderTemplateNumber'}=2;
151     }
152     my @channelData=split('/',$3);
153 knops.gerd 13 my $channels={};
154 andrew.betts 32 my $channelName;
155     my $offset;
156 andrew.betts 35 foreach my $chandef (@channelData) {
157     if($chandef=~/^([a-z0-9]+)(.(r|b|h)([0-9]*))?$/i) {
158 andrew.betts 32 $channelName = $1;
159     $channels->{$channelName}->{'startIndex'} = undef;
160 andrew.betts 35 if ($3) {
161     $offset = $4;
162     if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; }
163     if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; }
164     if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; }
165 knops.gerd 13 }
166 knops.gerd 11 }
167     }
168    
169 knops.gerd 13 delete($self->{'headerBuffer'});
170    
171 andrew.betts 32 if($persist)
172 knops.gerd 11 {
173     $self->{'subscriberID'}=$subscriberID;
174     $self->deleteSubscriberWithID($subscriberID);
175     $PersistentConnections{$subscriberID}=$self;
176     }
177    
178 knops.gerd 13 if(scalar(keys %{$channels}))
179 knops.gerd 11 {
180     $self->emitOKHeader();
181 knops.gerd 17 $self->setChannels($channels,$persist);
182     $self->close(1) unless($persist);
183 knops.gerd 11 return;
184     }
185     }
186 andrew.betts 32 elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/)
187     {
188     $self->deleteSubscriberWithID($1);
189     $self->emitOKHeader();
190     $self->close(1);
191     return;
192     }
193 knops.gerd 11 elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
194     {
195     Meteor::Document->serveFileToClient($1,$self);
196     $self->close(1);
197     return;
198     }
199    
200     #
201     # If we fall through we did not understand the request
202     #
203     $self->emitErrorHeader();
204     }
205     }
206    
207 knops.gerd 13 sub setChannels {
208 knops.gerd 11 my $self=shift;
209 knops.gerd 13 my $channels=shift;
210 knops.gerd 17 my $persist=shift;
211 knops.gerd 11
212 knops.gerd 13 foreach my $channelName (keys %{$channels})
213     {
214     my $startIndex=$channels->{$channelName}->{'startIndex'};
215    
216     my $channel=Meteor::Channel->channelWithName($channelName);
217    
218     $self->{'channels'}->{$channelName}=$channel if($persist);
219    
220     $channel->addSubscriber($self,$startIndex,$persist);
221     }
222 knops.gerd 11 }
223    
224     sub emitOKHeader {
225     my $self=shift;
226    
227     $self->emitHeader('200 OK');
228     }
229    
230     sub emitErrorHeader {
231     my $self=shift;
232    
233     $self->emitHeader('404 Not Found');
234 knops.gerd 25 $::Statistics->{'errors_served'}++;
235 knops.gerd 11
236     # close up shop here!
237     $self->close();
238     }
239    
240     sub emitHeader {
241     my $self=shift;
242     my $status=shift;
243    
244     my $header=undef;
245     if(exists($self->{'HeaderTemplateNumber'}))
246     {
247     my $hn='HeaderTemplate'.$self->{'HeaderTemplateNumber'};
248    
249     $header=$::CONF{$hn};
250     }
251     $header=$::CONF{'HeaderTemplate'} unless(defined($header));
252    
253     $header=~s/~([^~]+)~/
254     if(!defined($1) || $1 eq '')
255     {
256     '~';
257     }
258     elsif($1 eq 'server')
259     {
260     $::PGM;
261     }
262     elsif($1 eq 'status')
263     {
264     $status;
265     }
266     elsif($1 eq 'servertime')
267     {
268     time;
269     }
270     else
271     {
272     '';
273     }
274     /gex;
275    
276 andrew.betts 35 $self->write($header.chr(0));
277 knops.gerd 11 }
278    
279     sub sendMessage {
280     my $self=shift;
281     my $msg=shift;
282 knops.gerd 25 my $numMsgInThisBatch=shift;
283 knops.gerd 11
284 knops.gerd 25 $numMsgInThisBatch=1 unless(defined($numMsgInThisBatch));
285    
286 andrew.betts 35 $self->write($msg.chr(0));
287 knops.gerd 11
288 knops.gerd 25 $::Statistics->{'messages_served'}+=$numMsgInThisBatch;
289    
290 knops.gerd 11 my $msgCount=++$self->{'MessageCount'};
291    
292     my $maxMsg=$::CONF{'MaxMessages'};
293     if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)
294     {
295     $self->close(1);
296     }
297    
298     if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})
299     {
300     $self->close(1);
301     }
302     }
303    
304 knops.gerd 13 sub closeChannel {
305     my $self=shift;
306     my $channelName=shift;
307    
308     return unless(exists($self->{'channels'}->{$channelName}));
309    
310     my $channel=$self->{'channels'}->{$channelName};
311     $channel->removeSubscriber($self);
312    
313     delete($self->{'channels'}->{$channelName});
314    
315     $self->close() if(scalar(keys %{$self->{'channels'}})==0);
316     }
317    
318 knops.gerd 11 sub close {
319     my $self=shift;
320     my $noShutdownMsg=shift;
321    
322 knops.gerd 13 foreach my $channelName (keys %{$self->{'channels'}})
323     {
324     my $channel=$self->{'channels'}->{$channelName};
325     $channel->removeSubscriber($self);
326     }
327     delete($self->{'channels'});
328 knops.gerd 11
329     if(exists($self->{'subscriberID'}))
330     {
331     delete($PersistentConnections{$self->{'subscriberID'}});
332     }
333    
334     #
335     # Send shutdown message unless remote closed or
336     # connection not yet established
337     #
338     unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
339     {
340     my $msg=$::CONF{'SubscriberShutdownMsg'};
341     if(defined($msg) && $msg ne '')
342     {
343     $self->write($msg);
344     }
345     }
346    
347     $self->SUPER::close();
348     }
349    
350 knops.gerd 37 sub didClose {
351    
352     $::Statistics->{'current_subscribers'}--;
353     }
354    
355 knops.gerd 11 sub checkForMaxTime {
356     my $self=shift;
357     my $time=shift;
358    
359     $self->close(1) if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);
360     }
361    
362     1;
363 andrew.betts 3 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26