/[meteor]/googlecode.com/svn/trunk/Meteor/Channel.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /googlecode.com/svn/trunk/Meteor/Channel.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 62 - (hide annotations)
Thu Nov 27 00:33:21 2008 UTC (15 years, 4 months ago) by andrew.betts
File size: 7964 byte(s)
1 Fixed: Added SIGPIPE handler.  We noticed that under heavy load
Meteor receives SIGPIPEs from the OS, suspected to relate to clients
that have just disconnected the moment Meteor attempts to write to the
socket.  This caused Meteor to crash.
2 Fixed: Long polling multiple channels no longer causes the loop to
die and restart when some channels have messages queued for delivery.
3 Fixed: Over time, Meteor 'collected' connections from clients that
never got disconnected even if MaxTime was set.  This happened if the
client concerned sent a header with no terminating blank line.  Meteor
kept waiting for the rest of the header, which never arrived, and
therefore the client remained in limbo, never subjected to the MaxTime
time limit because it had not yet become a subscriber.  Clients are
now allowed 30 seconds to send a valid request header.
4 Fixed: If only one message existed on the server, the JS client
would continue to request it again and again, because it has message
ID 0, and the JS client considered this an invalid message ID.
5 Fixed: Corrected some comments in file headers

6 Changed: MaxMessages has been renamed to CloseOnEvent and functions
in a similar, but not quite identical way.  Thanks to Matthew Haak,
who pointed out the extreme confusingness of MaxMessages and a bug
that has resulted in Fix 2 above.  Setting CloseOnEvent to any value
that evaluates to true will cause Meteor to close subscriber
connections after at least one message has been sent and there are no
further messages pending.  This is identical to MaxMessages for values
of 0 and 1, but where MaxMessages is set to a value higher than one,
replacing it with CloseOnEvent with the same value will act as though
it were set to one.  The intent of MaxMessages was to enable long-
polling (and it is used by the JS client in that way), and
CloseonEvent is a drop in replacement for that behaviour.
7 Changed: Meteor JS client now uses dynamic <SCRIPT> tags for all
polling behaviours, rather than XHR.  This enables it to make poll
requests cross-domain (see 13)
8 Changed: Meteor JS client now abstracts timestamp lookups to a
dedicated method.
9 Changed: Default HeaderTemplates no longer include cache busting
headers, since all meteor requests contain a millisecond timestamp and
so no client makes the same request twice.  These were therefore
simply chewing up bandwidth.
10 Changed: Date strings used for logging debug messages are cached to
avoid numerous expensive lookups to localtime().
11 Changed: Channel info is only sent in a response if the client does
not request a restart from a specified ID.  The logic being that if
the client knows the ID they want to start from, they have already
made previous requests and have the channel information they need.
Bandwidth saving measure.

12 Added: JS client now has a Meteor.isSupportedBrowser() method,
which you can call to detemine whether Meteor will run in the user's
browser version.
13 Added: JS client can now use different hosts for polling and
streaming.  This is only really useful if your website is on a domain
that has a lot of cookies, and you don't want to send them in every
poll request.  Removing cookies from request headers can reduce the
size of the request significantly.  We find that with cookies included
Meteor poll requests are usually larger than the responses.  To use,
set Meteor.pollhost.  Meteor.pollhost can be any domain, while
Meteor.host must be a subdomain of your website hostname.
14 Added: Config file now supports new 'FooterTemplate' parameter, for
a string to send just before the connection to the subscriber is
closed.  This is in support of change 7.
15 Added: Better inline documentation for ChannelInfoTemplate config
parameter
16 Added: Log output includes connection IDs corresponding to the file
inode for each connection
17 Added: New controller command LISTCONNECTIONS, produces a newline
delimited list of all currently connected clients, and for each one
displaying "ConnectionID IPAddress ClientType [SubscriberID]"
18 Added: New controller command DESCRIBE, takes a ConnectionID as a
parameter, and outputs numerous statistics about that particular
client, including number of messages sent/received, user agent, IP
address, time connected, time remaining until MaxTime etc.
19 Added: New controller comment LISTSUBSCRIBERS, produces a newline
delimited list of all currently connected streaming subscribers, and
for each one displaying "SubscriberID IPAddress Starttime TimeLimit
TimeRemaining MessageCount UserAgent"
20 Added: SHOWSTATS command produces the following additional stats:
connection_count: total current connections, real_subscribers: total
of number of currently connected streaming subscribers plus the number
of unique polling connections seen in the last 60 seconds.
21 Added: STDERR outputs prior to every exit() for debugging purposes
22 Added: The UDP server is now considered stable, and is the best way
of broadcasting messages to lots of Meteor nodes simultaneously and
efficiently. 


