/[webpac]/trunk/openisis/lsv.c
This is a repository of my old source code, which is no longer updated. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /trunk/openisis/lsv.c

Parent Directory | Revision Log


Revision 239 - (hide annotations)
Mon Mar 8 17:49:13 2004 UTC (20 years, 1 month ago) by dpavlin
File MIME type: text/plain
File size: 25568 byte(s)
including openisis 0.9.0 into webpac tree

1 dpavlin 237 /*
2     openisis - an open implementation of the CDS/ISIS database
3     Version 0.8.x (patchlevel see file Version)
4     Copyright (C) 2001-2003 by Erik Grziwotz, erik@openisis.org
5    
6     This library is free software; you can redistribute it and/or
7     modify it under the terms of the GNU Lesser General Public
8     License as published by the Free Software Foundation; either
9     version 2.1 of the License, or (at your option) any later version.
10    
11     This library is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14     Lesser General Public License for more details.
15    
16     You should have received a copy of the GNU Lesser General Public
17     License along with this library; if not, write to the Free Software
18     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19    
20     see README for more information
21     EOH */
22    
23     /*
24     $Id: lsv.c,v 1.14 2003/06/11 14:53:26 kripke Exp $
25     OpenIsis server
26     */
27    
28     #include <sys/select.h>
29     #include <sys/time.h>
30     #include <sys/types.h>
31     #include <sys/socket.h>
32     #include <unistd.h>
33     #include <fcntl.h>
34     #include <netinet/in.h>
35     #include <string.h> /* memcpy et al */
36     #include <signal.h>
37     #include <errno.h>
38    
39    
40     #include "lsv.h"
41    
42    
43    
44     static Con *con[FD_SETSIZE]; /* usually 1024, but up to 1<<16 */
45     #ifdef LSVTHR
46     static pthread_mutex_t lsv_mutex_init = PTHREAD_MUTEX_INITIALIZER;
47     static pthread_cond_t lsv_cond_init = PTHREAD_COND_INITIALIZER;
48     static pthread_key_t lsv_key_wrk;
49     #else
50     static Wrk *lsv_wrk_single;
51     #endif
52     static int lsv_term;
53     static siginfo_t lsv_sig;
54    
/* printable names of the pending states (Con.pdg), for log messages */
static const char *const pdg[] = {
    "---",
    "NEW",
    "RED",
    "EOF"
};
/* printable names of the connection stages (Con.stg), for log messages */
static const char *const stg[] = {
    "NEW",
    "CON",
    "INP",
    "SES",
    "WRK",
    "RUN",
    "COM",
    "DON",
    "RET"
};
72    
73     static void lsv_sighand ( int sig, siginfo_t *info, void *ucontext )
74     {
75     if ( SIGINT == sig || SIGTERM == sig )
76     lsv_term = !0;
77     if ( info )
78     lsv_sig = *info;
79     else { /* ??? */
80     lsv_sig.si_signo = sig;
81     lsv_sig.si_pid = 0;
82     }
83     (void)ucontext;
84     } /* lsv_sighand */
85    
86    
87     static int lsv_enq ( Que *q, Con *c )
88     {
89     c->nxt = 0;
90     q->len++;
91     if ( q->tail ) {
92     q->tail->nxt = c;
93     q->tail = c;
94     return 1;
95     }
96     q->head = q->tail = c;
97     return 0;
98     } /* lsv_enq */
99    
100    
101     static void lsv_penq ( Srv *srv, Con *c )
102     {
103     Pool *work =
104     #ifdef LSVTHR
105     c->grp ? &srv->pool :
106     #endif
107     &srv->main;
108     srv->jobs++;
109     c->stg = LSV_STGWRK;
110     if ( c->ses )
111     c->ses->cur = c;
112     lsv_enq( &work->que, c );
113     sMsg( LOG_DEBUG, "enq req %d from %s qlen %d",
114     LIO_FD&c->ios.file, c->nam, work->que.len );
115     #ifdef LSVTHR
116     if ( work->wait ) {
117     Wrk *w = work->wait;
118     work->wait = w->nxt;
119     w->nxt = 0;
120     srv->plen--;
121     pthread_cond_signal( &w->todo );
122     sMsg( LOG_DEBUG, "woke %d of %d", w->id, srv->plen );
123     }
124     #endif
125     } /* lsv_penq */
126    
127    
128     static Con *lsv_deq ( Que *q )
129     {
130     Con *c = q->head;
131     if ( c ) {
132     q->len--;
133     q->head = c->nxt;
134     c->nxt = 0;
135     if ( q->tail == c )
136     q->tail = 0;
137     }
138     return c;
139     } /* lsv_deq */
140    
141    
142     Wrk *svCur ()
143     {
144     #ifdef LSVTHR
145     return (Wrk*)pthread_getspecific( lsv_key_wrk );
146     #else
147     return lsv_wrk_single;
148     #endif
149     }
150    
151    
152     /**
153     in threaded mode, run forever, waiting for jobs.
154     non-threaded, do the jobs that are available and return
155     */
156     static void *lsv_wrk ( void *arg )
157     {
158     Wrk *self = (Wrk*)arg;
159     Srv *srv = self->srv;
160     Con *job = 0;
161     Pool *work =
162     #ifdef LSVTHR
163     self->id ? &srv->pool :
164     #endif
165     &srv->main;
166     #ifdef LSVTHR
167     sigset_t blk;
168    
169     /* block signals, they should go to the receiver */
170     sigemptyset( &blk );
171     sigaddset( &blk, SIGHUP );
172     sigaddset( &blk, SIGINT );
173     sigaddset( &blk, SIGTERM );
174     sigaddset( &blk, SIGPIPE );
175     pthread_sigmask( SIG_BLOCK, &blk, 0 );
176    
177     pthread_setspecific( lsv_key_wrk, arg );
178     if ( self->id )
179     srv->app( 0, LSV_APPINI );
180     #else
181     lsv_wrk_single = self;
182     if ( !self->ses )
183     #endif
184     self->ses = sGet(); /* allocate session for thread */
185    
186     for (;/*ever*/;) {
187     int ret;
188    
189     /* ****************************************
190     critical section enter
191     */
192     #ifdef LSVTHR
193     pthread_mutex_lock( &srv->mut );
194     #endif
195     if ( job ) { /* from last turn */
196     sMsg( LOG_DEBUG, "thr %d enq job %s pdg %s",
197     self->id, job->nam, pdg[job->pdg] );
198     job->stg = LSV_STGRET;
199     lsv_enq( &srv->recv, job );
200     #ifdef LSVTHR
201     if ( job->pdg ) /* wakeup receiver */
202     write( srv->pip[1], &job->pdg, 1 );
203     #endif
204     if ( job->ses->que ) { /* immediatly reschedule queued job */
205     Con *q = job->ses->que;
206     job->ses->que = q->nxt;
207     sMsg( LOG_VERBOSE, "dequeing con %s for ses '%s'",
208     q->nam, job->ses->name );
209     lsv_penq( srv, q );
210     }
211     }
212     self->cur = 0;
213     while ( !(job = lsv_deq( &work->que ))
214     && !(LSV_SHUTDN & srv->flg)
215     ) {
216     #ifndef LSVTHR
217     return self;
218     #else
219     self->nxt = work->wait;
220     work->wait = self; /* pls pls lemme wrk */
221     srv->plen++;
222     self->waits++;
223     sMsg( LOG_DEBUG, "thr %d waiting", self->id );
224     /* wait releases the mutex */
225     pthread_cond_wait( &self->todo, &srv->mut );
226     #endif
227     }
228     self->cur = job;
229     if ( job )
230     job->stg = LSV_STGRUN;
231     #ifdef LSVTHR
232     pthread_mutex_unlock( &srv->mut );
233     #endif
234     /* ****************************************
235     critical section leave
236     */
237    
238     /* end of monday morning meeting blah blah -- start working */
239     if ( ! job || (LSV_SHUTDN & srv->flg) )
240     break;
241     self->jobs++;
242     if ( job->ses )
243     sSet( job->ses );
244     else
245     job->ses = self->ses;
246     job->ses->io[1] = &job->ios;
247     if ( job->ses->res )
248     CLRREC( job->ses->res );
249     sMsg( LOG_DEBUG, "thr %d run job %s ses '%s'.%d",
250     self->id, job->nam, job->ses->name, job->ses->accnt );
251     ret = srv->app( job, LSV_APPRUN );
252     sMsg( LOG_DEBUG, "thr %d got %d buf %d",
253     self->id, ret, LIO_SAVAIL( &job->ios ) );
254    
255     /*
256     lock only in order to create SMP memory barrier.
257     on a single processor system, the byte should be safely readable
258     by receiver without this lock.
259     TOD: check alternatives:
260     since receiver is checking the stage of active connections
261     only in rare cases, it might be more efficient to guard either
262     this set or the processing up to here with a per thread mutex.
263     */
264     #ifdef LSVTHR
265     pthread_mutex_lock( &srv->mut );
266     #endif
267     job->stg = LSV_STGDON;
268     job->ses->cur = 0;
269     #ifdef LSVTHR
270     pthread_mutex_unlock( &srv->mut );
271     #endif
272    
273     ret = srv->prt( &job->ios, LIO_SCLOSE );
274     job->ses->io[1] = 0;
275     sMsg( LOG_DEBUG, "thr %d done job %s", self->id, job->nam );
276     if ( job->ses != self->ses )
277     sSet( self->ses ); /* reset session */
278     }
279     #ifndef LSVTHR
280     if ( self->id )
281     srv->app( 0, LSV_APPFIN );
282     #endif
283     return self;
284     } /* lsv_wrk */
285    
286    
287     static void *lsv_rcv ( void *arg )
288     {
289     Srv *srv = (Srv*)arg;
290     fd_set fd;
291     int fdlen = 0;
292     int pending = 0;
293     int ret = 0, sel, i;
294     #ifdef LSVTHR
295     int inlock = 0;
296     #endif
297    
298     FD_ZERO( &fd );
299     FD_SET( srv->lsn, &fd );
300     fdlen = srv->lsn+1;
301     #ifdef LSVTHR
302     FD_SET( srv->pip[0], &fd );
303     if ( fdlen < srv->pip[0]+1 )
304     fdlen = srv->pip[0]+1;
305     #endif
306    
307     for (;/*ever*/;) {
308     struct sockaddr_in npeer; /* of new connection */
309     int nsock = -1; /* new socket */
310     Con *c;
311     fd_set ready = fd;
312     Tm expire;
313    
314     /* select on read only; writing is done in workers and
315     exceptions (i.e. telnet OOB data) are not used
316     */
317     #ifndef LSVTHR
318     if ( srv->recv.len ) { /* instead of listening on pip */
319     memset( &ready, 0, sizeof(ready) );
320     sel = 0;
321     } else
322     #endif
323     if ( 0 > (sel = select( fdlen, &ready, 0, 0, 0 )) ) {
324     if ( EINTR == errno ) {
325     sMsg( LOG_DEBUG, "select woken by signal" );
326     } else {
327     ret = sMsg( LOG_SYSERR, "select" );
328     goto done;
329     }
330     }
331     if ( lsv_sig.si_signo ) {
332     sMsg( LOG_DEBUG, "SIG%s (%d) from pid %d",
333     SIGHUP==lsv_sig.si_signo ? "HUP" :
334     SIGINT==lsv_sig.si_signo ? "INT" :
335     SIGTERM==lsv_sig.si_signo ? "TERM" :
336     SIGPIPE==lsv_sig.si_signo ? "PIPE" : "?",
337     lsv_sig.si_signo, lsv_sig.si_pid );
338     if ( 1 /*SIGHUP==lsv_sig.si_signo*/ ) {
339     for ( i=FD_SETSIZE; i--; )
340     if ( con[i] ) {
341     c = con[i];
342     sMsg( LOG_INFO,
343     "con %3d peer %s stg %s prt %d app %d ios %d %s (0x%x)",
344     i, c->nam, stg[c->stg], c->prt, c->app,
345     c->ios.pos + c->ios.b.done,
346     LIO_SISOPEN(&c->ios) ? "opn" : "clo", c->ios.file );
347     }
348     }
349     if ( lsv_term ) /* even if it wasn't the last signal delivered */
350     goto done;
351     lsv_sig.si_signo = 0;
352     /* must NOT access the set after interrupt,
353     probably all fds are set
354     */
355     continue;
356     }
357     timeUpd( &srv->tim );
358     timeGtfm( srv->gtm, &srv->tim );
359     expire.millis = srv->tim.millis - srv->sto*LLL(1000);
360    
361     #ifdef LSVTHR
362     pthread_mutex_lock( &srv->mut ); /* back in locked land */
363     inlock = !0;
364     #endif
365    
366     /* -- we're the acceptor */
367     if ( FD_ISSET( srv->lsn, &ready ) ) { /* new connection */
368     unsigned /*socklen_t is broken*/ alen = sizeof(npeer);
369     nsock = accept( srv->lsn, (struct sockaddr*)&npeer, &alen );
370     if ( 0 > nsock ) {
371     ret = sMsg( LOG_SYSERR, "accept" );
372     goto done;
373     }
374     if ( FD_SETSIZE <= nsock ) {
375     ret = sMsg( ERR_BADF,
376     "socket %d >= FD_SETSIZE %d", nsock, FD_SETSIZE );
377     goto done;
378     }
379     if ( sizeof(npeer) != alen ) {
380     ret = sMsg( ERR_INVAL, "bad peer len %d", alen );
381     goto done;
382     }
383     /* ok */
384     /* setsockopt
385     SO_SNDTIMEO fixed value on linux
386     SO_OOBINLINE ... hmm, don't care
387     SO_LINGER off
388     */
389     /* prepare con */
390     if ( !(c = con[nsock]) ) {
391     c = con[nsock] = mAlloc( sizeof(*c) );
392     c->ios.file = nsock; /* preliminary -- further setup below */
393     }
394     c->pdg = LSV_PDGNEW;
395     switch ( c->stg ) {
396     case LSV_STGCON: /* still in control of receiver */
397     case LSV_STGINP:
398     sMsg( ERR_IDIOT, "bad recv stage %d on new con", c->stg );
399     c->stg = LSV_STGNEW;
400     case LSV_STGNEW: /* clean idle socket -- pass to receiver */
401     lsv_enq( &srv->recv, c );
402     break;
403     case LSV_STGSES: /* still in control of receiver */
404     case LSV_STGWRK:
405     sMsg( ERR_IDIOT, "bad worker stage %d on new con", c->stg );
406     case LSV_STGRUN: /* worker will queue it */
407     /* hmmm ... should have COM ... */
408     case LSV_STGCOM:
409     case LSV_STGDON:
410     case LSV_STGRET: /* worker has queued it */
411     pending++;
412     break;
413     }
414     srv->conn++;
415     FD_CLR( srv->lsn, &ready ); /* skip in fd check */
416     } /* new connection */
417    
418     #ifdef LSVTHR
419     if ( FD_ISSET( srv->pip[0], &ready ) ) { /* pending events */
420     char trash[LSV_NUMWRK];
421     read( srv->pip[0], trash, sizeof(trash) );
422     FD_CLR( srv->pip[0], &ready );
423     sel--;
424     }
425     #endif
426    
427     /* switching roles -- now we're the receiver */
428     while ( (c = lsv_deq( &srv->recv )) ) { /* care for ready connections */
429     int sock = LIO_FD & c->ios.file;
430     sMsg( LOG_DEBUG, "deq %d from %s pdg %s",
431     sock, c->nam, pdg[c->pdg] );
432     if ( c->pdg && c->stg )
433     pending--;
434     if ( !(LIO_IN & c->ios.file) && (LIO_RDWR & c->ios.file) ) {
435     /* cleanup closed socks */
436     if ( c != con[sock] ) {
437     ret = sMsg( ERR_IDIOT,
438     "deq bad sock %d (0x%x)", sock, c->ios.file );
439     goto done; /* PANIC time */
440     }
441     sMsg( LOG_DEBUG, "clos%s sock %d",
442     (LIO_OUT & c->ios.file) ? "ing" : "ed", sock );
443     if ( (LIO_OUT & c->ios.file) )
444     lio_close( &c->ios.file, LIO_INOUT );
445     c->ios.file &= ~LIO_RDWR; /* clear also, so we're not closing again */
446     FD_CLR( sock, &fd );
447     FD_CLR( sock, &ready ); /* probably an EOF to read */
448     if ( fdlen == sock+1 )
449     while ( fdlen
450     /* && !FD_ISSET( fdlen-1, &fd ) risk of muting tmp. disabled */
451     && (!con[fdlen-1] || !LIO_SISOPEN(&con[fdlen-1]->ios))
452     )
453     fdlen--;
454     }
455     if ( LSV_PDGNEW == c->pdg ) { /* new connection */
456     struct sockaddr_in peer; /* of new connection */
457     if ( nsock == sock ) /* the one accepted just above */
458     peer = npeer;
459     else {
460     unsigned alen = sizeof(peer);
461     if ( getpeername( sock, (struct sockaddr*)&peer, &alen )
462     || sizeof(peer) != alen
463     ) {
464     ret = sMsg( LOG_SYSERR, "getpeername alen %d", alen );
465     goto done;
466     }
467     }
468     c->srv = srv;
469     c->ses = 0;
470     {
471     unsigned host = (unsigned)ntohl( peer.sin_addr.s_addr );
472     unsigned short port = (unsigned short)ntohs( peer.sin_port );
473     char *p = c->nam;
474     c->host = host;
475     memset( c->nam, 0, sizeof(c->nam) );
476     p += u2a( p, 0xff & (host >> 24) ); *p++ = '.';
477     p += u2a( p, 0xff & (host >> 16) ); *p++ = '.';
478     p += u2a( p, 0xff & (host >> 8) ); *p++ = '.';
479     p += u2a( p, 0xff & (host ) ); *p++ = ':';
480     p += u2a( p, port );
481     /* *p++ = '@'; p += u2a( p, sock ); */
482     }
483     c->con++; /* never mind overflow ... */
484     sMsg( LOG_INFO,
485     "connect %d on %d from %s", c->con+1, sock, c->nam );
486     LIO_SINIT( &c->ios, srv->prt, c->nam,
487     sock | LIO_INOUT|LIO_RDWR|LIO_SOCK );
488     LIO_BINIT( &c->flt );
489     FD_SET( sock, &fd );
490     if ( fdlen < sock+1 )
491     fdlen = sock+1;
492     c->bin = 0; /* do not reset on every request */
493     }
494     if ( ! (LIO_IN & c->ios.file) )
495     c->stg = LSV_STGNEW;
496     else {
497     c->stg = LSV_STGCON;
498     if ( ! FD_ISSET( sock, &fd ) ) {
499     sMsg( LOG_DEBUG, "reenabling %s", c->nam );
500     FD_SET( sock, &fd );
501     if ( fdlen < sock+1 )
502     fdlen = sock+1;
503     }
504     }
505     c->grp = c->pdg = c->prt = c->app = 0;
506     /* prepare for new request */
507     c->ios.pos = 0;
508     c->ios.b.done = c->ios.b.fill = 0;
509     if ( c->req )
510     CLRREC( c->req );
511     else {
512     /* this is done in the receiver rather than the acceptor
513     since in multi-receiver model, receiver owns the req
514     */
515     c->req = mAlloc( LSV_BUFSIZ );
516     OPENISIS_INITREC( c->req, LSV_BUFSIZ, 64 );
517     }
518     if ( !(LSV_CONSES & srv->flg) )
519     c->ses = 0;
520     } /* prepare connections */
521    
522    
523     for ( i=fdlen; sel && i--; ) { /* read from the ready */
524     Field *sid;
525     int got;
526     if ( ! FD_ISSET( i, &ready ) )
527     continue;
528     sel--;
529     c = con[i];
530     if ( ! c ) {
531     ret = sMsg( ERR_IDIOT, "ran on empty con %d", i );
532     goto done;
533     }
534     if ( LSV_STGINP < c->stg ) {
535     sMsg( LOG_DEBUG, "con %d busy stage %d!", i, c->stg );
536     c->pdg = LSV_PDGRED;
537     pending++;
538     FD_CLR( i, &fd ); /* mute this */
539     continue;
540     }
541     c->ios.b.done = 0;
542     got = lio_read( &c->ios.file, c->ios.b.c, sizeof(c->ios.b.c) );
543     if ( 0 >= got ) {
544     if ( -ERR_EOF != got )
545     goto err;
546     sMsg( LOG_DEBUG, "EOF on 0x%x line %d from %s",
547     c->ios.file, c->req->len, c->nam );
548     FD_CLR( i, &fd );
549     if ( ! c->req->len ) /* initial eof */
550     goto eof; /* close immediatly */
551     /* lio_close( &c->ios.file, LIO_IN ); */
552     c->ios.file &= ~LIO_IN; /* don't shut */
553     c->pdg = LSV_PDGEOF; /* so it will be dequed and closed */
554     pending++;
555     got = 0; /* tell proto to close ... */
556     }
557     if ( ! c->req->len ) { /* stage should be CON */
558     RADDS( c->req, -LSV_PEER, c->nam, 1 );
559     sMsg( LOG_DEBUG, "req on %d from %s",
560     LIO_FD&c->ios.file, c->nam );
561     }
562     c->stg = LSV_STGINP;
563     c->ios.b.fill = got;
564     got = srv->prt( &c->ios, LIO_SPUSH );
565     sMsg( LOG_DEBUG, "got 0x%x from %s", got, c->nam );
566     if ( ! got && (LIO_IN & c->ios.file) ) /* ok: more next time */
567     continue;
568     if ( 0 > got ) /* nok */
569     goto err;
570     got = c->srv->app( c, LSV_APPGOT );
571     if ( 0 > got ) /* nok */
572     goto err;
573     if ( !(LSV_CONSES & srv->flg)
574     && c->req && (sid = rGet( c->req, -LSV_SID, 0 ))
575     ) {
576     c->ses = cSesByName( (char*)sid->val, sid->len, &srv->tim, &expire );
577     if ( ! c->ses ) { /* sad sad sad */
578     sMsg( LOG_ERROR, "out of sessions" );
579     goto err; /* drop connection */
580     }
581     if ( ! c->ses->accnt
582     && (! c->ses->prop || ! c->ses->prop->len) /* overflow paranoia */
583     )
584     got = c->srv->app( c, LSV_APPSES );
585     if ( 0 > got ) /* nok */
586     goto err;
587     }
588     c->tim = srv->tim;
589     RADDS( c->req, -LSV_TIME, srv->gtm, 1 );
590     if ( c->ses ) {
591     sMsg( LOG_DEBUG, "req %s for ses '%s' %s",
592     c->nam, c->ses->name, c->ses->cur ? "busy" : "idle" );
593     c->ses->accnt++;
594     if ( c->ses->cur || c->ses->que ) { /* oh - busy !? */
595     Con **cpp = &c->ses->que;
596     int l = 0;
597     for ( ; *cpp; cpp = &(*cpp)->nxt )
598     if ( 4 < ++l ) {
599     sMsg( LOG_WARN,
600     "too many con queing on ses '%s' - dropping %s",
601     c->ses->name, c->nam );
602     goto err;
603     }
604     sMsg( LOG_VERBOSE, "queing con %s for ses '%s'",
605     c->nam, c->ses->name );
606     c->nxt = 0;
607     *cpp = c;
608     c->stg = LSV_STGSES;
609     continue;
610     }
611     }
612     /* yo - got it */
613     lsv_penq( srv, c );
614     continue;
615     err:
616     sMsg( ERR_BADF, "err 0x%x from %s", got, c->nam );
617     eof:
618     lio_close( &c->ios.file, LIO_INOUT );
619     c->stg = LSV_STGNEW;
620     FD_CLR( i, &fd );
621     } /* read from the ready */
622    
623     srv->turn++;
624     srv->busy += srv->nwr - srv->plen;
625     #ifdef LSVTHR
626     srv->wlen += srv->pool.que.len;
627    
628     sMsg( LOG_INFO,
629     "\n==== %s\nturn %d: %d jobs for %d idle workers %d pending",
630     srv->gtm, srv->turn, srv->pool.que.len, srv->plen, pending );
631     for ( i=srv->nwr; i--; )
632     if ( srv->wrk[i].cur )
633     sMsg( LOG_INFO, "thr %d serving %d from %s",
634     i, LIO_FD&srv->wrk[i].cur->ios.file, srv->wrk[i].cur->nam );
635    
636     inlock = 0;
637     pthread_mutex_unlock( &srv->mut );
638     #else
639     lsv_wrk( srv->wrk );
640     #endif
641     } /* for(ever) */
642     done:
643     #ifdef LSVTHR
644     if ( inlock )
645     pthread_mutex_unlock( &srv->mut );
646     srv->flg |= LSV_SHUTDN;
647     for ( i=srv->nwr; i--; )
648     pthread_cond_signal( &srv->wrk[i].todo );
649     #endif
650     return srv;
651     } /* lsv_rcv */
652    
653    
654     int svRun ( Srv *srv, const char *addr )
655     {
656     static const int yes = !0;
657     struct sigaction siga;
658     struct sockaddr_in sa; /* server address */
659     int ret = 0, i;
660    
661     /* assume we're the main-thread
662     -- try to make sure we get the controlling session
663     */
664     sGet();
665    
666     /* set defaults */
667     if ( ! srv->sto )
668     srv->sto = 30*60; /* 30 minutes session timeout */
669     #ifndef LSVTHR
670     srv->nwr = 1;
671     #else
672     if ( ! srv->nwr )
673     srv->nwr = LSV_NUMWRK/2; /* half the max */
674     else if ( LSV_NUMWRK < srv->nwr )
675     srv->nwr = LSV_NUMWRK; /* max */
676    
677     srv->mut = lsv_mutex_init;
678     #endif
679     /* prepare signals */
680     memset( &siga, 0, sizeof(siga) );
681     siga.sa_sigaction = lsv_sighand;
682     sigemptyset( &siga.sa_mask );
683     sigaddset( &siga.sa_mask, SIGHUP );
684     sigaddset( &siga.sa_mask, SIGINT );
685     sigaddset( &siga.sa_mask, SIGTERM );
686     sigaddset( &siga.sa_mask, SIGPIPE );
687     siga.sa_flags = SA_SIGINFO;
688     sigaction( SIGHUP, &siga, 0 );
689     sigaction( SIGINT, &siga, 0 );
690     sigaction( SIGTERM, &siga, 0 );
691     sigaction( SIGPIPE, &siga, 0 );
692    
693     /* prepare socket */
694     srv->lsn = socket( PF_INET, SOCK_STREAM, IPPROTO_TCP/*6*/ );
695     /* fcntl( srv->lsn, O_NONBLOCK ); not needed */
696     sa.sin_family = AF_INET;
697     sa.sin_port = htons( addr ? a2i( addr, -1 ) : 8080 );
698     sa.sin_addr.s_addr =
699     #ifdef LSV_LOCALHOST_ONLY
700     htonl(INADDR_LOOPBACK);
701     #else
702     INADDR_ANY;
703     #endif
704     if ( setsockopt( srv->lsn, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes) ) ) {
705     ret = sMsg( LOG_SYSERR, "setsockopt" );
706     goto inierr;
707     }
708     if ( bind( srv->lsn, (struct sockaddr*)&sa, sizeof(sa) ) ) {
709     ret = sMsg( LOG_SYSERR, "bind on port %u", htons(sa.sin_port) );
710     goto inierr;
711     }
712     if ( listen( srv->lsn, 64/*backlog*/ ) ) {
713     ret = sMsg( LOG_SYSERR, "listen" );
714     goto inierr;
715     }
716     #ifdef LSVTHR
717     /* prepare pending event pipe */
718     if ( pipe( srv->pip ) ) {
719     ret = sMsg( LOG_SYSERR, "pipe" );
720     goto inierr;
721     }
722    
723     if ( pthread_key_create( &lsv_key_wrk, 0 ) ) {
724     ret = sMsg( LOG_SYSERR, "key_create" );
725     goto inierr;
726     }
727     #endif
728     /* setup workers */
729     memset( srv->wrk, 0, sizeof(srv->wrk) );
730     for ( i=srv->nwr; i--; ) {
731     srv->wrk[i].id = i;
732     srv->wrk[i].srv = srv;
733     srv->wrk[i].nxt = 0;
734     srv->wrk[i].ses = cSession(0);
735     memcpy( srv->wrk[i].ses->name, "wrk", 3 );
736     u2a( srv->wrk[i].ses->name+3, i );
737     #ifdef LSVTHR
738     srv->wrk[i].todo = lsv_cond_init;
739     if ( i )
740     pthread_create( &srv->wrk[i].thr, 0, lsv_wrk, srv->wrk+i );
741     #endif
742     }
743     #ifndef LSVTHR
744     lsv_rcv( srv );
745     #else
746     /* start the server thread */
747     pthread_create( &srv->rcv, 0, lsv_rcv, srv );
748     /* run main worker */
749     srv->wrk[0].thr = pthread_self();
750     lsv_wrk( srv->wrk );
751    
752     /* cleanup */
753     pthread_join( srv->rcv, 0 );
754     for ( i=srv->nwr; i--; )
755     pthread_join( srv->wrk[i].thr, 0 );
756     #endif
757     for ( i=srv->nwr; i--; )
758     sMsg( LOG_INFO, "worker %d had %d jobs %d waits",
759     i, srv->wrk[i].jobs, srv->wrk[i].waits );
760     sMsg( LOG_INFO,
761     "server had %d jobs on %d connections %d turns"
762     " avg queue len %.02f workers %.02f",
763     srv->jobs, srv->conn, srv->turn,
764     srv->wlen/srv->turn, srv->busy/srv->turn );
765     inierr:
766     close( srv->lsn );
767     return ret;
768     } /* svRun */
769    
770    
771     /** helper, probably should go to lio.
772     cat l bytes from c to buf b flushing as needed.
773     if ! c, empty buf b.
774     */
775     int lio_sout ( Ios *s, Buf *b, const char *c, unsigned l )
776     {
777     if ( ! b )
778     b = &s->b;
779     do {
780     if ( LIO_BUFSIZ/2 < b->fill && b->done ) { /* purge */
781     if ( b->done < b->fill )
782     memmove( b->c, b->c + b->done, b->fill - b->done );
783     s->pos += b->done;
784     b->fill -= b->done;
785     b->done = 0;
786     }
787     if ( c && l && b->fill < LIO_BUFSIZ ) {
788     unsigned cp = LIO_BUFSIZ - b->fill;
789     if ( cp > l )
790     cp = l;
791     memcpy( b->c + b->fill, c, cp );
792     b->fill += cp;
793     c += cp;
794     l -= cp;
795     if ( !l )
796     return 0;
797     }
798     if ( b->fill > b->done ) {
799     int wr = lio_write( &s->file, b->c + b->done, b->fill - b->done );
800     if ( 0 > wr )
801     return wr;
802     if ( ! wr )
803     return -ERR_BUSY;
804     b->done += wr;
805     if ( b->done > b->fill ) {
806     b->done = b->fill = 0;
807     return -ERR_IDIOT;
808     }
809     }
810     if ( b->done >= b->fill ) {
811     s->pos += b->done;
812     b->done = b->fill = 0;
813     }
814     } while ( c ? l : (unsigned)(b->fill > b->done) );
815     return 0;
816     } /* lio_sout */
817    
818    
819    
820     #define LF 10 /* LineFeed a.k.a. newline - '\n' isn't really well defined */
821     #define TAB 9 /* horizontal, that is */
822     #define VT 11 /* vertical, used as newline replacement */
823     static const char lf = LF;
824     static const char tab = TAB;
825     static const char vt = VT;
826    
827     /* the plain protocol */
828     int svPlain ( Ios *s, int op )
829     {
830     Field *f;
831     Con *c = (Con*)s;
832     switch ( op ) {
833     case LIO_SPUSH: {
834     int l = s->b.fill - s->b.done;
835     unsigned char *b = s->b.c + s->b.done;
836     unsigned char *end = b+l, *v, *p;
837     if ( ! l ) { /* EOF: done */
838     if ( ! c->prt ) /* ok */
839     return 1;
840     /* last field wasn't closed by LF */
841     return sMsg( ERR_INVAL, "no EOL from %s", c->nam );
842     }
843     if ( c->prt )
844     RSPACE( c->req, l, !0 );
845     /* add text lines */
846     while ( b<end ) {
847     int conti = 0;
848     switch ( c->prt ) {
849     case 0: /* at beginning of line -- start new field */
850     if ( LF == *b ) /* empty line */
851     return 1;
852     if ( TAB != *b || !c->req->len )
853     RADD( c->req, 0,0,end-b, !0 );
854     else { /* binary mode continuation line */
855     conti = 1;
856     if ( ! c->bin ) {
857     sMsg( LOG_INFO,
858     "detected binary mode on con %s", c->nam );
859     c->bin = 1;
860     }
861     RSPACE( c->req, end-b, !0 );
862     }
863     if ( ! c->req )
864     return -ERR_NOMEM;
865     c->prt = 1;
866     case 1: /* add to last field */
867     f = c->req->field + c->req->len-1;
868     v = (unsigned char*)f->val;
869     p = v + f->len;
870     if ( conti ) {
871     *p++ = LF;
872     b++;
873     }
874     if ( c->bin ) {
875     for ( ; b<end && LF != (*p = *b++); p++ )
876     ;
877     } else
878     for ( ; b<end && LF != (*p = *b++); p++ )
879     if ( VT == *p ) /* convert VTABs */
880     *p = LF; /* back to newlines */
881     c->req->used += (p - v) - f->len;
882     f->len = p - v;
883     if ( LF == b[-1] ) {
884     int ret = a2il( f->val, f->len, &f->tag );
885     if ( ret ) {
886     if ( ret < f->len && TAB == v[ret] )
887     ret++;
888     if ( ret < f->len )
889     memmove( v, v+ret, f->len - ret );
890     f->len -= ret;
891     }
892     ret = c->srv->app( c, LSV_APPARG );
893     c->prt = 0;
894     if ( ret )
895     return ret;
896     }
897     sMsg( LOG_INFO, "prs from %s: [%2d] %3d = '%.*s'",
898     c->nam, c->req->len-1, f->tag, f->len, f->val );
899     }
900     }
901     return 0;
902     } /* case LIO_SPUSH */
903     case LIO_SFLUSH:
904     if ( s->b.fill <= s->b.done )
905     return 0;
906     case LIO_SCLOSE:
907     if ( !(LIO_OUT & s->file) )
908     return -ERR_INVAL;
909     /*
910     this switch is not protected, since receiver doesn't care
911     */
912     if ( c->stg < LSV_STGCOM )
913     c->stg = LSV_STGCOM;
914     if ( ! s->pos+s->b.done && c->ses->res ) {
915     char num[32];
916     int n = c->ses->res->len;
917     sMsg( LOG_INFO, "com %d fields to %s", c->ses->res->len, c->nam );
918     for ( f = c->ses->res->field; n--; f++ ) {
919     const char *v = f->val, *nl;
920     int vl = f->len;
921     int nlen = i2a( num, f->tag );
922     num[nlen++] = TAB;
923     lio_sout( s, &c->flt, num, nlen );
924     while ( vl && (nl = memchr(v,'\n',vl)) ) {
925     int ll = 1+(nl-v);
926     if ( c->bin ) {
927     lio_sout( s, &c->flt, v, ll );
928     lio_sout( s, &c->flt, &tab, 1 );
929     } else {
930     lio_sout( s, &c->flt, v, ll-1 );
931     lio_sout( s, &c->flt, &vt, 1 );
932     }
933     v += ll;
934     vl -= ll;
935     }
936     if ( vl )
937     lio_sout( s, &c->flt, v, vl );
938     lio_sout( s, &c->flt, &lf, 1 );
939     }
940     }
941     if ( s->b.fill )
942     s->b.done = s->b.fill = 0;
943     /*
944     {
945     lio_sout( s, &c->flt, 0, 0 );
946     lio_stdio( s, LIO_SFLUSH );
947     }
948     */
949     if ( LIO_SCLOSE == op )
950     lio_sout( s, &c->flt, &lf, 1 );
951     lio_sout( s, &c->flt, 0, 0 ); /* finally flush */
952     return 0;
953     }
954     return lio_stream( s, op ); /* hmm -- SPURGE ? */
955     } /* svPlain */
956    
957    
958     /* the echo application */
959     int svEcho ( Con *c, int task )
960     {
961     #ifdef LSVECHO_DELAY
962     static Tm tm = { LLL(50) };
963     #endif
964     Field *f;
965     int i;
966     if ( task ) { /* preparing: we don't care */
967     if ( LSV_APPGOT == task )
968     c->grp = 1; /* don't require mainthead */
969     return 0;
970     }
971     /* run it */
972     sMsg( LOG_INFO, "echo %d: %d fields to %s",
973     svCur()->id, c->req->len, c->nam );
974     #ifdef LSVECHO_DELAY
975     timeSleep( &tm );
976     #endif
977     for ( i=c->req->len, f = c->req->field; i--; f++ ) {
978     RADDF( c->ses->res, f, !0 );
979     /*
980     plain protocol actually does not support plain output ...
981     sMsg( c->ses, 0, "%d = '%.*s'\n", f->tag, f->len, f->val );
982     */
983     }
984     return 0;
985     } /* svEcho */
986    
#if 0
/*
    disabled sample main: an echo server using the plain protocol,
    listening on the port given as first argument (default of svRun if none).
*/
int main ( int argc, const char **argv )
{
    Srv echo;

    memset( &echo, 0, sizeof(echo) );
    echo.prt = svPlain;
    echo.app = svEcho;
    cOpen(0);
    cLog( LOG_DEBUG, 0 );
    return svRun( &echo, 1<argc ? argv[1] : 0 );
}
#endif

  ViewVC Help
Powered by ViewVC 1.1.26