/[meteor]/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 80 - (hide annotations)
Sun Mar 29 01:28:23 2009 UTC (15 years, 1 month ago) by dpavlin
File size: 10461 byte(s)
- use disk cache for Koha pages
- move sid to item mungling into meteor
1 knops.gerd 11 #!/usr/bin/perl -w
2     ###############################################################################
3     # Meteor
4     # An HTTP server for the 2.0 web
5     # Copyright (c) 2006 contributing authors
6     #
7     # Subscriber.pm
8     #
9     # Description:
10     # A Meteor Subscriber
11     #
12     ###############################################################################
13     #
14     # This program is free software; you can redistribute it and/or modify it
15     # under the terms of the GNU General Public License as published by the Free
16     # Software Foundation; either version 2 of the License, or (at your option)
17     # any later version.
18     #
19     # This program is distributed in the hope that it will be useful, but WITHOUT
20     # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21     # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22     # more details.
23     #
24     # You should have received a copy of the GNU General Public License along
25     # with this program; if not, write to the Free Software Foundation, Inc.,
26     # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27     #
28     # For more information visit www.meteorserver.org
29     #
30     ###############################################################################
31    
32     package Meteor::Subscriber;
33     ###############################################################################
34     # Configuration
35     ###############################################################################
36    
37     use strict;
38    
39     use Meteor::Connection;
40     use Meteor::Channel;
41     use Meteor::Document;
42 dpavlin 70 use Meteor::Koha;
43 knops.gerd 11
44     @Meteor::Subscriber::ISA=qw(Meteor::Connection);
45    
46     our %PersistentConnections=();
47 knops.gerd 25 our $NumAcceptedConnections=0;
48 knops.gerd 37
49 knops.gerd 11
50     ###############################################################################
51     # Factory methods
52     ###############################################################################
53     sub newFromServer {
54     my $class=shift;
55    
56     my $self=$class->SUPER::newFromServer(shift);
57    
58     $self->{'headerBuffer'}='';
59 andrew.betts 62 $self->{'messageCount'}=0;
60 knops.gerd 11
61 knops.gerd 25 $::Statistics->{'current_subscribers'}++;
62     $::Statistics->{'subscriber_connections_accepted'}++;
63    
64 knops.gerd 11 $self;
65     }
66    
67     ###############################################################################
68     # Class methods
69     ###############################################################################
70     sub deleteSubscriberWithID {
71     my $class=shift;
72     my $id=shift;
73    
74     if(exists($PersistentConnections{$id}))
75     {
76 andrew.betts 50 $PersistentConnections{$id}->close(0,'newSubscriberWithSameID');
77 knops.gerd 11 }
78     }
79    
80 andrew.betts 62 sub subscriberExists {
81     my $class=shift;
82     my $id=shift;
83    
84     return exists($PersistentConnections{$id});
85     }
86    
87 knops.gerd 11 sub pingPersistentConnections {
88     my $class=shift;
89    
90     my @cons=values %PersistentConnections;
91    
92 knops.gerd 45 map { $_->ping() } @cons;
93 knops.gerd 11 }
94    
95     sub checkPersistentConnectionsForMaxTime {
96     my $class=shift;
97    
98     my $time=time;
99     my @cons=values %PersistentConnections;
100    
101     map { $_->checkForMaxTime($time) } @cons;
102     }
103    
104 knops.gerd 25 sub numSubscribers {
105    
106     return scalar(keys %PersistentConnections);
107     }
108    
109 andrew.betts 62 sub listSubscribers {
110     my $class=shift;
111     my $list='';
112     foreach my $subscriber (keys %PersistentConnections)
113     {
114     my $sub = $PersistentConnections{$subscriber};
115     $list .= $subscriber.' '.$sub->{'ip'}.' '.$sub->{'connectionStart'}.' '.$sub->{'connectionTimeLimit'}.' '.($sub->{'connectionTimeLimit'}-time).' '.$sub->{'messageCount'}.' "'.$sub->{'useragent'}."\"$::CRLF";
116     }
117     $list;
118     }
119    
120 knops.gerd 11 ###############################################################################
121     # Instance methods
122     ###############################################################################
123     sub processLine {
124     my $self=shift;
125     my $line=shift;
126    
127 andrew.betts 62 # Once the header was processed we ignore any input - Meteor does not accept or process request bodies
128 knops.gerd 11 return unless(exists($self->{'headerBuffer'}));
129    
130     if($line ne '')
131     {
132     #
133     # Accumulate header
134     #
135     $self->{'headerBuffer'}.="$line\n";
136     }
137     else
138     {
139     #
140     # Empty line signals end of header.
141     # Analyze header, register with appropiate channel
142     # and send pending messages.
143     #
144 andrew.betts 32 # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
145 knops.gerd 11 #
146     # Find the 'GET' line
147     #
148 andrew.betts 56 if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/([a-z0-9_\-\%\.\/]+).*?/i)
149 knops.gerd 11 {
150 andrew.betts 50 $self->{'subscriberID'}=$1;
151 andrew.betts 32 $self->{'mode'}=$2;
152 knops.gerd 45 my $persist=$self->getConf('Persist');
153     my $maxTime=$self->getConf('MaxTime');
154 andrew.betts 62 $self->{'connectionTimeLimit'} = ($self->{'connectionStart'}+$maxTime) if ($maxTime>0);
155 knops.gerd 45
156 andrew.betts 32 my @channelData=split('/',$3);
157 knops.gerd 13 my $channels={};
158 andrew.betts 32 my $channelName;
159     my $offset;
160 andrew.betts 35 foreach my $chandef (@channelData) {
161 andrew.betts 51 if($chandef=~/^([a-z0-9_\-\%]+)(.(r|b|h)([0-9]*))?$/i) {
162 andrew.betts 32 $channelName = $1;
163     $channels->{$channelName}->{'startIndex'} = undef;
164 andrew.betts 35 if ($3) {
165     $offset = $4;
166     if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; }
167     if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; }
168     if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; }
169 knops.gerd 13 }
170 knops.gerd 11 }
171     }
172 andrew.betts 62 $self->{'useragent'} = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-";
173 knops.gerd 11
174 andrew.betts 50 if ($persist) {
175 andrew.betts 62 # New persistent connection: kill any existing connection with same ID
176 andrew.betts 50 $self->deleteSubscriberWithID($self->{'subscriberID'});
177 andrew.betts 62 # Add new persistent connection to collection
178 andrew.betts 50 $PersistentConnections{$self->{'subscriberID'}}=$self;
179 andrew.betts 62 } else {
180     $::Pollers->{$self->{'subscriberID'}} = time;
181 knops.gerd 11 }
182    
183 andrew.betts 53 if(scalar(keys %{$channels})) {
184    
185     $self->{'channelinfo'} = '';
186     my $citemplate = $self->getConf('ChannelInfoTemplate');
187     foreach $channelName (keys %{$channels}) {
188     my $channel=Meteor::Channel->channelWithName($channelName);
189     $self->{'channels'}->{$channelName}=$channel;
190 andrew.betts 62 if (defined($self->{'channels'}->{$channelName}->{'startIndex'}) && $self->{'channels'}->{$channelName}->{'startIndex'} > 0) {
191     $self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate);
192     }
193 andrew.betts 53 }
194 knops.gerd 11 $self->emitOKHeader();
195 andrew.betts 53 foreach $channelName (keys %{$channels}) {
196     my $startIndex=$channels->{$channelName}->{'startIndex'};
197 andrew.betts 62 $self->{'channels'}->{$channelName}->addSubscriber($self,$startIndex,$persist,$self->{'mode'},$self->{'useragent'});
198 andrew.betts 53 }
199 andrew.betts 64 if (!$persist) {
200     delete ($self->{'channels'});
201     $self->close(1, 'responseComplete');
202     }
203 andrew.betts 62 delete($self->{'headerBuffer'});
204 andrew.betts 64
205     # If long polling, close connection immediately if any messages have been sent
206     if ($self->{'messageCount'} > 0 && $self->{'mode'} eq 'longpoll') {
207     $self->close(1, 'closedOnEvent');
208     }
209 knops.gerd 11 return;
210     }
211     }
212 andrew.betts 32 elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/)
213     {
214     $self->deleteSubscriberWithID($1);
215     $self->emitOKHeader();
216 andrew.betts 50 $self->close(1, 'disconnectRequested');
217 andrew.betts 32 return;
218     }
219 dpavlin 80 elsif($self->{'headerBuffer'}=~m{GET\s+/koha/(\S+)})
220 dpavlin 70 {
221     Meteor::Koha->item($self,$1);
222     $self->SUPER::close();
223     return;
224     }
225 knops.gerd 11 elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
226     {
227     Meteor::Document->serveFileToClient($1,$self);
228 andrew.betts 64 $self->SUPER::close();
229 knops.gerd 11 return;
230     }
231    
232     #
233     # If we fall through we did not understand the request
234     #
235     $self->emitErrorHeader();
236     }
237     }
238    
239     sub emitOKHeader {
240     my $self=shift;
241    
242     $self->emitHeader('200 OK');
243     }
244    
245     sub emitErrorHeader {
246     my $self=shift;
247    
248     $self->emitHeader('404 Not Found');
249 knops.gerd 25 $::Statistics->{'errors_served'}++;
250 knops.gerd 11
251     # close up shop here!
252 andrew.betts 50 $self->close(0, 'error');
253 knops.gerd 11 }
254    
255     sub emitHeader {
256     my $self=shift;
257     my $status=shift;
258    
259 knops.gerd 49 my $header=$self->getConf('HeaderTemplate');
260 knops.gerd 11
261 knops.gerd 45 $header=~s/~([^~]*)~/
262 andrew.betts 53 if(!defined($1) || $1 eq '') {
263 knops.gerd 11 '~';
264 andrew.betts 53 } elsif($1 eq 'server') {
265 knops.gerd 11 $::PGM;
266 andrew.betts 53 } elsif($1 eq 'status') {
267 knops.gerd 11 $status;
268 andrew.betts 53 } elsif($1 eq 'servertime') {
269 knops.gerd 11 time;
270 andrew.betts 53 } elsif($1 eq 'channelinfo') {
271     $self->{'channelinfo'};
272     } else {
273 knops.gerd 11 '';
274     }
275     /gex;
276    
277 andrew.betts 39 $self->write($header);
278 knops.gerd 11 }
279    
280 knops.gerd 45 sub sendMessages {
281 knops.gerd 11 my $self=shift;
282    
283 knops.gerd 45 my $numMessages=0;
284 andrew.betts 50 my $msgTemplate=$self->getConf('MessageTemplate');
285 knops.gerd 45 my $msgData='';
286 knops.gerd 25
287 knops.gerd 45 foreach my $message (@_)
288     {
289     $msgData.=$message->messageWithTemplate($msgTemplate);
290     $numMessages++;
291     }
292 knops.gerd 11
293 knops.gerd 45 return if($numMessages<1);
294 knops.gerd 25
295 knops.gerd 45 $self->write($msgData);
296 knops.gerd 11
297 knops.gerd 45 $::Statistics->{'messages_served'}+=$numMessages;
298    
299 andrew.betts 62 my $msgCount=$self->{'messageCount'};
300 knops.gerd 45 $msgCount+=$numMessages;
301 andrew.betts 62 $self->{'messageCount'}=$msgCount;
302 knops.gerd 45
303 andrew.betts 62 # If long polling, close connection, as a message has now been sent.
304     # Don't close if still processing the header (we may be sending a backlog from multiple channels)
305     if($self->getConf('CloseOnEvent') && !exists($self->{'headerBuffer'})) {
306     $self->close(1, 'closedOnEvent');
307 knops.gerd 11 }
308     }
309    
310 knops.gerd 45 sub ping {
311     my $self=shift;
312     my $msg=$self->getConf('PingMessage');
313    
314     $self->write($msg);
315     }
316    
317 knops.gerd 13 sub closeChannel {
318     my $self=shift;
319     my $channelName=shift;
320    
321     return unless(exists($self->{'channels'}->{$channelName}));
322    
323     my $channel=$self->{'channels'}->{$channelName};
324 knops.gerd 47 $channel->removeSubscriber($self,'channelClose');
325 knops.gerd 13
326     delete($self->{'channels'}->{$channelName});
327    
328 andrew.betts 50 $self->close(0,'channelClose') if(scalar(keys %{$self->{'channels'}})==0);
329 knops.gerd 13 }
330    
331 knops.gerd 11 sub close {
332     my $self=shift;
333     my $noShutdownMsg=shift;
334 andrew.betts 50 my $reason=shift;
335 knops.gerd 11
336 knops.gerd 13 foreach my $channelName (keys %{$self->{'channels'}})
337     {
338     my $channel=$self->{'channels'}->{$channelName};
339 andrew.betts 50 $channel->removeSubscriber($self,$reason);
340 knops.gerd 13 }
341     delete($self->{'channels'});
342 knops.gerd 11
343 andrew.betts 50 # If this connection is in the PersistentConnections array, delete it, then anonymise
344     # it so that if we have to wait for the write buffer to empty before close, it's only
345     # removed once.
346     if(exists($self->{'subscriberID'})) {
347 knops.gerd 11 delete($PersistentConnections{$self->{'subscriberID'}});
348 andrew.betts 50 delete($self->{'subscriberID'});
349 knops.gerd 11 }
350    
351     # Send shutdown message unless remote closed or
352     # connection not yet established
353     unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
354     {
355 knops.gerd 45 my $msg=$self->getConf('SubscriberShutdownMsg');
356 knops.gerd 11 if(defined($msg) && $msg ne '')
357     {
358     $self->write($msg);
359     }
360     }
361 andrew.betts 62
362     my $fmsg=$self->getConf('FooterTemplate');
363     if(defined($fmsg) && $fmsg ne '')
364     {
365     $self->write($fmsg);
366     }
367    
368 knops.gerd 11 $self->SUPER::close();
369     }
370    
371 knops.gerd 37 sub didClose {
372    
373     $::Statistics->{'current_subscribers'}--;
374     }
375    
376 knops.gerd 11 sub checkForMaxTime {
377     my $self=shift;
378     my $time=shift;
379    
380 andrew.betts 62 $self->close(1,'maxTime') if(exists($self->{'connectionTimeLimit'}) && $self->{'connectionTimeLimit'}<$time);
381 knops.gerd 11 }
382    
383 knops.gerd 45 sub getConf {
384     my $self=shift;
385     my $key=shift;
386    
387 andrew.betts 64 if(exists($self->{'mode'}) && $self->{'mode'} ne '') {
388     my $k=$key.$self->{'mode'};
389 andrew.betts 50 if(exists($::CONF{$k})) {
390     return $::CONF{$k};
391     }
392 knops.gerd 45 }
393    
394     $::CONF{$key};
395     }
396    
397 knops.gerd 11 1;
398 dpavlin 70 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26