/[webpac]/trunk/openisis/lsv.c
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /trunk/openisis/lsv.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 239 - (show annotations)
Mon Mar 8 17:49:13 2004 UTC (20 years ago) by dpavlin
File MIME type: text/plain
File size: 25568 byte(s)
including openisis 0.9.0 into webpac tree

1 /*
2 openisis - an open implementation of the CDS/ISIS database
3 Version 0.8.x (patchlevel see file Version)
4 Copyright (C) 2001-2003 by Erik Grziwotz, erik@openisis.org
5
6 This library is free software; you can redistribute it and/or
7 modify it under the terms of the GNU Lesser General Public
8 License as published by the Free Software Foundation; either
9 version 2.1 of the License, or (at your option) any later version.
10
11 This library is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public
17 License along with this library; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
20 see README for more information
21 EOH */
22
23 /*
24 $Id: lsv.c,v 1.14 2003/06/11 14:53:26 kripke Exp $
25 OpenIsis server
26 */
27
28 #include <sys/select.h>
29 #include <sys/time.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <unistd.h>
33 #include <fcntl.h>
34 #include <netinet/in.h>
35 #include <string.h> /* memcpy et al */
36 #include <signal.h>
37 #include <errno.h>
38
39
40 #include "lsv.h"
41
42
43
44 static Con *con[FD_SETSIZE]; /* usually 1024, but up to 1<<16 */
45 #ifdef LSVTHR
46 static pthread_mutex_t lsv_mutex_init = PTHREAD_MUTEX_INITIALIZER;
47 static pthread_cond_t lsv_cond_init = PTHREAD_COND_INITIALIZER;
48 static pthread_key_t lsv_key_wrk;
49 #else
50 static Wrk *lsv_wrk_single;
51 #endif
52 static int lsv_term;
53 static siginfo_t lsv_sig;
54
55 static const char * pdg[] = {
56 "---",
57 "NEW",
58 "RED",
59 "EOF"
60 };
61 static const char * stg[] = {
62 "NEW",
63 "CON",
64 "INP",
65 "SES",
66 "WRK",
67 "RUN",
68 "COM",
69 "DON",
70 "RET"
71 };
72
73 static void lsv_sighand ( int sig, siginfo_t *info, void *ucontext )
74 {
75 if ( SIGINT == sig || SIGTERM == sig )
76 lsv_term = !0;
77 if ( info )
78 lsv_sig = *info;
79 else { /* ??? */
80 lsv_sig.si_signo = sig;
81 lsv_sig.si_pid = 0;
82 }
83 (void)ucontext;
84 } /* lsv_sighand */
85
86
87 static int lsv_enq ( Que *q, Con *c )
88 {
89 c->nxt = 0;
90 q->len++;
91 if ( q->tail ) {
92 q->tail->nxt = c;
93 q->tail = c;
94 return 1;
95 }
96 q->head = q->tail = c;
97 return 0;
98 } /* lsv_enq */
99
100
101 static void lsv_penq ( Srv *srv, Con *c )
102 {
103 Pool *work =
104 #ifdef LSVTHR
105 c->grp ? &srv->pool :
106 #endif
107 &srv->main;
108 srv->jobs++;
109 c->stg = LSV_STGWRK;
110 if ( c->ses )
111 c->ses->cur = c;
112 lsv_enq( &work->que, c );
113 sMsg( LOG_DEBUG, "enq req %d from %s qlen %d",
114 LIO_FD&c->ios.file, c->nam, work->que.len );
115 #ifdef LSVTHR
116 if ( work->wait ) {
117 Wrk *w = work->wait;
118 work->wait = w->nxt;
119 w->nxt = 0;
120 srv->plen--;
121 pthread_cond_signal( &w->todo );
122 sMsg( LOG_DEBUG, "woke %d of %d", w->id, srv->plen );
123 }
124 #endif
125 } /* lsv_penq */
126
127
128 static Con *lsv_deq ( Que *q )
129 {
130 Con *c = q->head;
131 if ( c ) {
132 q->len--;
133 q->head = c->nxt;
134 c->nxt = 0;
135 if ( q->tail == c )
136 q->tail = 0;
137 }
138 return c;
139 } /* lsv_deq */
140
141
142 Wrk *svCur ()
143 {
144 #ifdef LSVTHR
145 return (Wrk*)pthread_getspecific( lsv_key_wrk );
146 #else
147 return lsv_wrk_single;
148 #endif
149 }
150
151
152 /**
153 in threaded mode, run forever, waiting for jobs.
154 non-threaded, do the jobs that are available and return
155 */
156 static void *lsv_wrk ( void *arg )
157 {
158 Wrk *self = (Wrk*)arg;
159 Srv *srv = self->srv;
160 Con *job = 0;
161 Pool *work =
162 #ifdef LSVTHR
163 self->id ? &srv->pool :
164 #endif
165 &srv->main;
166 #ifdef LSVTHR
167 sigset_t blk;
168
169 /* block signals, they should go to the receiver */
170 sigemptyset( &blk );
171 sigaddset( &blk, SIGHUP );
172 sigaddset( &blk, SIGINT );
173 sigaddset( &blk, SIGTERM );
174 sigaddset( &blk, SIGPIPE );
175 pthread_sigmask( SIG_BLOCK, &blk, 0 );
176
177 pthread_setspecific( lsv_key_wrk, arg );
178 if ( self->id )
179 srv->app( 0, LSV_APPINI );
180 #else
181 lsv_wrk_single = self;
182 if ( !self->ses )
183 #endif
184 self->ses = sGet(); /* allocate session for thread */
185
186 for (;/*ever*/;) {
187 int ret;
188
189 /* ****************************************
190 critical section enter
191 */
192 #ifdef LSVTHR
193 pthread_mutex_lock( &srv->mut );
194 #endif
195 if ( job ) { /* from last turn */
196 sMsg( LOG_DEBUG, "thr %d enq job %s pdg %s",
197 self->id, job->nam, pdg[job->pdg] );
198 job->stg = LSV_STGRET;
199 lsv_enq( &srv->recv, job );
200 #ifdef LSVTHR
201 if ( job->pdg ) /* wakeup receiver */
202 write( srv->pip[1], &job->pdg, 1 );
203 #endif
204 if ( job->ses->que ) { /* immediatly reschedule queued job */
205 Con *q = job->ses->que;
206 job->ses->que = q->nxt;
207 sMsg( LOG_VERBOSE, "dequeing con %s for ses '%s'",
208 q->nam, job->ses->name );
209 lsv_penq( srv, q );
210 }
211 }
212 self->cur = 0;
213 while ( !(job = lsv_deq( &work->que ))
214 && !(LSV_SHUTDN & srv->flg)
215 ) {
216 #ifndef LSVTHR
217 return self;
218 #else
219 self->nxt = work->wait;
220 work->wait = self; /* pls pls lemme wrk */
221 srv->plen++;
222 self->waits++;
223 sMsg( LOG_DEBUG, "thr %d waiting", self->id );
224 /* wait releases the mutex */
225 pthread_cond_wait( &self->todo, &srv->mut );
226 #endif
227 }
228 self->cur = job;
229 if ( job )
230 job->stg = LSV_STGRUN;
231 #ifdef LSVTHR
232 pthread_mutex_unlock( &srv->mut );
233 #endif
234 /* ****************************************
235 critical section leave
236 */
237
238 /* end of monday morning meeting blah blah -- start working */
239 if ( ! job || (LSV_SHUTDN & srv->flg) )
240 break;
241 self->jobs++;
242 if ( job->ses )
243 sSet( job->ses );
244 else
245 job->ses = self->ses;
246 job->ses->io[1] = &job->ios;
247 if ( job->ses->res )
248 CLRREC( job->ses->res );
249 sMsg( LOG_DEBUG, "thr %d run job %s ses '%s'.%d",
250 self->id, job->nam, job->ses->name, job->ses->accnt );
251 ret = srv->app( job, LSV_APPRUN );
252 sMsg( LOG_DEBUG, "thr %d got %d buf %d",
253 self->id, ret, LIO_SAVAIL( &job->ios ) );
254
255 /*
256 lock only in order to create SMP memory barrier.
257 on a single processor system, the byte should be safely readable
258 by receiver without this lock.
259 TOD: check alternatives:
260 since receiver is checking the stage of active connections
261 only in rare cases, it might be more efficient to guard either
262 this set or the processing up to here with a per thread mutex.
263 */
264 #ifdef LSVTHR
265 pthread_mutex_lock( &srv->mut );
266 #endif
267 job->stg = LSV_STGDON;
268 job->ses->cur = 0;
269 #ifdef LSVTHR
270 pthread_mutex_unlock( &srv->mut );
271 #endif
272
273 ret = srv->prt( &job->ios, LIO_SCLOSE );
274 job->ses->io[1] = 0;
275 sMsg( LOG_DEBUG, "thr %d done job %s", self->id, job->nam );
276 if ( job->ses != self->ses )
277 sSet( self->ses ); /* reset session */
278 }
279 #ifndef LSVTHR
280 if ( self->id )
281 srv->app( 0, LSV_APPFIN );
282 #endif
283 return self;
284 } /* lsv_wrk */
285
286
287 static void *lsv_rcv ( void *arg )
288 {
289 Srv *srv = (Srv*)arg;
290 fd_set fd;
291 int fdlen = 0;
292 int pending = 0;
293 int ret = 0, sel, i;
294 #ifdef LSVTHR
295 int inlock = 0;
296 #endif
297
298 FD_ZERO( &fd );
299 FD_SET( srv->lsn, &fd );
300 fdlen = srv->lsn+1;
301 #ifdef LSVTHR
302 FD_SET( srv->pip[0], &fd );
303 if ( fdlen < srv->pip[0]+1 )
304 fdlen = srv->pip[0]+1;
305 #endif
306
307 for (;/*ever*/;) {
308 struct sockaddr_in npeer; /* of new connection */
309 int nsock = -1; /* new socket */
310 Con *c;
311 fd_set ready = fd;
312 Tm expire;
313
314 /* select on read only; writing is done in workers and
315 exceptions (i.e. telnet OOB data) are not used
316 */
317 #ifndef LSVTHR
318 if ( srv->recv.len ) { /* instead of listening on pip */
319 memset( &ready, 0, sizeof(ready) );
320 sel = 0;
321 } else
322 #endif
323 if ( 0 > (sel = select( fdlen, &ready, 0, 0, 0 )) ) {
324 if ( EINTR == errno ) {
325 sMsg( LOG_DEBUG, "select woken by signal" );
326 } else {
327 ret = sMsg( LOG_SYSERR, "select" );
328 goto done;
329 }
330 }
331 if ( lsv_sig.si_signo ) {
332 sMsg( LOG_DEBUG, "SIG%s (%d) from pid %d",
333 SIGHUP==lsv_sig.si_signo ? "HUP" :
334 SIGINT==lsv_sig.si_signo ? "INT" :
335 SIGTERM==lsv_sig.si_signo ? "TERM" :
336 SIGPIPE==lsv_sig.si_signo ? "PIPE" : "?",
337 lsv_sig.si_signo, lsv_sig.si_pid );
338 if ( 1 /*SIGHUP==lsv_sig.si_signo*/ ) {
339 for ( i=FD_SETSIZE; i--; )
340 if ( con[i] ) {
341 c = con[i];
342 sMsg( LOG_INFO,
343 "con %3d peer %s stg %s prt %d app %d ios %d %s (0x%x)",
344 i, c->nam, stg[c->stg], c->prt, c->app,
345 c->ios.pos + c->ios.b.done,
346 LIO_SISOPEN(&c->ios) ? "opn" : "clo", c->ios.file );
347 }
348 }
349 if ( lsv_term ) /* even if it wasn't the last signal delivered */
350 goto done;
351 lsv_sig.si_signo = 0;
352 /* must NOT access the set after interrupt,
353 probably all fds are set
354 */
355 continue;
356 }
357 timeUpd( &srv->tim );
358 timeGtfm( srv->gtm, &srv->tim );
359 expire.millis = srv->tim.millis - srv->sto*LLL(1000);
360
361 #ifdef LSVTHR
362 pthread_mutex_lock( &srv->mut ); /* back in locked land */
363 inlock = !0;
364 #endif
365
366 /* -- we're the acceptor */
367 if ( FD_ISSET( srv->lsn, &ready ) ) { /* new connection */
368 unsigned /*socklen_t is broken*/ alen = sizeof(npeer);
369 nsock = accept( srv->lsn, (struct sockaddr*)&npeer, &alen );
370 if ( 0 > nsock ) {
371 ret = sMsg( LOG_SYSERR, "accept" );
372 goto done;
373 }
374 if ( FD_SETSIZE <= nsock ) {
375 ret = sMsg( ERR_BADF,
376 "socket %d >= FD_SETSIZE %d", nsock, FD_SETSIZE );
377 goto done;
378 }
379 if ( sizeof(npeer) != alen ) {
380 ret = sMsg( ERR_INVAL, "bad peer len %d", alen );
381 goto done;
382 }
383 /* ok */
384 /* setsockopt
385 SO_SNDTIMEO fixed value on linux
386 SO_OOBINLINE ... hmm, don't care
387 SO_LINGER off
388 */
389 /* prepare con */
390 if ( !(c = con[nsock]) ) {
391 c = con[nsock] = mAlloc( sizeof(*c) );
392 c->ios.file = nsock; /* preliminary -- further setup below */
393 }
394 c->pdg = LSV_PDGNEW;
395 switch ( c->stg ) {
396 case LSV_STGCON: /* still in control of receiver */
397 case LSV_STGINP:
398 sMsg( ERR_IDIOT, "bad recv stage %d on new con", c->stg );
399 c->stg = LSV_STGNEW;
400 case LSV_STGNEW: /* clean idle socket -- pass to receiver */
401 lsv_enq( &srv->recv, c );
402 break;
403 case LSV_STGSES: /* still in control of receiver */
404 case LSV_STGWRK:
405 sMsg( ERR_IDIOT, "bad worker stage %d on new con", c->stg );
406 case LSV_STGRUN: /* worker will queue it */
407 /* hmmm ... should have COM ... */
408 case LSV_STGCOM:
409 case LSV_STGDON:
410 case LSV_STGRET: /* worker has queued it */
411 pending++;
412 break;
413 }
414 srv->conn++;
415 FD_CLR( srv->lsn, &ready ); /* skip in fd check */
416 } /* new connection */
417
418 #ifdef LSVTHR
419 if ( FD_ISSET( srv->pip[0], &ready ) ) { /* pending events */
420 char trash[LSV_NUMWRK];
421 read( srv->pip[0], trash, sizeof(trash) );
422 FD_CLR( srv->pip[0], &ready );
423 sel--;
424 }
425 #endif
426
427 /* switching roles -- now we're the receiver */
428 while ( (c = lsv_deq( &srv->recv )) ) { /* care for ready connections */
429 int sock = LIO_FD & c->ios.file;
430 sMsg( LOG_DEBUG, "deq %d from %s pdg %s",
431 sock, c->nam, pdg[c->pdg] );
432 if ( c->pdg && c->stg )
433 pending--;
434 if ( !(LIO_IN & c->ios.file) && (LIO_RDWR & c->ios.file) ) {
435 /* cleanup closed socks */
436 if ( c != con[sock] ) {
437 ret = sMsg( ERR_IDIOT,
438 "deq bad sock %d (0x%x)", sock, c->ios.file );
439 goto done; /* PANIC time */
440 }
441 sMsg( LOG_DEBUG, "clos%s sock %d",
442 (LIO_OUT & c->ios.file) ? "ing" : "ed", sock );
443 if ( (LIO_OUT & c->ios.file) )
444 lio_close( &c->ios.file, LIO_INOUT );
445 c->ios.file &= ~LIO_RDWR; /* clear also, so we're not closing again */
446 FD_CLR( sock, &fd );
447 FD_CLR( sock, &ready ); /* probably an EOF to read */
448 if ( fdlen == sock+1 )
449 while ( fdlen
450 /* && !FD_ISSET( fdlen-1, &fd ) risk of muting tmp. disabled */
451 && (!con[fdlen-1] || !LIO_SISOPEN(&con[fdlen-1]->ios))
452 )
453 fdlen--;
454 }
455 if ( LSV_PDGNEW == c->pdg ) { /* new connection */
456 struct sockaddr_in peer; /* of new connection */
457 if ( nsock == sock ) /* the one accepted just above */
458 peer = npeer;
459 else {
460 unsigned alen = sizeof(peer);
461 if ( getpeername( sock, (struct sockaddr*)&peer, &alen )
462 || sizeof(peer) != alen
463 ) {
464 ret = sMsg( LOG_SYSERR, "getpeername alen %d", alen );
465 goto done;
466 }
467 }
468 c->srv = srv;
469 c->ses = 0;
470 {
471 unsigned host = (unsigned)ntohl( peer.sin_addr.s_addr );
472 unsigned short port = (unsigned short)ntohs( peer.sin_port );
473 char *p = c->nam;
474 c->host = host;
475 memset( c->nam, 0, sizeof(c->nam) );
476 p += u2a( p, 0xff & (host >> 24) ); *p++ = '.';
477 p += u2a( p, 0xff & (host >> 16) ); *p++ = '.';
478 p += u2a( p, 0xff & (host >> 8) ); *p++ = '.';
479 p += u2a( p, 0xff & (host ) ); *p++ = ':';
480 p += u2a( p, port );
481 /* *p++ = '@'; p += u2a( p, sock ); */
482 }
483 c->con++; /* never mind overflow ... */
484 sMsg( LOG_INFO,
485 "connect %d on %d from %s", c->con+1, sock, c->nam );
486 LIO_SINIT( &c->ios, srv->prt, c->nam,
487 sock | LIO_INOUT|LIO_RDWR|LIO_SOCK );
488 LIO_BINIT( &c->flt );
489 FD_SET( sock, &fd );
490 if ( fdlen < sock+1 )
491 fdlen = sock+1;
492 c->bin = 0; /* do not reset on every request */
493 }
494 if ( ! (LIO_IN & c->ios.file) )
495 c->stg = LSV_STGNEW;
496 else {
497 c->stg = LSV_STGCON;
498 if ( ! FD_ISSET( sock, &fd ) ) {
499 sMsg( LOG_DEBUG, "reenabling %s", c->nam );
500 FD_SET( sock, &fd );
501 if ( fdlen < sock+1 )
502 fdlen = sock+1;
503 }
504 }
505 c->grp = c->pdg = c->prt = c->app = 0;
506 /* prepare for new request */
507 c->ios.pos = 0;
508 c->ios.b.done = c->ios.b.fill = 0;
509 if ( c->req )
510 CLRREC( c->req );
511 else {
512 /* this is done in the receiver rather than the acceptor
513 since in multi-receiver model, receiver owns the req
514 */
515 c->req = mAlloc( LSV_BUFSIZ );
516 OPENISIS_INITREC( c->req, LSV_BUFSIZ, 64 );
517 }
518 if ( !(LSV_CONSES & srv->flg) )
519 c->ses = 0;
520 } /* prepare connections */
521
522
523 for ( i=fdlen; sel && i--; ) { /* read from the ready */
524 Field *sid;
525 int got;
526 if ( ! FD_ISSET( i, &ready ) )
527 continue;
528 sel--;
529 c = con[i];
530 if ( ! c ) {
531 ret = sMsg( ERR_IDIOT, "ran on empty con %d", i );
532 goto done;
533 }
534 if ( LSV_STGINP < c->stg ) {
535 sMsg( LOG_DEBUG, "con %d busy stage %d!", i, c->stg );
536 c->pdg = LSV_PDGRED;
537 pending++;
538 FD_CLR( i, &fd ); /* mute this */
539 continue;
540 }
541 c->ios.b.done = 0;
542 got = lio_read( &c->ios.file, c->ios.b.c, sizeof(c->ios.b.c) );
543 if ( 0 >= got ) {
544 if ( -ERR_EOF != got )
545 goto err;
546 sMsg( LOG_DEBUG, "EOF on 0x%x line %d from %s",
547 c->ios.file, c->req->len, c->nam );
548 FD_CLR( i, &fd );
549 if ( ! c->req->len ) /* initial eof */
550 goto eof; /* close immediatly */
551 /* lio_close( &c->ios.file, LIO_IN ); */
552 c->ios.file &= ~LIO_IN; /* don't shut */
553 c->pdg = LSV_PDGEOF; /* so it will be dequed and closed */
554 pending++;
555 got = 0; /* tell proto to close ... */
556 }
557 if ( ! c->req->len ) { /* stage should be CON */
558 RADDS( c->req, -LSV_PEER, c->nam, 1 );
559 sMsg( LOG_DEBUG, "req on %d from %s",
560 LIO_FD&c->ios.file, c->nam );
561 }
562 c->stg = LSV_STGINP;
563 c->ios.b.fill = got;
564 got = srv->prt( &c->ios, LIO_SPUSH );
565 sMsg( LOG_DEBUG, "got 0x%x from %s", got, c->nam );
566 if ( ! got && (LIO_IN & c->ios.file) ) /* ok: more next time */
567 continue;
568 if ( 0 > got ) /* nok */
569 goto err;
570 got = c->srv->app( c, LSV_APPGOT );
571 if ( 0 > got ) /* nok */
572 goto err;
573 if ( !(LSV_CONSES & srv->flg)
574 && c->req && (sid = rGet( c->req, -LSV_SID, 0 ))
575 ) {
576 c->ses = cSesByName( (char*)sid->val, sid->len, &srv->tim, &expire );
577 if ( ! c->ses ) { /* sad sad sad */
578 sMsg( LOG_ERROR, "out of sessions" );
579 goto err; /* drop connection */
580 }
581 if ( ! c->ses->accnt
582 && (! c->ses->prop || ! c->ses->prop->len) /* overflow paranoia */
583 )
584 got = c->srv->app( c, LSV_APPSES );
585 if ( 0 > got ) /* nok */
586 goto err;
587 }
588 c->tim = srv->tim;
589 RADDS( c->req, -LSV_TIME, srv->gtm, 1 );
590 if ( c->ses ) {
591 sMsg( LOG_DEBUG, "req %s for ses '%s' %s",
592 c->nam, c->ses->name, c->ses->cur ? "busy" : "idle" );
593 c->ses->accnt++;
594 if ( c->ses->cur || c->ses->que ) { /* oh - busy !? */
595 Con **cpp = &c->ses->que;
596 int l = 0;
597 for ( ; *cpp; cpp = &(*cpp)->nxt )
598 if ( 4 < ++l ) {
599 sMsg( LOG_WARN,
600 "too many con queing on ses '%s' - dropping %s",
601 c->ses->name, c->nam );
602 goto err;
603 }
604 sMsg( LOG_VERBOSE, "queing con %s for ses '%s'",
605 c->nam, c->ses->name );
606 c->nxt = 0;
607 *cpp = c;
608 c->stg = LSV_STGSES;
609 continue;
610 }
611 }
612 /* yo - got it */
613 lsv_penq( srv, c );
614 continue;
615 err:
616 sMsg( ERR_BADF, "err 0x%x from %s", got, c->nam );
617 eof:
618 lio_close( &c->ios.file, LIO_INOUT );
619 c->stg = LSV_STGNEW;
620 FD_CLR( i, &fd );
621 } /* read from the ready */
622
623 srv->turn++;
624 srv->busy += srv->nwr - srv->plen;
625 #ifdef LSVTHR
626 srv->wlen += srv->pool.que.len;
627
628 sMsg( LOG_INFO,
629 "\n==== %s\nturn %d: %d jobs for %d idle workers %d pending",
630 srv->gtm, srv->turn, srv->pool.que.len, srv->plen, pending );
631 for ( i=srv->nwr; i--; )
632 if ( srv->wrk[i].cur )
633 sMsg( LOG_INFO, "thr %d serving %d from %s",
634 i, LIO_FD&srv->wrk[i].cur->ios.file, srv->wrk[i].cur->nam );
635
636 inlock = 0;
637 pthread_mutex_unlock( &srv->mut );
638 #else
639 lsv_wrk( srv->wrk );
640 #endif
641 } /* for(ever) */
642 done:
643 #ifdef LSVTHR
644 if ( inlock )
645 pthread_mutex_unlock( &srv->mut );
646 srv->flg |= LSV_SHUTDN;
647 for ( i=srv->nwr; i--; )
648 pthread_cond_signal( &srv->wrk[i].todo );
649 #endif
650 return srv;
651 } /* lsv_rcv */
652
653
654 int svRun ( Srv *srv, const char *addr )
655 {
656 static const int yes = !0;
657 struct sigaction siga;
658 struct sockaddr_in sa; /* server address */
659 int ret = 0, i;
660
661 /* assume we're the main-thread
662 -- try to make sure we get the controlling session
663 */
664 sGet();
665
666 /* set defaults */
667 if ( ! srv->sto )
668 srv->sto = 30*60; /* 30 minutes session timeout */
669 #ifndef LSVTHR
670 srv->nwr = 1;
671 #else
672 if ( ! srv->nwr )
673 srv->nwr = LSV_NUMWRK/2; /* half the max */
674 else if ( LSV_NUMWRK < srv->nwr )
675 srv->nwr = LSV_NUMWRK; /* max */
676
677 srv->mut = lsv_mutex_init;
678 #endif
679 /* prepare signals */
680 memset( &siga, 0, sizeof(siga) );
681 siga.sa_sigaction = lsv_sighand;
682 sigemptyset( &siga.sa_mask );
683 sigaddset( &siga.sa_mask, SIGHUP );
684 sigaddset( &siga.sa_mask, SIGINT );
685 sigaddset( &siga.sa_mask, SIGTERM );
686 sigaddset( &siga.sa_mask, SIGPIPE );
687 siga.sa_flags = SA_SIGINFO;
688 sigaction( SIGHUP, &siga, 0 );
689 sigaction( SIGINT, &siga, 0 );
690 sigaction( SIGTERM, &siga, 0 );
691 sigaction( SIGPIPE, &siga, 0 );
692
693 /* prepare socket */
694 srv->lsn = socket( PF_INET, SOCK_STREAM, IPPROTO_TCP/*6*/ );
695 /* fcntl( srv->lsn, O_NONBLOCK ); not needed */
696 sa.sin_family = AF_INET;
697 sa.sin_port = htons( addr ? a2i( addr, -1 ) : 8080 );
698 sa.sin_addr.s_addr =
699 #ifdef LSV_LOCALHOST_ONLY
700 htonl(INADDR_LOOPBACK);
701 #else
702 INADDR_ANY;
703 #endif
704 if ( setsockopt( srv->lsn, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes) ) ) {
705 ret = sMsg( LOG_SYSERR, "setsockopt" );
706 goto inierr;
707 }
708 if ( bind( srv->lsn, (struct sockaddr*)&sa, sizeof(sa) ) ) {
709 ret = sMsg( LOG_SYSERR, "bind on port %u", htons(sa.sin_port) );
710 goto inierr;
711 }
712 if ( listen( srv->lsn, 64/*backlog*/ ) ) {
713 ret = sMsg( LOG_SYSERR, "listen" );
714 goto inierr;
715 }
716 #ifdef LSVTHR
717 /* prepare pending event pipe */
718 if ( pipe( srv->pip ) ) {
719 ret = sMsg( LOG_SYSERR, "pipe" );
720 goto inierr;
721 }
722
723 if ( pthread_key_create( &lsv_key_wrk, 0 ) ) {
724 ret = sMsg( LOG_SYSERR, "key_create" );
725 goto inierr;
726 }
727 #endif
728 /* setup workers */
729 memset( srv->wrk, 0, sizeof(srv->wrk) );
730 for ( i=srv->nwr; i--; ) {
731 srv->wrk[i].id = i;
732 srv->wrk[i].srv = srv;
733 srv->wrk[i].nxt = 0;
734 srv->wrk[i].ses = cSession(0);
735 memcpy( srv->wrk[i].ses->name, "wrk", 3 );
736 u2a( srv->wrk[i].ses->name+3, i );
737 #ifdef LSVTHR
738 srv->wrk[i].todo = lsv_cond_init;
739 if ( i )
740 pthread_create( &srv->wrk[i].thr, 0, lsv_wrk, srv->wrk+i );
741 #endif
742 }
743 #ifndef LSVTHR
744 lsv_rcv( srv );
745 #else
746 /* start the server thread */
747 pthread_create( &srv->rcv, 0, lsv_rcv, srv );
748 /* run main worker */
749 srv->wrk[0].thr = pthread_self();
750 lsv_wrk( srv->wrk );
751
752 /* cleanup */
753 pthread_join( srv->rcv, 0 );
754 for ( i=srv->nwr; i--; )
755 pthread_join( srv->wrk[i].thr, 0 );
756 #endif
757 for ( i=srv->nwr; i--; )
758 sMsg( LOG_INFO, "worker %d had %d jobs %d waits",
759 i, srv->wrk[i].jobs, srv->wrk[i].waits );
760 sMsg( LOG_INFO,
761 "server had %d jobs on %d connections %d turns"
762 " avg queue len %.02f workers %.02f",
763 srv->jobs, srv->conn, srv->turn,
764 srv->wlen/srv->turn, srv->busy/srv->turn );
765 inierr:
766 close( srv->lsn );
767 return ret;
768 } /* svRun */
769
770
771 /** helper, probably should go to lio.
772 cat l bytes from c to buf b flushing as needed.
773 if ! c, empty buf b.
774 */
775 int lio_sout ( Ios *s, Buf *b, const char *c, unsigned l )
776 {
777 if ( ! b )
778 b = &s->b;
779 do {
780 if ( LIO_BUFSIZ/2 < b->fill && b->done ) { /* purge */
781 if ( b->done < b->fill )
782 memmove( b->c, b->c + b->done, b->fill - b->done );
783 s->pos += b->done;
784 b->fill -= b->done;
785 b->done = 0;
786 }
787 if ( c && l && b->fill < LIO_BUFSIZ ) {
788 unsigned cp = LIO_BUFSIZ - b->fill;
789 if ( cp > l )
790 cp = l;
791 memcpy( b->c + b->fill, c, cp );
792 b->fill += cp;
793 c += cp;
794 l -= cp;
795 if ( !l )
796 return 0;
797 }
798 if ( b->fill > b->done ) {
799 int wr = lio_write( &s->file, b->c + b->done, b->fill - b->done );
800 if ( 0 > wr )
801 return wr;
802 if ( ! wr )
803 return -ERR_BUSY;
804 b->done += wr;
805 if ( b->done > b->fill ) {
806 b->done = b->fill = 0;
807 return -ERR_IDIOT;
808 }
809 }
810 if ( b->done >= b->fill ) {
811 s->pos += b->done;
812 b->done = b->fill = 0;
813 }
814 } while ( c ? l : (unsigned)(b->fill > b->done) );
815 return 0;
816 } /* lio_sout */
817
818
819
820 #define LF 10 /* LineFeed a.k.a. newline - '\n' isn't really well defined */
821 #define TAB 9 /* horizontal, that is */
822 #define VT 11 /* vertical, used as newline replacement */
823 static const char lf = LF;
824 static const char tab = TAB;
825 static const char vt = VT;
826
827 /* the plain protocol */
828 int svPlain ( Ios *s, int op )
829 {
830 Field *f;
831 Con *c = (Con*)s;
832 switch ( op ) {
833 case LIO_SPUSH: {
834 int l = s->b.fill - s->b.done;
835 unsigned char *b = s->b.c + s->b.done;
836 unsigned char *end = b+l, *v, *p;
837 if ( ! l ) { /* EOF: done */
838 if ( ! c->prt ) /* ok */
839 return 1;
840 /* last field wasn't closed by LF */
841 return sMsg( ERR_INVAL, "no EOL from %s", c->nam );
842 }
843 if ( c->prt )
844 RSPACE( c->req, l, !0 );
845 /* add text lines */
846 while ( b<end ) {
847 int conti = 0;
848 switch ( c->prt ) {
849 case 0: /* at beginning of line -- start new field */
850 if ( LF == *b ) /* empty line */
851 return 1;
852 if ( TAB != *b || !c->req->len )
853 RADD( c->req, 0,0,end-b, !0 );
854 else { /* binary mode continuation line */
855 conti = 1;
856 if ( ! c->bin ) {
857 sMsg( LOG_INFO,
858 "detected binary mode on con %s", c->nam );
859 c->bin = 1;
860 }
861 RSPACE( c->req, end-b, !0 );
862 }
863 if ( ! c->req )
864 return -ERR_NOMEM;
865 c->prt = 1;
866 case 1: /* add to last field */
867 f = c->req->field + c->req->len-1;
868 v = (unsigned char*)f->val;
869 p = v + f->len;
870 if ( conti ) {
871 *p++ = LF;
872 b++;
873 }
874 if ( c->bin ) {
875 for ( ; b<end && LF != (*p = *b++); p++ )
876 ;
877 } else
878 for ( ; b<end && LF != (*p = *b++); p++ )
879 if ( VT == *p ) /* convert VTABs */
880 *p = LF; /* back to newlines */
881 c->req->used += (p - v) - f->len;
882 f->len = p - v;
883 if ( LF == b[-1] ) {
884 int ret = a2il( f->val, f->len, &f->tag );
885 if ( ret ) {
886 if ( ret < f->len && TAB == v[ret] )
887 ret++;
888 if ( ret < f->len )
889 memmove( v, v+ret, f->len - ret );
890 f->len -= ret;
891 }
892 ret = c->srv->app( c, LSV_APPARG );
893 c->prt = 0;
894 if ( ret )
895 return ret;
896 }
897 sMsg( LOG_INFO, "prs from %s: [%2d] %3d = '%.*s'",
898 c->nam, c->req->len-1, f->tag, f->len, f->val );
899 }
900 }
901 return 0;
902 } /* case LIO_SPUSH */
903 case LIO_SFLUSH:
904 if ( s->b.fill <= s->b.done )
905 return 0;
906 case LIO_SCLOSE:
907 if ( !(LIO_OUT & s->file) )
908 return -ERR_INVAL;
909 /*
910 this switch is not protected, since receiver doesn't care
911 */
912 if ( c->stg < LSV_STGCOM )
913 c->stg = LSV_STGCOM;
914 if ( ! s->pos+s->b.done && c->ses->res ) {
915 char num[32];
916 int n = c->ses->res->len;
917 sMsg( LOG_INFO, "com %d fields to %s", c->ses->res->len, c->nam );
918 for ( f = c->ses->res->field; n--; f++ ) {
919 const char *v = f->val, *nl;
920 int vl = f->len;
921 int nlen = i2a( num, f->tag );
922 num[nlen++] = TAB;
923 lio_sout( s, &c->flt, num, nlen );
924 while ( vl && (nl = memchr(v,'\n',vl)) ) {
925 int ll = 1+(nl-v);
926 if ( c->bin ) {
927 lio_sout( s, &c->flt, v, ll );
928 lio_sout( s, &c->flt, &tab, 1 );
929 } else {
930 lio_sout( s, &c->flt, v, ll-1 );
931 lio_sout( s, &c->flt, &vt, 1 );
932 }
933 v += ll;
934 vl -= ll;
935 }
936 if ( vl )
937 lio_sout( s, &c->flt, v, vl );
938 lio_sout( s, &c->flt, &lf, 1 );
939 }
940 }
941 if ( s->b.fill )
942 s->b.done = s->b.fill = 0;
943 /*
944 {
945 lio_sout( s, &c->flt, 0, 0 );
946 lio_stdio( s, LIO_SFLUSH );
947 }
948 */
949 if ( LIO_SCLOSE == op )
950 lio_sout( s, &c->flt, &lf, 1 );
951 lio_sout( s, &c->flt, 0, 0 ); /* finally flush */
952 return 0;
953 }
954 return lio_stream( s, op ); /* hmm -- SPURGE ? */
955 } /* svPlain */
956
957
958 /* the echo application */
959 int svEcho ( Con *c, int task )
960 {
961 #ifdef LSVECHO_DELAY
962 static Tm tm = { LLL(50) };
963 #endif
964 Field *f;
965 int i;
966 if ( task ) { /* preparing: we don't care */
967 if ( LSV_APPGOT == task )
968 c->grp = 1; /* don't require mainthead */
969 return 0;
970 }
971 /* run it */
972 sMsg( LOG_INFO, "echo %d: %d fields to %s",
973 svCur()->id, c->req->len, c->nam );
974 #ifdef LSVECHO_DELAY
975 timeSleep( &tm );
976 #endif
977 for ( i=c->req->len, f = c->req->field; i--; f++ ) {
978 RADDF( c->ses->res, f, !0 );
979 /*
980 plain protocol actually does not support plain output ...
981 sMsg( c->ses, 0, "%d = '%.*s'\n", f->tag, f->len, f->val );
982 */
983 }
984 return 0;
985 } /* svEcho */
986
987 #if 0
988 int main ( int argc, const char **argv )
989 {
990 Srv echo;
991
992 memset( &echo, 0, sizeof(echo) );
993 echo.prt = lsv_plain;
994 echo.app = lsv_echo;
995 cOpen(0);
996 cLog( LOG_DEBUG, 0 );
997 return svRun( &echo, 1<argc ? argv[1] : 0 );
998 }
999 #endif

  ViewVC Help
Powered by ViewVC 1.1.26