/[meteor]/googlecode.com/svn/trunk/Meteor/Channel.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /googlecode.com/svn/trunk/Meteor/Channel.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 9 by andrew.betts, Fri Dec 8 16:52:58 2006 UTC revision 47 by knops.gerd, Mon Feb 4 21:06:42 2008 UTC
# Line 1  Line 1 
1  #!/usr/bin/perl -w  #!/usr/bin/perl -w
2  ###############################################################################  ###############################################################################
3  #   Meteor  #   Meteor
4  #   An HTTP server for the 2.0 web  #   An HTTP server for the 2.0 web
5  #   Copyright (c) 2006 contributing authors  #   Copyright (c) 2006 contributing authors
6  #  #
7  #   Subscriber.pm  #   Subscriber.pm
8  #  #
9  #       Description:  #       Description:
10  #       A Meteor Channel  #       A Meteor Channel
11  #  #
12  ###############################################################################  ###############################################################################
13  #  #
14  #   This program is free software; you can redistribute it and/or modify it  #   This program is free software; you can redistribute it and/or modify it
15  #   under the terms of the GNU General Public License as published by the Free  #   under the terms of the GNU General Public License as published by the Free
16  #   Software Foundation; either version 2 of the License, or (at your option)  #   Software Foundation; either version 2 of the License, or (at your option)
17  #   any later version.  #   any later version.
18  #  #
19  #   This program is distributed in the hope that it will be useful, but WITHOUT  #   This program is distributed in the hope that it will be useful, but WITHOUT
20  #   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or  #   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  #   FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for  #   FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22  #   more details.  #   more details.
23  #  #
24  #   You should have received a copy of the GNU General Public License along  #   You should have received a copy of the GNU General Public License along
25  #   with this program; if not, write to the Free Software Foundation, Inc.,  #   with this program; if not, write to the Free Software Foundation, Inc.,
26  #   59 Temple Place, Suite 330, Boston, MA 02111-1307 USA  #   59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27  #  #
28  #   For more information visit www.meteorserver.org  #   For more information visit www.meteorserver.org
29  #  #
30  ###############################################################################  ###############################################################################
31    
32  package Meteor::Channel;  package Meteor::Channel;
33  ###############################################################################  ###############################################################################
34  # Configuration  # Configuration
35  ###############################################################################  ###############################################################################
36                    
37          use strict;          use strict;
38                    
39          use Meteor::Message;          use Meteor::Message;
40                    
41          our %Channels=();          our %Channels=();
42          our $MessageID=0;          our $MessageID=0;
43    
44  ###############################################################################  ###############################################################################
45  # Class methods  # Class methods
46  ###############################################################################  ###############################################################################
47  sub channelWithName {  sub channelWithName {
48          my $class=shift;          my $class=shift;
49          my $channelName=shift;          my $channelName=shift;
50          my $avoidCreation=shift;          my $avoidCreation=shift;
51                    
52          unless(exists($Channels{$channelName}))          unless(exists($Channels{$channelName}))
53          {          {
54                  return undef if($avoidCreation);                  return undef if($avoidCreation);
55                  #                  #
56                  # Create new channel                  # Create new channel
57                  #                  #
58                  $Channels{$channelName}=$class->newChannel($channelName);                  $Channels{$channelName}=$class->newChannel($channelName);
59                                    
60                  &::syslog('debug',"New channel $channelName");                  &::syslog('debug',"New channel $channelName");
61          }          }
62                    
63          return $Channels{$channelName};          return $Channels{$channelName};
64  }  }
65    
66  sub listChannels {  sub listChannels {
67          my $class=shift;          my $class=shift;
68                    
69          my $list='';          my $list='';
70          foreach my $channelName (sort keys %Channels)          foreach my $channelName (sort keys %Channels)
71          {          {
72                  my $channel=$Channels{$channelName};                  my $channel=$Channels{$channelName};
73                                    
74                  $list.=$channelName.'('.$channel->messageCount().'/'.$channel->subscriberCount().")$::CRLF";                  $list.=$channelName.'('.$channel->messageCount().'/'.$channel->subscriberCount().")$::CRLF";
75          }          }
76                    
77          $list;          $list;
78  }  }
79    
80  sub deleteChannel {  sub listChannelsUsingTemplate {
81          my $class=shift;          my $class=shift;
82          my $channelName=shift;          my $template=shift;
83                    
84          delete($Channels{$channelName});          return '' unless(defined($template) && $template ne '');
85  }          
86            my $list='';
87  sub trimMessageStoresByTimestamp {          foreach my $channelName (sort keys %Channels)
88          my $class=shift;          {
89          my $minTimeStamp=shift;                  my $channel=$Channels{$channelName};
90                            
91          return unless($minTimeStamp);                  $list.=$channel->descriptionWithTemplate($template);
92                    }
93          map { $_->trimMessageStoreByTimestamp($minTimeStamp) } (values %Channels);          
94  }          $list;
95    }
96  sub clearAllBuffers {  
97          my $class=shift;  sub deleteChannel {
98                    my $class=shift;
99          map { $_->clearBuffer() } (values %Channels);          my $channelName=shift;
100  }          
101            delete($Channels{$channelName});
102  ###############################################################################  }
103  # Factory methods  
104  ###############################################################################  sub trimMessageStoresByTimestamp {
105  sub new {          my $class=shift;
106          #          my $minTimeStamp=shift;
107          # Create a new empty instance          
108          #          return unless($minTimeStamp);
109          my $class=shift;          
110                    map { $_->trimMessageStoreByTimestamp($minTimeStamp) } (values %Channels);
111          my $obj={};  }
112            
113          bless($obj,$class);  sub clearAllBuffers {
114  }          my $class=shift;
115                    
116  sub newChannel {          map { $_->clearBuffer() } (values %Channels);
117          #  }
118          # new instance from new server connection  
119          #  sub numChannels {
120          my $self=shift->new();          
121                    return scalar(keys %Channels);
122          my $name=shift;  }
123          $self->{'name'}=$name;  
124            ###############################################################################
125          $self->{'subscribers'}=[];  # Factory methods
126          $self->{'messages'}=[];  ###############################################################################
127            sub new {
128          $self;          #
129  }          # Create a new empty instance
130            #
131  sub DESTROY {          my $class=shift;
132          my $self=shift;          
133                    my $obj={};
134          my @subscribers=@{$self->{'subscribers'}};          
135          map { $_->close() } @subscribers;          bless($obj,$class);
136  }  }
137            
138  ###############################################################################  sub newChannel {
139  # Instance methods          #
140  ###############################################################################          # new instance from new server connection
141  sub name {          #
142          shift->{'name'};          my $self=shift->new();
143  }          
144            my $name=shift;
145  sub addSubscriber {          $self->{'name'}=$name;
146          my $self=shift;          
147          my $subscriber=shift;          $self->{'subscribers'}=[];
148          my $startId=shift;          $self->{'messages'}=[];
149          my $persist=shift;          
150                    $self;
151          # Note: negative $startId means go back that many messages  }
152            
153          push(@{$self->{'subscribers'}},$subscriber) if($persist);  sub DESTROY {
154                    my $self=shift;
155          my $startIndex=$self->indexForMessageID($startId);          
156          return unless(defined($startIndex));          my @subscribers=@{$self->{'subscribers'}};
157                    map { $_->closeChannel($self->{'name'}) } @subscribers;
158          my $msgCount=scalar(@{$self->{'messages'}});  }
159          my $txt='';  
160            ###############################################################################
161          $startIndex=0 if($startIndex<0);  # Instance methods
162            ###############################################################################
163          while($startIndex<$msgCount)  sub name {
164          {          shift->{'name'};
165                  my $message=$self->{'messages'}->[$startIndex++];  }
166                    
167                  $txt.=$message->message();  sub addSubscriber {
168          }          my $self=shift;
169                    my $subscriber=shift;
170          $subscriber->sendMessage($txt);          my $startId=shift;
171  }          my $persist=shift;
172            my $mode=shift || '';
173  sub removeSubscriber {          my $userAgent=shift || '';
174          my $self=shift;          
175          my $subscriber=shift;          # Note: negative $startId means go back that many messages
176                    
177          my $idx=undef;          push(@{$self->{'subscribers'}},$subscriber) if($persist);
178          for(my $i=0;$i<scalar(@{$self->{'subscribers'}});$i++)          
179          {          &::syslog('info','',
180                  if($self->{'subscribers'}->[$i]==$subscriber)                  'joinchannel',
181                  {                  $subscriber->{'subscriberID'},
182                          $idx=$i;                  $self->{'name'},
183                          last;                  $mode,
184                  }                  $startId,
185          }                  $userAgent
186                    );
187          if(defined($idx))          
188          {          my $startIndex=$self->indexForMessageID($startId);
189                  splice(@{$self->{'subscribers'}},$idx,1);          return unless(defined($startIndex));
190          }          
191                    my $msgCount=scalar(@{$self->{'messages'}});
192          $self->checkExpiration();          my $txt='';
193  }          
194            $startIndex=0 if($startIndex<0);
195  sub subscriberCount {          
196          my $self=shift;          if($startIndex<$msgCount)
197                    {
198          scalar(@{$self->{'subscribers'}});                  $subscriber->sendMessages(@{$self->{'messages'}}[$startIndex,$msgCount-1]);
199  }          }
200    }
201  sub addMessage {  
202          my $self=shift;  sub removeSubscriber {
203          my $messageText=shift;          my $self=shift;
204                    my $subscriber=shift;
205          my $message=Meteor::Message->newWithID($MessageID++);          my $reason=shift ||'unknown';
206          $message->setText($messageText);                  
207          push(@{$self->{'messages'}},$message);          my $idx=undef;
208                    for(my $i=0;$i<scalar(@{$self->{'subscribers'}});$i++)
209          $self->trimMessageStoreBySize();          {
210                            if($self->{'subscribers'}->[$i]==$subscriber)
211          my $text=$message->message();                  {
212          map { $_->sendMessage($text) } @{$self->{'subscribers'}};                          $idx=$i;
213  }                          last;
214                    }
215  sub messageCount {          }
216          my $self=shift;          
217                    if(defined($idx))
218          scalar(@{$self->{'messages'}});          {
219  }                  splice(@{$self->{'subscribers'}},$idx,1);
220                    
221  sub trimMessageStoreBySize {                  &::syslog('info','',
222          my $self=shift;                          'leavechannel',
223                                    $subscriber->{'subscriberID'},
224          my $numMessages=scalar(@{$self->{'messages'}});                          $self->{'name'},
225                                    $subscriber->{'ConnectionStart'},
226          if($numMessages>$::CONF{'MaxMessagesPerChannel'})                          $subscriber->{'MessageCount'},
227          {                          $subscriber->{'bytesWritten'},
228                  splice(@{$self->{'messages'}},0,-$::CONF{'MaxMessagesPerChannel'});                          $reason
229          }                  );
230  }          }
231            
232  sub trimMessageStoreByTimestamp {          $self->checkExpiration();
233          my $self=shift;  }
234          my $ts=shift;  
235            sub subscriberCount {
236          while(scalar(@{$self->{'messages'}})>0 && $self->{'messages'}->[0]->timestamp()<$ts)          my $self=shift;
237          {          
238                  my $msg=shift(@{$self->{'messages'}});          scalar(@{$self->{'subscribers'}});
239          }  }
240            
241          $self->checkExpiration();  sub addMessage {
242  }          my $self=shift;
243            my $messageText=shift;
244  sub clearBuffer {          
245          my $self=shift;          my $message=Meteor::Message->newWithID($MessageID++);
246                    $message->setText($messageText);
247          $self->{'messages'}=[];          $message->setChannelName($self->{'name'});
248                    push(@{$self->{'messages'}},$message);
249          $self->checkExpiration();          
250  }          $self->trimMessageStoreBySize();
251            
252  sub checkExpiration {          map { $_->sendMessages($message) } @{$self->{'subscribers'}};
253          my $self=shift;          
254                    $message;
255          if($self->messageCount()==0 && $self->subscriberCount()==0)  }
256          {  
257                  my $name=$self->name();  sub messageCount {
258                  &::syslog('debug',"Channel expired: $name");          my $self=shift;
259                  $self->deleteChannel($name);          
260          }          scalar(@{$self->{'messages'}});
261  }  }
262    
263  sub indexForMessageID {  sub trimMessageStoreBySize {
264          my $self=shift;          my $self=shift;
265          my $id=shift;          
266                    my $numMessages=scalar(@{$self->{'messages'}});
267          # the messages is always sorted by ID, so we can          
268          # use a binary search to find the message.          if($numMessages>$::CONF{'MaxMessagesPerChannel'})
269          # return undef if there are no messages or the          {
270          # ID is that of the last message.                  splice(@{$self->{'messages'}},0,-$::CONF{'MaxMessagesPerChannel'});
271          # Otherwise return the ID of the found message          }
272          # of if no message with that ID exists the one  }
273          # with the next higher ID  
274          #  sub trimMessageStoreByTimestamp {
275          return undef unless(defined($id));          my $self=shift;
276                    my $ts=shift;
277          my $numMessages=scalar(scalar(@{$self->{'messages'}}));          
278                    while(scalar(@{$self->{'messages'}})>0 && $self->{'messages'}->[0]->timestamp()<$ts)
279          return undef unless($numMessages);          {
280          return -1 unless($id ne '');                  my $msg=shift(@{$self->{'messages'}});
281                    }
282          # Note: negative $id means go back that many messages          
283          return $numMessages+$id if($id<0);          $self->checkExpiration();
284            }
285          my $low=0;  
286          my $high=$numMessages-1;  sub clearBuffer {
287          my $mid;          my $self=shift;
288          my $cond;          
289          while($low<=$high)          $self->{'messages'}=[];
290          {          
291                  $mid=($low+$high)>>1;          $self->checkExpiration();
292                  $cond=$id <=> $self->{'messages'}->[$mid]->id();  }
293                  if($cond<0)  
294                  {  sub checkExpiration {
295                          $high=$mid-1;          my $self=shift;
296                  }          
297                  elsif($cond>0)          if($self->messageCount()==0 && $self->subscriberCount()==0)
298                  {          {
299                          $low=$mid+1;                  my $name=$self->name();
300                  }                  &::syslog('debug',"Channel expired: $name");
301                  else                  $self->deleteChannel($name);
302                  {          }
303                          return $mid;  }
304                  }  
305          }  sub indexForMessageID {
306                    my $self=shift;
307          return undef if($low>=$numMessages);          my $id=shift;
308                    
309          return $low;          # the messages is always sorted by ID, so we can
310  }          # use a binary search to find the message.
311            # return undef if there are no messages or the
312  1;          # ID is that of the last message.
313            # Otherwise return the ID of the found message
314            # of if no message with that ID exists the one
315            # with the next higher ID
316            #
317            return undef unless(defined($id));
318            
319            my $numMessages=scalar(@{$self->{'messages'}});
320            
321            return undef unless($numMessages);
322            return -1 unless($id ne '');
323            
324            # Note: negative $id means go back that many messages
325            return $numMessages+$id if($id<0);
326            
327            my $low=0;
328            my $high=$numMessages-1;
329            my $mid;
330            my $cond;
331            while($low<=$high)
332            {
333                    $mid=($low+$high)>>1;
334                    $cond=$id <=> $self->{'messages'}->[$mid]->id();
335                    if($cond<0)
336                    {
337                            $high=$mid-1;
338                    }
339                    elsif($cond>0)
340                    {
341                            $low=$mid+1;
342                    }
343                    else
344                    {
345                            return $mid;
346                    }
347            }
348            
349            return undef if($low>=$numMessages);
350            
351            return $low;
352    }
353    
354    sub lastMsgID {
355            my $self=shift;
356            
357            my $numMessages=scalar(@{$self->{'messages'}});
358            
359            return 'undefined' unless($numMessages>0);
360            
361            @{$self->{'messages'}}[-1]->id();
362    }
363    
364    sub descriptionWithTemplate {
365            my $self=shift;
366            my $template=shift;
367            
368            $template=~s/~([a-zA-Z0-9_]*)~/
369                    if(!defined($1) || $1 eq '')
370                    {
371                            '~';
372                    }
373                    elsif($1 eq 'messageCount')
374                    {
375                            $self->messageCount();
376                    }
377                    elsif($1 eq 'subscriberCount')
378                    {
379                            $self->subscriberCount();
380                    }
381                    elsif($1 eq 'lastMsgID')
382                    {
383                            $self->lastMsgID();
384                    }
385                    
386                    elsif(exists($self->{$1}))
387                    {
388                            $self->{$1};
389                    }
390                    else
391                    {
392                            '';
393                    }
394            /gex;
395            
396            $template;
397    }
398    
399    1;
400  ############################################################################EOF  ############################################################################EOF

Legend:
Removed from v.9  
changed lines
  Added in v.47

  ViewVC Help
Powered by ViewVC 1.1.26