/[meteor]/googlecode.com/svn/trunk/Meteor/Channel.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /googlecode.com/svn/trunk/Meteor/Channel.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 9 - (show annotations)
Fri Dec 8 16:52:58 2006 UTC (17 years, 3 months ago) by andrew.betts
File size: 6934 byte(s)


1 #!/usr/bin/perl -w
2 ###############################################################################
3 # Meteor
4 # An HTTP server for the 2.0 web
5 # Copyright (c) 2006 contributing authors
6 #
7 # Channel.pm
8 #
9 # Description:
10 # A Meteor Channel
11 #
12 ###############################################################################
13 #
14 # This program is free software; you can redistribute it and/or modify it
15 # under the terms of the GNU General Public License as published by the Free
16 # Software Foundation; either version 2 of the License, or (at your option)
17 # any later version.
18 #
19 # This program is distributed in the hope that it will be useful, but WITHOUT
20 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22 # more details.
23 #
24 # You should have received a copy of the GNU General Public License along
25 # with this program; if not, write to the Free Software Foundation, Inc.,
26 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27 #
28 # For more information visit www.meteorserver.org
29 #
30 ###############################################################################
31
32 package Meteor::Channel;
33 ###############################################################################
34 # Configuration
35 ###############################################################################
36
37 use strict;
38
39 use Meteor::Message;
40
41 our %Channels=();
42 our $MessageID=0;
43
44 ###############################################################################
45 # Class methods
46 ###############################################################################
sub channelWithName {
    #
    # Class method: return the channel registered under $channelName
    # in %Channels, creating and registering it first when missing.
    #
    # Arguments:
    #   $channelName    name used as the key into %Channels
    #   $avoidCreation  when true, a missing channel is NOT created
    #
    # Returns the channel object, or undef when the channel does not
    # exist and $avoidCreation is true.
    #
    my ($class,$channelName,$avoidCreation)=@_;

    # Already registered: hand it back untouched
    return $Channels{$channelName} if(exists($Channels{$channelName}));

    # Caller only wanted to probe for an existing channel
    return undef if($avoidCreation);

    #
    # Create new channel
    #
    $Channels{$channelName}=$class->newChannel($channelName);

    &::syslog('debug',"New channel $channelName");

    return $Channels{$channelName};
}
65
sub listChannels {
    #
    # Class method: build a textual listing of all registered
    # channels, sorted by name. Each channel contributes one entry
    # of the form
    #
    #   name(messageCount/subscriberCount)
    #
    # terminated by $::CRLF. Returns the empty string when no
    # channel is registered.
    #
    my ($class)=@_;

    return join('',
        map {
            my $channel=$Channels{$_};

            $_.'('.$channel->messageCount().'/'.$channel->subscriberCount().")$::CRLF";
        } sort keys %Channels
    );
}
79
sub deleteChannel {
    #
    # Class method: remove the channel registered under $channelName
    # from %Channels. Only the first argument is discarded, so this
    # can equally be invoked on an instance (checkExpiration does).
    #
    # Returns whatever delete() yields: the removed entry, or undef
    # when no channel of that name was registered.
    #
    my ($class,$channelName)=@_;

    return delete $Channels{$channelName};
}
86
sub trimMessageStoresByTimestamp {
    #
    # Class method: ask every registered channel to drop the messages
    # that are older than $minTimeStamp.
    #
    # Arguments:
    #   $minTimeStamp   cut-off timestamp; a false value (undef, 0, '')
    #                   makes this a no-op
    #
    # Returns nothing.
    #
    my ($class,$minTimeStamp)=@_;

    return unless($minTimeStamp);

    # Trimming a channel ends in checkExpiration(), which may delete
    # that channel from %Channels. Walk a snapshot of the channel
    # objects so the hash is never modified underneath the iteration
    # and every object stays referenced until the loop is done.
    my @channels=values %Channels;

    foreach my $channel (@channels)
    {
        $channel->trimMessageStoreByTimestamp($minTimeStamp);
    }

    return;
}
95
sub clearAllBuffers {
    #
    # Class method: discard the stored messages of every registered
    # channel.
    #
    # Returns nothing.
    #
    my ($class)=@_;

    # clearBuffer() ends in checkExpiration(), which may delete the
    # channel from %Channels. Walk a snapshot of the channel objects
    # so the hash is never modified underneath the iteration.
    my @channels=values %Channels;

    foreach my $channel (@channels)
    {
        $channel->clearBuffer();
    }

    return;
}
101
102 ###############################################################################
103 # Factory methods
104 ###############################################################################
sub new {
    #
    # Constructor: returns a fresh, empty hash-based instance
    # blessed into $class. No attributes are initialised here;
    # newChannel() fills them in.
    #
    my ($class)=@_;

    return bless({},$class);
}
115
sub newChannel {
    #
    # Factory: create a channel called $name with an empty
    # subscriber list and an empty message store.
    #
    # Arguments:
    #   $name   the channel name, later returned by name()
    #
    # Returns the new channel object.
    #
    my ($class,$name)=@_;

    my $self=$class->new();

    $self->{'name'}=$name;
    $self->{'subscribers'}=[];
    $self->{'messages'}=[];

    return $self;
}
130
sub DESTROY {
    #
    # Destructor: close every subscriber still attached to this
    # channel.
    #
    my ($self)=@_;

    # Work on a copy of the list: presumably close() on a subscriber
    # calls back into removeSubscriber() and alters the live array
    # (not visible in this file - verify against the subscriber class).
    #
    # An instance created by new() alone has no 'subscribers' slot;
    # fall back to an empty list so the destructor does not die on
    # an undefined array reference under 'use strict'.
    my @subscribers=@{$self->{'subscribers'} || []};

    foreach my $subscriber (@subscribers)
    {
        $subscriber->close();
    }

    return;
}
137
138 ###############################################################################
139 # Instance methods
140 ###############################################################################
sub name {
    #
    # Accessor: the channel name stored by newChannel().
    #
    my ($self)=@_;

    return $self->{'name'};
}
144
sub addSubscriber {
    #
    # Attach a subscriber to this channel and replay stored messages
    # to it.
    #
    # Arguments:
    #   $subscriber  object receiving the backlog via sendMessage()
    #   $startId     message ID to start the replay from; a negative
    #                value means "go back that many messages"
    #   $persist     when true the subscriber is added to the channel's
    #                subscriber list; otherwise it only gets the replay
    #
    # Nothing is sent when indexForMessageID() yields undef for
    # $startId. Otherwise the texts of all messages from the start
    # index onwards are concatenated and sent in ONE sendMessage()
    # call, whose result is returned.
    #
    my ($self,$subscriber,$startId,$persist)=@_;

    if($persist)
    {
        push(@{$self->{'subscribers'}},$subscriber);
    }

    my $firstIndex=$self->indexForMessageID($startId);
    return unless(defined($firstIndex));

    # A negative index means "before the first stored message":
    # replay everything there is.
    $firstIndex=0 if($firstIndex<0);

    my $messages=$self->{'messages'};
    my $backlog='';

    foreach my $message (@{$messages}[$firstIndex..$#{$messages}])
    {
        $backlog.=$message->message();
    }

    return $subscriber->sendMessage($backlog);
}
172
sub removeSubscriber {
    #
    # Detach $subscriber from this channel. Only the first list entry
    # that is the very same reference is removed; an unknown
    # subscriber is silently ignored. Afterwards the channel checks
    # whether it has expired, and the result of that check is returned.
    #
    my ($self,$subscriber)=@_;

    my $subscribers=$self->{'subscribers'};

    foreach my $position (0..$#{$subscribers})
    {
        # '==' on references compares identity, not content
        next unless($subscribers->[$position]==$subscriber);

        splice(@{$subscribers},$position,1);
        last;
    }

    return $self->checkExpiration();
}
194
sub subscriberCount {
    #
    # Number of subscribers currently registered on this channel.
    #
    my ($self)=@_;

    my $subscribers=$self->{'subscribers'};

    return scalar(@{$subscribers});
}
200
sub addMessage {
    #
    # Store a new message on this channel and deliver it to every
    # registered subscriber.
    #
    # Arguments:
    #   $messageText   the payload handed to the message's setText()
    #
    # The message ID is drawn from the package-wide $MessageID
    # counter, so IDs are shared between all channels. After the
    # message is appended the store is trimmed to the configured
    # size limit.
    #
    # Returns nothing.
    #
    my ($self,$messageText)=@_;

    my $message=Meteor::Message->newWithID($MessageID++);
    $message->setText($messageText);
    push(@{$self->{'messages'}},$message);

    $self->trimMessageStoreBySize();

    my $text=$message->message();

    # Deliver over a snapshot of the subscriber list, as DESTROY does:
    # if a send makes a subscriber detach itself (removeSubscriber()
    # splices the live array) the iteration must not be affected.
    my @subscribers=@{$self->{'subscribers'}};

    foreach my $subscriber (@subscribers)
    {
        $subscriber->sendMessage($text);
    }

    return;
}
214
sub messageCount {
    #
    # Number of messages currently held in this channel's store.
    #
    my ($self)=@_;

    my $messages=$self->{'messages'};

    return scalar(@{$messages});
}
220
sub trimMessageStoreBySize {
    #
    # Cap the message store at $::CONF{'MaxMessagesPerChannel'}
    # entries by discarding the oldest messages (front of the array).
    #
    # NOTE: with a limit of 0 the splice length is 0 and nothing is
    # removed, so 0 does not mean "keep no messages".
    #
    my ($self)=@_;

    my $messages=$self->{'messages'};
    my $limit=$::CONF{'MaxMessagesPerChannel'};

    if(scalar(@{$messages})>$limit)
    {
        # A negative length tells splice to stop that many elements
        # before the end: everything except the newest $limit goes.
        splice(@{$messages},0,-$limit);
    }
}
231
sub trimMessageStoreByTimestamp {
    #
    # Drop messages from the front of the store for as long as the
    # first message's timestamp() is lower than $ts, then check
    # whether the channel has expired. Returns the result of that
    # check.
    #
    # Only leading messages are examined: the first message that is
    # not older than $ts ends the trimming.
    #
    my ($self,$ts)=@_;

    my $messages=$self->{'messages'};

    while(@{$messages} && $messages->[0]->timestamp()<$ts)
    {
        shift(@{$messages});
    }

    return $self->checkExpiration();
}
243
sub clearBuffer {
    #
    # Throw away all stored messages by installing a fresh, empty
    # store (the previous array itself is left untouched), then
    # check whether the channel has expired. Returns the result of
    # that check.
    #
    my ($self)=@_;

    $self->{'messages'}=[];

    return $self->checkExpiration();
}
251
sub checkExpiration {
    #
    # A channel with neither stored messages nor subscribers has
    # expired: log the fact and unregister the channel by name.
    # A channel that still holds either is left alone.
    #
    my ($self)=@_;

    if(!$self->messageCount() && !$self->subscriberCount())
    {
        my $channelName=$self->name();

        &::syslog('debug',"Channel expired: $channelName");

        $self->deleteChannel($channelName);
    }
}
262
sub indexForMessageID {
    #
    # Translate a message ID into an index into this channel's
    # message store.
    #
    # Return value, in order of evaluation:
    #   undef   $id is undefined, or the store is empty
    #   -1      $id is the empty string
    #   n+$id   $id is negative ("go back that many messages"), where
    #           n is the number of stored messages; the result may
    #           itself be negative (addSubscriber clamps it to 0)
    #   index   of the message carrying exactly that ID or, when no
    #           such message is stored, of the message with the next
    #           higher ID
    #   undef   $id is higher than the ID of every stored message
    #
    # The store is kept sorted by ID (messages are appended with an
    # ever increasing ID), which is what allows the binary search.
    #
    my ($self,$id)=@_;

    return undef unless(defined($id));

    my $messages=$self->{'messages'};
    my $count=scalar(@{$messages});

    return undef unless($count);
    return -1 if($id eq '');

    # Note: negative $id means go back that many messages
    return $count+$id if($id<0);

    my ($first,$last)=(0,$count-1);

    while($first<=$last)
    {
        my $probe=($first+$last)>>1;
        my $order=$id <=> $messages->[$probe]->id();

        # Neither smaller nor greater: this is the message asked for
        return $probe unless($order);

        if($order<0)
        {
            $last=$probe-1;
        }
        else
        {
            $first=$probe+1;
        }
    }

    # No exact match: $first is where the ID would be inserted, i.e.
    # the next higher message - unless that is past the end.
    return ($first<$count) ? $first : undef;
}
311
312 1;
313 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26