/[meteor]/trunk/Meteor/Subscriber.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /trunk/Meteor/Subscriber.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 70 - (show annotations)
Sat Mar 28 03:45:31 2009 UTC (15 years ago) by dpavlin
File size: 10461 byte(s)
make Meteor::Koha for integration
1 #!/usr/bin/perl -w
2 ###############################################################################
3 # Meteor
4 # An HTTP server for the 2.0 web
5 # Copyright (c) 2006 contributing authors
6 #
7 # Subscriber.pm
8 #
9 # Description:
10 # A Meteor Subscriber
11 #
12 ###############################################################################
13 #
14 # This program is free software; you can redistribute it and/or modify it
15 # under the terms of the GNU General Public License as published by the Free
16 # Software Foundation; either version 2 of the License, or (at your option)
17 # any later version.
18 #
19 # This program is distributed in the hope that it will be useful, but WITHOUT
20 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22 # more details.
23 #
24 # You should have received a copy of the GNU General Public License along
25 # with this program; if not, write to the Free Software Foundation, Inc.,
26 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27 #
28 # For more information visit www.meteorserver.org
29 #
30 ###############################################################################
31
32 package Meteor::Subscriber;
33 ###############################################################################
34 # Configuration
35 ###############################################################################
36
37 use strict;
38
39 use Meteor::Connection;
40 use Meteor::Channel;
41 use Meteor::Document;
42 use Meteor::Koha;
43
44 @Meteor::Subscriber::ISA=qw(Meteor::Connection);
45
46 our %PersistentConnections=();
47 our $NumAcceptedConnections=0;
48
49
50 ###############################################################################
51 # Factory methods
52 ###############################################################################
53 sub newFromServer {
54 my $class=shift;
55
56 my $self=$class->SUPER::newFromServer(shift);
57
58 $self->{'headerBuffer'}='';
59 $self->{'messageCount'}=0;
60
61 $::Statistics->{'current_subscribers'}++;
62 $::Statistics->{'subscriber_connections_accepted'}++;
63
64 $self;
65 }
66
67 ###############################################################################
68 # Class methods
69 ###############################################################################
70 sub deleteSubscriberWithID {
71 my $class=shift;
72 my $id=shift;
73
74 if(exists($PersistentConnections{$id}))
75 {
76 $PersistentConnections{$id}->close(0,'newSubscriberWithSameID');
77 }
78 }
79
80 sub subscriberExists {
81 my $class=shift;
82 my $id=shift;
83
84 return exists($PersistentConnections{$id});
85 }
86
87 sub pingPersistentConnections {
88 my $class=shift;
89
90 my @cons=values %PersistentConnections;
91
92 map { $_->ping() } @cons;
93 }
94
95 sub checkPersistentConnectionsForMaxTime {
96 my $class=shift;
97
98 my $time=time;
99 my @cons=values %PersistentConnections;
100
101 map { $_->checkForMaxTime($time) } @cons;
102 }
103
104 sub numSubscribers {
105
106 return scalar(keys %PersistentConnections);
107 }
108
109 sub listSubscribers {
110 my $class=shift;
111 my $list='';
112 foreach my $subscriber (keys %PersistentConnections)
113 {
114 my $sub = $PersistentConnections{$subscriber};
115 $list .= $subscriber.' '.$sub->{'ip'}.' '.$sub->{'connectionStart'}.' '.$sub->{'connectionTimeLimit'}.' '.($sub->{'connectionTimeLimit'}-time).' '.$sub->{'messageCount'}.' "'.$sub->{'useragent'}."\"$::CRLF";
116 }
117 $list;
118 }
119
120 ###############################################################################
121 # Instance methods
122 ###############################################################################
123 sub processLine {
124 my $self=shift;
125 my $line=shift;
126
127 # Once the header was processed we ignore any input - Meteor does not accept or process request bodies
128 return unless(exists($self->{'headerBuffer'}));
129
130 if($line ne '')
131 {
132 #
133 # Accumulate header
134 #
135 $self->{'headerBuffer'}.="$line\n";
136 }
137 else
138 {
139 #
140 # Empty line signals end of header.
141 # Analyze header, register with appropiate channel
142 # and send pending messages.
143 #
144 # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
145 #
146 # Find the 'GET' line
147 #
148 if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/([a-z0-9_\-\%\.\/]+).*?/i)
149 {
150 $self->{'subscriberID'}=$1;
151 $self->{'mode'}=$2;
152 my $persist=$self->getConf('Persist');
153 my $maxTime=$self->getConf('MaxTime');
154 $self->{'connectionTimeLimit'} = ($self->{'connectionStart'}+$maxTime) if ($maxTime>0);
155
156 my @channelData=split('/',$3);
157 my $channels={};
158 my $channelName;
159 my $offset;
160 foreach my $chandef (@channelData) {
161 if($chandef=~/^([a-z0-9_\-\%]+)(.(r|b|h)([0-9]*))?$/i) {
162 $channelName = $1;
163 $channels->{$channelName}->{'startIndex'} = undef;
164 if ($3) {
165 $offset = $4;
166 if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; }
167 if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; }
168 if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; }
169 }
170 }
171 }
172 $self->{'useragent'} = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-";
173
174 if ($persist) {
175 # New persistent connection: kill any existing connection with same ID
176 $self->deleteSubscriberWithID($self->{'subscriberID'});
177 # Add new persistent connection to collection
178 $PersistentConnections{$self->{'subscriberID'}}=$self;
179 } else {
180 $::Pollers->{$self->{'subscriberID'}} = time;
181 }
182
183 if(scalar(keys %{$channels})) {
184
185 $self->{'channelinfo'} = '';
186 my $citemplate = $self->getConf('ChannelInfoTemplate');
187 foreach $channelName (keys %{$channels}) {
188 my $channel=Meteor::Channel->channelWithName($channelName);
189 $self->{'channels'}->{$channelName}=$channel;
190 if (defined($self->{'channels'}->{$channelName}->{'startIndex'}) && $self->{'channels'}->{$channelName}->{'startIndex'} > 0) {
191 $self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate);
192 }
193 }
194 $self->emitOKHeader();
195 foreach $channelName (keys %{$channels}) {
196 my $startIndex=$channels->{$channelName}->{'startIndex'};
197 $self->{'channels'}->{$channelName}->addSubscriber($self,$startIndex,$persist,$self->{'mode'},$self->{'useragent'});
198 }
199 if (!$persist) {
200 delete ($self->{'channels'});
201 $self->close(1, 'responseComplete');
202 }
203 delete($self->{'headerBuffer'});
204
205 # If long polling, close connection immediately if any messages have been sent
206 if ($self->{'messageCount'} > 0 && $self->{'mode'} eq 'longpoll') {
207 $self->close(1, 'closedOnEvent');
208 }
209 return;
210 }
211 }
212 elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/)
213 {
214 $self->deleteSubscriberWithID($1);
215 $self->emitOKHeader();
216 $self->close(1, 'disconnectRequested');
217 return;
218 }
219 elsif($self->{'headerBuffer'}=~m{GET\s+/koha/(\d+)})
220 {
221 Meteor::Koha->item($self,$1);
222 $self->SUPER::close();
223 return;
224 }
225 elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
226 {
227 Meteor::Document->serveFileToClient($1,$self);
228 $self->SUPER::close();
229 return;
230 }
231
232 #
233 # If we fall through we did not understand the request
234 #
235 $self->emitErrorHeader();
236 }
237 }
238
239 sub emitOKHeader {
240 my $self=shift;
241
242 $self->emitHeader('200 OK');
243 }
244
245 sub emitErrorHeader {
246 my $self=shift;
247
248 $self->emitHeader('404 Not Found');
249 $::Statistics->{'errors_served'}++;
250
251 # close up shop here!
252 $self->close(0, 'error');
253 }
254
255 sub emitHeader {
256 my $self=shift;
257 my $status=shift;
258
259 my $header=$self->getConf('HeaderTemplate');
260
261 $header=~s/~([^~]*)~/
262 if(!defined($1) || $1 eq '') {
263 '~';
264 } elsif($1 eq 'server') {
265 $::PGM;
266 } elsif($1 eq 'status') {
267 $status;
268 } elsif($1 eq 'servertime') {
269 time;
270 } elsif($1 eq 'channelinfo') {
271 $self->{'channelinfo'};
272 } else {
273 '';
274 }
275 /gex;
276
277 $self->write($header);
278 }
279
280 sub sendMessages {
281 my $self=shift;
282
283 my $numMessages=0;
284 my $msgTemplate=$self->getConf('MessageTemplate');
285 my $msgData='';
286
287 foreach my $message (@_)
288 {
289 $msgData.=$message->messageWithTemplate($msgTemplate);
290 $numMessages++;
291 }
292
293 return if($numMessages<1);
294
295 $self->write($msgData);
296
297 $::Statistics->{'messages_served'}+=$numMessages;
298
299 my $msgCount=$self->{'messageCount'};
300 $msgCount+=$numMessages;
301 $self->{'messageCount'}=$msgCount;
302
303 # If long polling, close connection, as a message has now been sent.
304 # Don't close if still processing the header (we may be sending a backlog from multiple channels)
305 if($self->getConf('CloseOnEvent') && !exists($self->{'headerBuffer'})) {
306 $self->close(1, 'closedOnEvent');
307 }
308 }
309
310 sub ping {
311 my $self=shift;
312 my $msg=$self->getConf('PingMessage');
313
314 $self->write($msg);
315 }
316
317 sub closeChannel {
318 my $self=shift;
319 my $channelName=shift;
320
321 return unless(exists($self->{'channels'}->{$channelName}));
322
323 my $channel=$self->{'channels'}->{$channelName};
324 $channel->removeSubscriber($self,'channelClose');
325
326 delete($self->{'channels'}->{$channelName});
327
328 $self->close(0,'channelClose') if(scalar(keys %{$self->{'channels'}})==0);
329 }
330
331 sub close {
332 my $self=shift;
333 my $noShutdownMsg=shift;
334 my $reason=shift;
335
336 foreach my $channelName (keys %{$self->{'channels'}})
337 {
338 my $channel=$self->{'channels'}->{$channelName};
339 $channel->removeSubscriber($self,$reason);
340 }
341 delete($self->{'channels'});
342
343 # If this connection is in the PersistentConnections array, delete it, then anonymise
344 # it so that if we have to wait for the write buffer to empty before close, it's only
345 # removed once.
346 if(exists($self->{'subscriberID'})) {
347 delete($PersistentConnections{$self->{'subscriberID'}});
348 delete($self->{'subscriberID'});
349 }
350
351 # Send shutdown message unless remote closed or
352 # connection not yet established
353 unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
354 {
355 my $msg=$self->getConf('SubscriberShutdownMsg');
356 if(defined($msg) && $msg ne '')
357 {
358 $self->write($msg);
359 }
360 }
361
362 my $fmsg=$self->getConf('FooterTemplate');
363 if(defined($fmsg) && $fmsg ne '')
364 {
365 $self->write($fmsg);
366 }
367
368 $self->SUPER::close();
369 }
370
371 sub didClose {
372
373 $::Statistics->{'current_subscribers'}--;
374 }
375
376 sub checkForMaxTime {
377 my $self=shift;
378 my $time=shift;
379
380 $self->close(1,'maxTime') if(exists($self->{'connectionTimeLimit'}) && $self->{'connectionTimeLimit'}<$time);
381 }
382
383 sub getConf {
384 my $self=shift;
385 my $key=shift;
386
387 if(exists($self->{'mode'}) && $self->{'mode'} ne '') {
388 my $k=$key.$self->{'mode'};
389 if(exists($::CONF{$k})) {
390 return $::CONF{$k};
391 }
392 }
393
394 $::CONF{$key};
395 }
396
397 1;
398 ############################################################################EOF

  ViewVC Help
Powered by ViewVC 1.1.26