1 knops.gerd 11 #!/usr/bin/perl -w
2     ###############################################################################
3     # Meteor
4     # An HTTP server for the 2.0 web
5     # Copyright (c) 2006 contributing authors
6     #
7 andrew.betts 62 # Channel.pm
8 knops.gerd 11 #
9     # Description:
10     # A Meteor Channel
11     #
12     ###############################################################################
13     #
14     # This program is free software; you can redistribute it and/or modify it
15     # under the terms of the GNU General Public License as published by the Free
16     # Software Foundation; either version 2 of the License, or (at your option)
17     # any later version.
18     #
19     # This program is distributed in the hope that it will be useful, but WITHOUT
20     # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21     # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22     # more details.
23     #
24     # You should have received a copy of the GNU General Public License along
25     # with this program; if not, write to the Free Software Foundation, Inc.,
26     # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27     #
28     # For more information visit www.meteorserver.org
29     #
30     ###############################################################################
31    
32     package Meteor::Channel;
33     ###############################################################################
34     # Configuration
35     ###############################################################################
36    
37     use strict;
38    
39     use Meteor::Message;
40    
41     our %Channels=();
42     our $MessageID=0;
43    
44     ###############################################################################
45     # Class methods
46     ###############################################################################
47     sub channelWithName {
48     my $class=shift;
49     my $channelName=shift;
50     my $avoidCreation=shift;
51    
52     unless(exists($Channels{$channelName}))
53     {
54     return undef if($avoidCreation);
55     #
56     # Create new channel
57     #
58     $Channels{$channelName}=$class->newChannel($channelName);
59    
60     &::syslog('debug',"New channel $channelName");
61     }
62    
63     return $Channels{$channelName};
64     }
65    
66     sub listChannels {
67     my $class=shift;
68    
69     my $list='';
70     foreach my $channelName (sort keys %Channels)
71     {
72     my $channel=$Channels{$channelName};
73    
74     $list.=$channelName.'('.$channel->messageCount().'/'.$channel->subscriberCount().")$::CRLF";
75     }
76    
77     $list;
78     }
79    
80     sub deleteChannel {
81     my $class=shift;
82     my $channelName=shift;
83    
84     delete($Channels{$channelName});
85     }
86    
87     sub trimMessageStoresByTimestamp {
88     my $class=shift;
89     my $minTimeStamp=shift;
90    
91     return unless($minTimeStamp);
92    
93     map { $_->trimMessageStoreByTimestamp($minTimeStamp) } (values %Channels);
94     }
95    
96     sub clearAllBuffers {
97     my $class=shift;
98    
99     map { $_->clearBuffer() } (values %Channels);
100     }
101    
102 knops.gerd 25 sub numChannels {
103    
104     return scalar(keys %Channels);
105     }
106    
107 knops.gerd 11 ###############################################################################
108     # Factory methods
109     ###############################################################################
110     sub new {
111     #
112     # Create a new empty instance
113     #
114     my $class=shift;
115    
116     my $obj={};
117    
118     bless($obj,$class);
119     }
120    
121     sub newChannel {
122     #
123     # new instance from new server connection
124     #
125     my $self=shift->new();
126    
127     my $name=shift;
128     $self->{'name'}=$name;
129    
130     $self->{'subscribers'}=[];
131     $self->{'messages'}=[];
132    
133     $self;
134     }
135    
136     sub DESTROY {
137     my $self=shift;
138    
139     my @subscribers=@{$self->{'subscribers'}};
140 knops.gerd 13 map { $_->closeChannel($self->{'name'}) } @subscribers;
141 knops.gerd 11 }
142    
143     ###############################################################################
144     # Instance methods
145     ###############################################################################
146     sub name {
147     shift->{'name'};
148     }
149    
150     sub addSubscriber {
151     my $self=shift;
152     my $subscriber=shift;
153     my $startId=shift;
154     my $persist=shift;
155 knops.gerd 47 my $mode=shift || '';
156     my $userAgent=shift || '';
157 knops.gerd 11
158     # Note: negative $startId means go back that many messages
159 andrew.betts 50 my $startIndex=$self->indexForMessageID($startId);
160     my $logStartIndex = $startIndex || $self->lastMsgID() || 0;
161 knops.gerd 11
162     push(@{$self->{'subscribers'}},$subscriber) if($persist);
163    
164 knops.gerd 47 &::syslog('info','',
165     'joinchannel',
166 andrew.betts 62 $subscriber->{'socketFN'},
167 knops.gerd 47 $self->{'name'},
168 andrew.betts 62 $logStartIndex
169 knops.gerd 47 );
170    
171 knops.gerd 11 return unless(defined($startIndex));
172    
173     my $msgCount=scalar(@{$self->{'messages'}});
174     my $txt='';
175    
176     $startIndex=0 if($startIndex<0);
177    
178 andrew.betts 50 if($startIndex<$msgCount) {
179     $subscriber->sendMessages(@{$self->{'messages'}}[$startIndex..$msgCount-1]);
180 knops.gerd 11 }
181     }
182    
183     sub removeSubscriber {
184     my $self=shift;
185     my $subscriber=shift;
186 knops.gerd 47 my $reason=shift ||'unknown';
187 knops.gerd 11
188     my $idx=undef;
189 andrew.betts 50 my $numsubs = scalar(@{$self->{'subscribers'}});
190 andrew.betts 53
191     for (my $i=0; $i<$numsubs; $i++) {
192     if($self->{'subscribers'}->[$i]==$subscriber) {
193 knops.gerd 11 $idx=$i;
194     last;
195     }
196     }
197    
198     if(defined($idx))
199     {
200     splice(@{$self->{'subscribers'}},$idx,1);
201 knops.gerd 47
202 andrew.betts 62 my $timeConnected = time - $subscriber->{'connectionStart'};
203 knops.gerd 47 &::syslog('info','',
204     'leavechannel',
205 andrew.betts 53 $subscriber->{'ip'},
206 knops.gerd 47 $subscriber->{'subscriberID'},
207     $self->{'name'},
208 andrew.betts 50 $timeConnected,
209 andrew.betts 62 $subscriber->{'messageCount'},
210 knops.gerd 47 $subscriber->{'bytesWritten'},
211     $reason
212     );
213 knops.gerd 11 }
214    
215     $self->checkExpiration();
216     }
217    
218     sub subscriberCount {
219     my $self=shift;
220    
221     scalar(@{$self->{'subscribers'}});
222     }
223    
224     sub addMessage {
225     my $self=shift;
226     my $messageText=shift;
227    
228     my $message=Meteor::Message->newWithID($MessageID++);
229 knops.gerd 16 $message->setText($messageText);
230     $message->setChannelName($self->{'name'});
231 knops.gerd 11 push(@{$self->{'messages'}},$message);
232 andrew.betts 50 &::syslog('debug',"New message ".$message->{"id"}." on channel ".$self->{'name'});
233 knops.gerd 11
234     $self->trimMessageStoreBySize();
235    
236 knops.gerd 45 map { $_->sendMessages($message) } @{$self->{'subscribers'}};
237 knops.gerd 46
238     $message;
239 knops.gerd 11 }
240    
241     sub messageCount {
242     my $self=shift;
243    
244     scalar(@{$self->{'messages'}});
245     }
246    
247     sub trimMessageStoreBySize {
248     my $self=shift;
249    
250     my $numMessages=scalar(@{$self->{'messages'}});
251    
252     if($numMessages>$::CONF{'MaxMessagesPerChannel'})
253     {
254     splice(@{$self->{'messages'}},0,-$::CONF{'MaxMessagesPerChannel'});
255     }
256     }
257    
258     sub trimMessageStoreByTimestamp {
259     my $self=shift;
260     my $ts=shift;
261    
262     while(scalar(@{$self->{'messages'}})>0 && $self->{'messages'}->[0]->timestamp()<$ts)
263     {
264     my $msg=shift(@{$self->{'messages'}});
265     }
266    
267     $self->checkExpiration();
268     }
269    
270     sub clearBuffer {
271     my $self=shift;
272    
273     $self->{'messages'}=[];
274    
275     $self->checkExpiration();
276     }
277    
278     sub checkExpiration {
279     my $self=shift;
280    
281     if($self->messageCount()==0 && $self->subscriberCount()==0)
282     {
283     my $name=$self->name();
284     &::syslog('debug',"Channel expired: $name");
285     $self->deleteChannel($name);
286     }
287     }
288    
289     sub indexForMessageID {
290     my $self=shift;
291     my $id=shift;
292    
293     # the messages is always sorted by ID, so we can
294     # use a binary search to find the message.
295     # return undef if there are no messages or the
296     # ID is that of the last message.
297     # Otherwise return the ID of the found message
298     # of if no message with that ID exists the one
299     # with the next higher ID
300     #
301     return undef unless(defined($id));
302    
303 knops.gerd 46 my $numMessages=scalar(@{$self->{'messages'}});
304 knops.gerd 11
305     return undef unless($numMessages);
306     return -1 unless($id ne '');
307    
308     # Note: negative $id means go back that many messages
309     return $numMessages+$id if($id<0);
310    
311     my $low=0;
312     my $high=$numMessages-1;
313     my $mid;
314     my $cond;
315     while($low<=$high)
316     {
317     $mid=($low+$high)>>1;
318     $cond=$id <=> $self->{'messages'}->[$mid]->id();
319     if($cond<0)
320     {
321     $high=$mid-1;
322     }
323     elsif($cond>0)
324     {
325     $low=$mid+1;
326     }
327     else
328     {
329     return $mid;
330     }
331     }
332    
333     return undef if($low>=$numMessages);
334    
335     return $low;
336     }
337    
338 knops.gerd 46 sub lastMsgID {
339     my $self=shift;
340     my $numMessages=scalar(@{$self->{'messages'}});
341 andrew.betts 50 return undef unless($numMessages>0);
342 knops.gerd 46 @{$self->{'messages'}}[-1]->id();
343     }
344    
345 knops.gerd 45 sub descriptionWithTemplate {
346     my $self=shift;
347     my $template=shift;
348    
349 andrew.betts 53 return '' unless(defined($template) && $template ne '');
350    
351 knops.gerd 45 $template=~s/~([a-zA-Z0-9_]*)~/
352 andrew.betts 50 if(!defined($1) || $1 eq '') {
353 knops.gerd 45 '~';
354 andrew.betts 50 } elsif($1 eq 'messageCount') {
355 knops.gerd 45 $self->messageCount();
356 andrew.betts 50 } elsif($1 eq 'subscriberCount') {
357 knops.gerd 45 $self->subscriberCount();
358 andrew.betts 50 } elsif($1 eq 'lastMsgID') {
359 andrew.betts 58 $self->lastMsgID() || 1;
360 andrew.betts 50 } elsif($1 eq 'name') {
361     $self->{'name'};
362     } else {
363 knops.gerd 45 '';
364     }
365     /gex;
366    
367     $template;
368     }
369    
370 knops.gerd 11 1;
371 andrew.betts 3 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26