/*
openisis - an open implementation of the CDS/ISIS database
Version 0.8.x (patchlevel see file Version)
Copyright (C) 2001-2003 by Erik Grziwotz, erik@openisis.org
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
see README for more information
EOH */
/*
$Id: lsv.c,v 1.14 2003/06/11 14:53:26 kripke Exp $
OpenIsis server
*/
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <string.h> /* memcpy et al */
#include <signal.h>
#include <errno.h>
#include "lsv.h"
/* connection table, indexed by socket descriptor */
static Con *con[FD_SETSIZE]; /* usually 1024, but up to 1<<16 */
#ifdef LSVTHR
/* templates copied into each server mutex / worker condition by svRun */
static pthread_mutex_t lsv_mutex_init = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t lsv_cond_init = PTHREAD_COND_INITIALIZER;
/* thread specific slot holding the Wrk of the calling thread, see svCur */
static pthread_key_t lsv_key_wrk;
#else
/* the one and only worker of the non-threaded build, see svCur */
static Wrk *lsv_wrk_single;
#endif
/*
	set by the signal handler on SIGINT/SIGTERM, polled by the receiver.
	volatile sig_atomic_t is the only object type that may portably be
	written from a handler and read by the interrupted code.
*/
static volatile sig_atomic_t lsv_term;
/*
	copy of the last signal delivered; si_signo is reset to 0 by the receiver
	once the signal was reported.
	NOTE(review): a whole siginfo_t cannot be stored atomically from a handler,
	so a second signal arriving while the receiver reads this may tear it.
*/
static siginfo_t lsv_sig;
/*
	Printable names of the "pending" state of a connection, used for logging
	only. The table is indexed with Con.pdg.
	NOTE(review): presumably index 0 is "nothing pending" and the other entries
	line up with LSV_PDGNEW, LSV_PDGRED and LSV_PDGEOF from lsv.h -- confirm
	the numeric values there before adding or reordering entries.
*/
static const char * pdg[] = {
"---",
"NEW",
"RED",
"EOF"
};
/*
	Printable names of the processing stage of a connection, used when the
	connection table is dumped to the log. The table is indexed with Con.stg.
	NOTE(review): the entries presumably follow the order of the LSV_STGNEW ..
	LSV_STGRET constants in lsv.h -- confirm there before changing either list.
*/
static const char * stg[] = {
"NEW",
"CON",
"INP",
"SES",
"WRK",
"RUN",
"COM",
"DON",
"RET"
};
/*
	Signal handler (SA_SIGINFO style).
	Raises the termination flag on SIGINT and SIGTERM and remembers the
	details of the signal in lsv_sig, so the receiver loop can report it.
	The context argument is not used.
*/
static void lsv_sighand ( int sig, siginfo_t *info, void *ucontext )
{
	(void)ucontext;
	switch ( sig ) {
	case SIGINT:
	case SIGTERM:
		lsv_term = !0;
		break;
	default:
		break;
	}
	if ( !info ) { /* ??? -- no details given, keep at least the number */
		lsv_sig.si_signo = sig;
		lsv_sig.si_pid = 0;
		return;
	}
	lsv_sig = *info;
} /* lsv_sighand */
/*
	Append connection c to the end of queue q and count it.
	Returns 1 if the queue already had entries, 0 if c is now the only one.
*/
static int lsv_enq ( Que *q, Con *c )
{
	Con *last = q->tail;
	int nonempty = last ? 1 : 0;

	c->nxt = 0;
	q->len++;
	if ( last )
		last->nxt = c;
	else
		q->head = c;
	q->tail = c;
	return nonempty;
} /* lsv_enq */
/*
	Hand connection c over to the workers.
	The connection is marked LSV_STGWRK, registered as the current one of
	its session (if it has one) and appended to a job queue: the queue of
	the main pool, or -- threaded build only -- the queue of the worker pool
	when c->grp is set. In the threaded build the first worker found waiting
	on that pool is then taken off the wait list and signalled.
*/
static void lsv_penq ( Srv *srv, Con *c )
{
	Pool *pool = &srv->main;
#ifdef LSVTHR
	if ( c->grp )
		pool = &srv->pool;
#endif
	srv->jobs++;
	c->stg = LSV_STGWRK;
	if ( c->ses )
		c->ses->cur = c;
	lsv_enq( &pool->que, c );
	sMsg( LOG_DEBUG, "enq req %d from %s qlen %d",
		LIO_FD&c->ios.file, c->nam, pool->que.len );
#ifdef LSVTHR
	{
		Wrk *idle = pool->wait;
		if ( idle ) {
			/* unlink the waiter before waking it */
			pool->wait = idle->nxt;
			idle->nxt = 0;
			srv->plen--;
			pthread_cond_signal( &idle->todo );
			sMsg( LOG_DEBUG, "woke %d of %d", idle->id, srv->plen );
		}
	}
#endif
} /* lsv_penq */
/*
	Take the first connection off queue q.
	Returns the connection with its link cleared, or 0 if the queue is empty.
*/
static Con *lsv_deq ( Que *q )
{
	Con *first = q->head;

	if ( !first )
		return 0;
	q->len--;
	q->head = first->nxt;
	first->nxt = 0;
	if ( first == q->tail ) /* that was the last one */
		q->tail = 0;
	return first;
} /* lsv_deq */
/*
	Return the worker the caller is running as:
	the thread specific value in the threaded build,
	the single worker otherwise.
*/
Wrk *svCur ()
{
	Wrk *cur;
#ifdef LSVTHR
	cur = (Wrk*)pthread_getspecific( lsv_key_wrk );
#else
	cur = lsv_wrk_single;
#endif
	return cur;
}
/**
in threaded mode, run forever, waiting for jobs.
non-threaded, do the jobs that are available and return
*/
static void *lsv_wrk ( void *arg )
{
Wrk *self = (Wrk*)arg;
Srv *srv = self->srv;
Con *job = 0;
Pool *work =
#ifdef LSVTHR
self->id ? &srv->pool :
#endif
&srv->main;
#ifdef LSVTHR
sigset_t blk;
/* block signals, they should go to the receiver */
sigemptyset( &blk );
sigaddset( &blk, SIGHUP );
sigaddset( &blk, SIGINT );
sigaddset( &blk, SIGTERM );
sigaddset( &blk, SIGPIPE );
pthread_sigmask( SIG_BLOCK, &blk, 0 );
pthread_setspecific( lsv_key_wrk, arg );
if ( self->id )
srv->app( 0, LSV_APPINI );
#else
lsv_wrk_single = self;
if ( !self->ses )
#endif
self->ses = sGet(); /* allocate session for thread */
for (;/*ever*/;) {
int ret;
/* ****************************************
critical section enter
*/
#ifdef LSVTHR
pthread_mutex_lock( &srv->mut );
#endif
if ( job ) { /* from last turn */
sMsg( LOG_DEBUG, "thr %d enq job %s pdg %s",
self->id, job->nam, pdg[job->pdg] );
job->stg = LSV_STGRET;
lsv_enq( &srv->recv, job );
#ifdef LSVTHR
if ( job->pdg ) /* wakeup receiver */
write( srv->pip[1], &job->pdg, 1 );
#endif
if ( job->ses->que ) { /* immediatly reschedule queued job */
Con *q = job->ses->que;
job->ses->que = q->nxt;
sMsg( LOG_VERBOSE, "dequeing con %s for ses '%s'",
q->nam, job->ses->name );
lsv_penq( srv, q );
}
}
self->cur = 0;
while ( !(job = lsv_deq( &work->que ))
&& !(LSV_SHUTDN & srv->flg)
) {
#ifndef LSVTHR
return self;
#else
self->nxt = work->wait;
work->wait = self; /* pls pls lemme wrk */
srv->plen++;
self->waits++;
sMsg( LOG_DEBUG, "thr %d waiting", self->id );
/* wait releases the mutex */
pthread_cond_wait( &self->todo, &srv->mut );
#endif
}
self->cur = job;
if ( job )
job->stg = LSV_STGRUN;
#ifdef LSVTHR
pthread_mutex_unlock( &srv->mut );
#endif
/* ****************************************
critical section leave
*/
/* end of monday morning meeting blah blah -- start working */
if ( ! job || (LSV_SHUTDN & srv->flg) )
break;
self->jobs++;
if ( job->ses )
sSet( job->ses );
else
job->ses = self->ses;
job->ses->io[1] = &job->ios;
if ( job->ses->res )
CLRREC( job->ses->res );
sMsg( LOG_DEBUG, "thr %d run job %s ses '%s'.%d",
self->id, job->nam, job->ses->name, job->ses->accnt );
ret = srv->app( job, LSV_APPRUN );
sMsg( LOG_DEBUG, "thr %d got %d buf %d",
self->id, ret, LIO_SAVAIL( &job->ios ) );
/*
lock only in order to create SMP memory barrier.
on a single processor system, the byte should be safely readable
by receiver without this lock.
TOD: check alternatives:
since receiver is checking the stage of active connections
only in rare cases, it might be more efficient to guard either
this set or the processing up to here with a per thread mutex.
*/
#ifdef LSVTHR
pthread_mutex_lock( &srv->mut );
#endif
job->stg = LSV_STGDON;
job->ses->cur = 0;
#ifdef LSVTHR
pthread_mutex_unlock( &srv->mut );
#endif
ret = srv->prt( &job->ios, LIO_SCLOSE );
job->ses->io[1] = 0;
sMsg( LOG_DEBUG, "thr %d done job %s", self->id, job->nam );
if ( job->ses != self->ses )
sSet( self->ses ); /* reset session */
}
#ifndef LSVTHR
if ( self->id )
srv->app( 0, LSV_APPFIN );
#endif
return self;
} /* lsv_wrk */
/*
	Acceptor and receiver loop.
	arg is the Srv to run; the same pointer is returned when the loop ends.

	Each turn
	- waits in select for the listening socket, the client sockets and,
	  in the threaded build, the wakeup pipe written by the workers,
	- reports a signal noted by the handler and leaves on termination,
	- accepts at most one new connection,
	- takes back the connections queued on srv->recv and prepares them
	  for their next request,
	- reads from the ready sockets, feeds the bytes to the protocol
	  (srv->prt with LIO_SPUSH) and passes complete requests to the workers,
	- in the non-threaded build, runs the worker inline.

	Any hard error ends the loop; the threaded build then flags LSV_SHUTDN
	and signals every worker.
*/
static void *lsv_rcv ( void *arg )
{
Srv *srv = (Srv*)arg;
fd_set fd; /* the descriptors we currently listen on */
int fdlen = 0; /* highest descriptor of interest + 1, as needed by select */
int pending = 0; /* bookkeeping of connections with a pdg mark, logged only */
int ret = 0, sel, i;
#ifdef LSVTHR
int inlock = 0; /* true while this thread holds srv->mut */
#endif
FD_ZERO( &fd );
FD_SET( srv->lsn, &fd );
fdlen = srv->lsn+1;
#ifdef LSVTHR
FD_SET( srv->pip[0], &fd );
if ( fdlen < srv->pip[0]+1 )
fdlen = srv->pip[0]+1;
#endif
for (;/*ever*/;) {
struct sockaddr_in npeer; /* of new connection */
int nsock = -1; /* new socket */
Con *c;
fd_set ready = fd; /* working copy, modified by select */
Tm expire;
/* select on read only; writing is done in workers and
exceptions (i.e. telnet OOB data) are not used
*/
#ifndef LSVTHR
if ( srv->recv.len ) { /* instead of listening on pip */
/* connections were returned by the inline worker: handle them first */
memset( &ready, 0, sizeof(ready) );
sel = 0;
} else
#endif
if ( 0 > (sel = select( fdlen, &ready, 0, 0, 0 )) ) {
if ( EINTR == errno ) {
sMsg( LOG_DEBUG, "select woken by signal" );
} else {
ret = sMsg( LOG_SYSERR, "select" );
goto done;
}
}
/* a signal was noted by the handler: report it */
if ( lsv_sig.si_signo ) {
sMsg( LOG_DEBUG, "SIG%s (%d) from pid %d",
SIGHUP==lsv_sig.si_signo ? "HUP" :
SIGINT==lsv_sig.si_signo ? "INT" :
SIGTERM==lsv_sig.si_signo ? "TERM" :
SIGPIPE==lsv_sig.si_signo ? "PIPE" : "?",
lsv_sig.si_signo, lsv_sig.si_pid );
/* the connection table is dumped on every signal, not only on SIGHUP */
if ( 1 /*SIGHUP==lsv_sig.si_signo*/ ) {
for ( i=FD_SETSIZE; i--; )
if ( con[i] ) {
c = con[i];
sMsg( LOG_INFO,
"con %3d peer %s stg %s prt %d app %d ios %d %s (0x%x)",
i, c->nam, stg[c->stg], c->prt, c->app,
c->ios.pos + c->ios.b.done,
LIO_SISOPEN(&c->ios) ? "opn" : "clo", c->ios.file );
}
}
if ( lsv_term ) /* even if it wasn't the last signal delivered */
goto done;
lsv_sig.si_signo = 0;
/* must NOT access the set after interrupt,
probably all fds are set
*/
continue;
}
/* update the clock and compute the session expiry mark (now - timeout) */
timeUpd( &srv->tim );
timeGtfm( srv->gtm, &srv->tim );
expire.millis = srv->tim.millis - srv->sto*LLL(1000);
#ifdef LSVTHR
pthread_mutex_lock( &srv->mut ); /* back in locked land */
inlock = !0;
#endif
/* -- we're the acceptor */
if ( FD_ISSET( srv->lsn, &ready ) ) { /* new connection */
unsigned /*socklen_t is broken*/ alen = sizeof(npeer);
nsock = accept( srv->lsn, (struct sockaddr*)&npeer, &alen );
/* every failure here ends the server loop */
if ( 0 > nsock ) {
ret = sMsg( LOG_SYSERR, "accept" );
goto done;
}
if ( FD_SETSIZE <= nsock ) {
ret = sMsg( ERR_BADF,
"socket %d >= FD_SETSIZE %d", nsock, FD_SETSIZE );
goto done;
}
if ( sizeof(npeer) != alen ) {
ret = sMsg( ERR_INVAL, "bad peer len %d", alen );
goto done;
}
/* ok */
/* setsockopt
SO_SNDTIMEO fixed value on linux
SO_OOBINLINE ... hmm, don't care
SO_LINGER off
*/
/* prepare con */
/* Con structures are kept per descriptor and reused for later connections.
NOTE(review): the fresh structure is read (c->stg) right below, so mAlloc
presumably returns zeroed memory and its result is not checked -- confirm */
if ( !(c = con[nsock]) ) {
c = con[nsock] = mAlloc( sizeof(*c) );
c->ios.file = nsock; /* preliminary -- further setup below */
}
c->pdg = LSV_PDGNEW;
/* the descriptor may be reused while the previous connection on it
is still being processed */
switch ( c->stg ) {
case LSV_STGCON: /* still in control of receiver */
case LSV_STGINP:
sMsg( ERR_IDIOT, "bad recv stage %d on new con", c->stg );
c->stg = LSV_STGNEW;
/* fall through */
case LSV_STGNEW: /* clean idle socket -- pass to receiver */
lsv_enq( &srv->recv, c );
break;
case LSV_STGSES: /* still in control of receiver */
case LSV_STGWRK:
sMsg( ERR_IDIOT, "bad worker stage %d on new con", c->stg );
/* fall through */
case LSV_STGRUN: /* worker will queue it */
/* hmmm ... should have COM ... */
case LSV_STGCOM:
case LSV_STGDON:
case LSV_STGRET: /* worker has queued it */
pending++;
break;
}
srv->conn++;
FD_CLR( srv->lsn, &ready ); /* skip in fd check */
} /* new connection */
#ifdef LSVTHR
if ( FD_ISSET( srv->pip[0], &ready ) ) { /* pending events */
/* drain the wakeup bytes, their values are not used */
char trash[LSV_NUMWRK];
read( srv->pip[0], trash, sizeof(trash) );
FD_CLR( srv->pip[0], &ready );
sel--;
}
#endif
/* switching roles -- now we're the receiver */
while ( (c = lsv_deq( &srv->recv )) ) { /* care for ready connections */
int sock = LIO_FD & c->ios.file;
sMsg( LOG_DEBUG, "deq %d from %s pdg %s",
sock, c->nam, pdg[c->pdg] );
if ( c->pdg && c->stg )
pending--;
if ( !(LIO_IN & c->ios.file) && (LIO_RDWR & c->ios.file) ) {
/* cleanup closed socks */
if ( c != con[sock] ) {
ret = sMsg( ERR_IDIOT,
"deq bad sock %d (0x%x)", sock, c->ios.file );
goto done; /* PANIC time */
}
sMsg( LOG_DEBUG, "clos%s sock %d",
(LIO_OUT & c->ios.file) ? "ing" : "ed", sock );
if ( (LIO_OUT & c->ios.file) )
lio_close( &c->ios.file, LIO_INOUT );
c->ios.file &= ~LIO_RDWR; /* clear also, so we're not closing again */
FD_CLR( sock, &fd );
FD_CLR( sock, &ready ); /* probably an EOF to read */
/* if that was the highest descriptor, lower fdlen to the highest open one */
if ( fdlen == sock+1 )
while ( fdlen
/* && !FD_ISSET( fdlen-1, &fd ) risk of muting tmp. disabled */
&& (!con[fdlen-1] || !LIO_SISOPEN(&con[fdlen-1]->ios))
)
fdlen--;
}
if ( LSV_PDGNEW == c->pdg ) { /* new connection */
struct sockaddr_in peer; /* of new connection */
if ( nsock == sock ) /* the one accepted just above */
peer = npeer;
else {
/* accepted in an earlier turn: ask the socket for its peer */
unsigned alen = sizeof(peer);
if ( getpeername( sock, (struct sockaddr*)&peer, &alen )
|| sizeof(peer) != alen
) {
ret = sMsg( LOG_SYSERR, "getpeername alen %d", alen );
goto done;
}
}
c->srv = srv;
c->ses = 0;
{
/* build the printable peer name "a.b.c.d:port" */
unsigned host = (unsigned)ntohl( peer.sin_addr.s_addr );
unsigned short port = (unsigned short)ntohs( peer.sin_port );
char *p = c->nam;
c->host = host;
memset( c->nam, 0, sizeof(c->nam) );
p += u2a( p, 0xff & (host >> 24) ); *p++ = '.';
p += u2a( p, 0xff & (host >> 16) ); *p++ = '.';
p += u2a( p, 0xff & (host >> 8) ); *p++ = '.';
p += u2a( p, 0xff & (host ) ); *p++ = ':';
p += u2a( p, port );
/* *p++ = '@'; p += u2a( p, sock ); */
}
c->con++; /* never mind overflow ... */
sMsg( LOG_INFO,
"connect %d on %d from %s", c->con+1, sock, c->nam );
LIO_SINIT( &c->ios, srv->prt, c->nam,
sock | LIO_INOUT|LIO_RDWR|LIO_SOCK );
LIO_BINIT( &c->flt );
FD_SET( sock, &fd );
if ( fdlen < sock+1 )
fdlen = sock+1;
c->bin = 0; /* do not reset on every request */
}
/* an input side that is still open goes back to listening */
if ( ! (LIO_IN & c->ios.file) )
c->stg = LSV_STGNEW;
else {
c->stg = LSV_STGCON;
if ( ! FD_ISSET( sock, &fd ) ) {
/* was muted while busy, see the read loop below */
sMsg( LOG_DEBUG, "reenabling %s", c->nam );
FD_SET( sock, &fd );
if ( fdlen < sock+1 )
fdlen = sock+1;
}
}
c->grp = c->pdg = c->prt = c->app = 0;
/* prepare for new request */
c->ios.pos = 0;
c->ios.b.done = c->ios.b.fill = 0;
if ( c->req )
CLRREC( c->req );
else {
/* this is done in the receiver rather than the acceptor
since in multi-receiver model, receiver owns the req
*/
c->req = mAlloc( LSV_BUFSIZ );
OPENISIS_INITREC( c->req, LSV_BUFSIZ, 64 );
}
/* unless LSV_CONSES is set, the session is looked up anew for each request */
if ( !(LSV_CONSES & srv->flg) )
c->ses = 0;
} /* prepare connections */
/* walk the descriptors from the top until all ready ones were seen */
for ( i=fdlen; sel && i--; ) { /* read from the ready */
Field *sid;
int got;
if ( ! FD_ISSET( i, &ready ) )
continue;
sel--;
c = con[i];
if ( ! c ) {
ret = sMsg( ERR_IDIOT, "ran on empty con %d", i );
goto done;
}
if ( LSV_STGINP < c->stg ) {
/* the previous request is still being processed: note that there
is input and stop listening until the connection comes back */
sMsg( LOG_DEBUG, "con %d busy stage %d!", i, c->stg );
c->pdg = LSV_PDGRED;
pending++;
FD_CLR( i, &fd ); /* mute this */
continue;
}
c->ios.b.done = 0;
got = lio_read( &c->ios.file, c->ios.b.c, sizeof(c->ios.b.c) );
if ( 0 >= got ) {
if ( -ERR_EOF != got )
goto err;
sMsg( LOG_DEBUG, "EOF on 0x%x line %d from %s",
c->ios.file, c->req->len, c->nam );
FD_CLR( i, &fd );
if ( ! c->req->len ) /* initial eof */
goto eof; /* close immediately */
/* lio_close( &c->ios.file, LIO_IN ); */
c->ios.file &= ~LIO_IN; /* don't shut */
c->pdg = LSV_PDGEOF; /* so it will be dequed and closed */
pending++;
got = 0; /* tell proto to close ... */
}
if ( ! c->req->len ) { /* stage should be CON */
/* first data of a request: start the record with the peer name */
RADDS( c->req, -LSV_PEER, c->nam, 1 );
sMsg( LOG_DEBUG, "req on %d from %s",
LIO_FD&c->ios.file, c->nam );
}
c->stg = LSV_STGINP;
c->ios.b.fill = got;
/* let the protocol parse what we got:
0 = need more, >0 = request complete, <0 = error */
got = srv->prt( &c->ios, LIO_SPUSH );
sMsg( LOG_DEBUG, "got 0x%x from %s", got, c->nam );
if ( ! got && (LIO_IN & c->ios.file) ) /* ok: more next time */
continue;
if ( 0 > got ) /* nok */
goto err;
got = c->srv->app( c, LSV_APPGOT );
if ( 0 > got ) /* nok */
goto err;
/* attach the session named by the request, if any */
if ( !(LSV_CONSES & srv->flg)
&& c->req && (sid = rGet( c->req, -LSV_SID, 0 ))
) {
c->ses = cSesByName( (char*)sid->val, sid->len, &srv->tim, &expire );
if ( ! c->ses ) { /* sad sad sad */
sMsg( LOG_ERROR, "out of sessions" );
goto err; /* drop connection */
}
/* a session without any access and properties so far is announced
to the application */
if ( ! c->ses->accnt
&& (! c->ses->prop || ! c->ses->prop->len) /* overflow paranoia */
)
got = c->srv->app( c, LSV_APPSES );
if ( 0 > got ) /* nok */
goto err;
}
c->tim = srv->tim;
RADDS( c->req, -LSV_TIME, srv->gtm, 1 );
if ( c->ses ) {
sMsg( LOG_DEBUG, "req %s for ses '%s' %s",
c->nam, c->ses->name, c->ses->cur ? "busy" : "idle" );
c->ses->accnt++;
if ( c->ses->cur || c->ses->que ) { /* oh - busy !? */
/* requests of one session are run one at a time: append this one to
the session's own queue, which the worker reschedules from */
Con **cpp = &c->ses->que;
int l = 0;
for ( ; *cpp; cpp = &(*cpp)->nxt )
if ( 4 < ++l ) {
sMsg( LOG_WARN,
"too many con queing on ses '%s' - dropping %s",
c->ses->name, c->nam );
goto err;
}
sMsg( LOG_VERBOSE, "queing con %s for ses '%s'",
c->nam, c->ses->name );
c->nxt = 0;
*cpp = c;
c->stg = LSV_STGSES;
continue;
}
}
/* yo - got it */
lsv_penq( srv, c );
continue;
/* error exits of the read loop: log (err only), close the connection
and stop listening on it */
err:
sMsg( ERR_BADF, "err 0x%x from %s", got, c->nam );
eof:
lio_close( &c->ios.file, LIO_INOUT );
c->stg = LSV_STGNEW;
FD_CLR( i, &fd );
} /* read from the ready */
/* statistics */
srv->turn++;
srv->busy += srv->nwr - srv->plen;
#ifdef LSVTHR
srv->wlen += srv->pool.que.len;
sMsg( LOG_INFO,
"\n==== %s\nturn %d: %d jobs for %d idle workers %d pending",
srv->gtm, srv->turn, srv->pool.que.len, srv->plen, pending );
for ( i=srv->nwr; i--; )
if ( srv->wrk[i].cur )
sMsg( LOG_INFO, "thr %d serving %d from %s",
i, LIO_FD&srv->wrk[i].cur->ios.file, srv->wrk[i].cur->nam );
inlock = 0;
pthread_mutex_unlock( &srv->mut );
#else
/* no threads: do the queued jobs right here */
lsv_wrk( srv->wrk );
#endif
} /* for(ever) */
done:
#ifdef LSVTHR
if ( inlock )
pthread_mutex_unlock( &srv->mut );
/* tell all workers to quit.
NOTE(review): the flag is set and the conditions are signalled without
holding srv->mut; a worker that has just tested the flag but not yet
entered pthread_cond_wait might miss this wakeup -- confirm */
srv->flg |= LSV_SHUTDN;
for ( i=srv->nwr; i--; )
pthread_cond_signal( &srv->wrk[i].todo );
#endif
return srv;
} /* lsv_rcv */
/*
	Run server srv until it is terminated.
	addr, if given, is the TCP port number as text; the default port is 8080.
	Installs the signal handlers, opens the listening socket, sets up the
	workers and then runs the receiver -- in the threaded build as a thread
	of its own, while the calling thread serves as worker 0.
	Returns 0 after a regular run or the error code of a failed setup step.
*/
int svRun ( Srv *srv, const char *addr )
{
	static const int yes = !0;
	struct sigaction siga;
	struct sockaddr_in sa; /* server address */
	int ret = 0, i;
	/* assume we're the main-thread
		-- try to make sure we get the controlling session
	*/
	sGet();
	/* set defaults */
	if ( ! srv->sto )
		srv->sto = 30*60; /* 30 minutes session timeout */
#ifndef LSVTHR
	srv->nwr = 1;
#else
	if ( ! srv->nwr )
		srv->nwr = LSV_NUMWRK/2; /* half the max */
	else if ( LSV_NUMWRK < srv->nwr )
		srv->nwr = LSV_NUMWRK; /* max */
	srv->mut = lsv_mutex_init;
#endif
	/* prepare signals: one handler for all, each blocking the others */
	memset( &siga, 0, sizeof(siga) );
	siga.sa_sigaction = lsv_sighand;
	sigemptyset( &siga.sa_mask );
	sigaddset( &siga.sa_mask, SIGHUP );
	sigaddset( &siga.sa_mask, SIGINT );
	sigaddset( &siga.sa_mask, SIGTERM );
	sigaddset( &siga.sa_mask, SIGPIPE );
	siga.sa_flags = SA_SIGINFO;
	sigaction( SIGHUP, &siga, 0 );
	sigaction( SIGINT, &siga, 0 );
	sigaction( SIGTERM, &siga, 0 );
	sigaction( SIGPIPE, &siga, 0 );
	/* prepare socket */
	srv->lsn = socket( PF_INET, SOCK_STREAM, IPPROTO_TCP/*6*/ );
	if ( 0 > srv->lsn ) /* nothing to close yet, so don't go to inierr */
		return sMsg( LOG_SYSERR, "socket" );
	/* fcntl( srv->lsn, O_NONBLOCK ); not needed */
	memset( &sa, 0, sizeof(sa) ); /* don't pass stack garbage as padding */
	sa.sin_family = AF_INET;
	sa.sin_port = htons( addr ? a2i( addr, -1 ) : 8080 );
	sa.sin_addr.s_addr =
#ifdef LSV_LOCALHOST_ONLY
		htonl(INADDR_LOOPBACK);
#else
		INADDR_ANY;
#endif
	if ( setsockopt( srv->lsn, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes) ) ) {
		ret = sMsg( LOG_SYSERR, "setsockopt" );
		goto inierr;
	}
	if ( bind( srv->lsn, (struct sockaddr*)&sa, sizeof(sa) ) ) {
		/* back from network to host byte order for display */
		ret = sMsg( LOG_SYSERR, "bind on port %u", ntohs(sa.sin_port) );
		goto inierr;
	}
	if ( listen( srv->lsn, 64/*backlog*/ ) ) {
		ret = sMsg( LOG_SYSERR, "listen" );
		goto inierr;
	}
#ifdef LSVTHR
	/* prepare pending event pipe */
	if ( pipe( srv->pip ) ) {
		ret = sMsg( LOG_SYSERR, "pipe" );
		goto inierr;
	}
	if ( pthread_key_create( &lsv_key_wrk, 0 ) ) {
		ret = sMsg( LOG_SYSERR, "key_create" );
		goto inierr;
	}
#endif
	/* setup workers, each with a session named "wrk<id>" */
	memset( srv->wrk, 0, sizeof(srv->wrk) );
	for ( i=srv->nwr; i--; ) {
		srv->wrk[i].id = i;
		srv->wrk[i].srv = srv;
		srv->wrk[i].nxt = 0;
		srv->wrk[i].ses = cSession(0);
		memcpy( srv->wrk[i].ses->name, "wrk", 3 );
		u2a( srv->wrk[i].ses->name+3, i );
#ifdef LSVTHR
		srv->wrk[i].todo = lsv_cond_init;
		if ( i ) /* worker 0 is the calling thread, see below */
			pthread_create( &srv->wrk[i].thr, 0, lsv_wrk, srv->wrk+i );
#endif
	}
#ifndef LSVTHR
	lsv_rcv( srv );
#else
	/* start the server thread */
	pthread_create( &srv->rcv, 0, lsv_rcv, srv );
	/* run main worker */
	srv->wrk[0].thr = pthread_self();
	lsv_wrk( srv->wrk );
	/* cleanup */
	pthread_join( srv->rcv, 0 );
	/* worker 0 is this very thread and must not be joined */
	for ( i=srv->nwr; 0 < --i; )
		pthread_join( srv->wrk[i].thr, 0 );
#endif
	for ( i=srv->nwr; i--; )
		sMsg( LOG_INFO, "worker %d had %d jobs %d waits",
			i, srv->wrk[i].jobs, srv->wrk[i].waits );
	/*
		the averages are printed with a floating point format, so make sure
		they are computed as double whatever the counters are declared as
		(lsv.h is not visible from here); this also keeps a server stopped
		before its first turn from dividing integers by zero.
	*/
	sMsg( LOG_INFO,
		"server had %d jobs on %d connections %d turns"
		" avg queue len %.02f workers %.02f",
		srv->jobs, srv->conn, srv->turn,
		(double)srv->wlen/srv->turn, (double)srv->busy/srv->turn );
inierr:
	close( srv->lsn );
	return ret;
} /* svRun */
/** helper, probably should go to lio.
	Append l bytes from c to buffer b of stream s, writing buffered data
	out through lio_write whenever the buffer has no room left.
	Without b, the stream's own buffer is used.
	Without c, buffer b is emptied, i.e. everything pending is written.
	Returns 0 on success, the negative result of lio_write if that failed,
	-ERR_BUSY if nothing could be written and -ERR_IDIOT if more was
	reported written than was pending.
*/
int lio_sout ( Ios *s, Buf *b, const char *c, unsigned l )
{
	if ( ! b )
		b = &s->b;
	for (;;) {
		/* more than half full with sent bytes in front: shift down */
		if ( b->done && b->fill > LIO_BUFSIZ/2 ) {
			if ( b->fill > b->done )
				memmove( b->c, b->c + b->done, b->fill - b->done );
			s->pos += b->done;
			b->fill -= b->done;
			b->done = 0;
		}
		/* take as much of the input as fits */
		if ( c && l && LIO_BUFSIZ > b->fill ) {
			unsigned room = LIO_BUFSIZ - b->fill;
			unsigned take = room > l ? l : room;
			memcpy( b->c + b->fill, c, take );
			b->fill += take;
			c += take;
			l -= take;
			if ( 0 == l ) /* all buffered, no need to write now */
				return 0;
		}
		/* write what is pending */
		if ( b->done < b->fill ) {
			int sent = lio_write( &s->file, b->c + b->done, b->fill - b->done );
			if ( sent < 0 )
				return sent;
			if ( 0 == sent )
				return -ERR_BUSY;
			b->done += sent;
			if ( b->fill < b->done ) { /* can't be */
				b->done = b->fill = 0;
				return -ERR_IDIOT;
			}
		}
		/* all gone: start over at the beginning of the buffer */
		if ( b->fill <= b->done ) {
			s->pos += b->done;
			b->done = b->fill = 0;
		}
		/* go on while there is input left or, when flushing, data pending */
		if ( c ) {
			if ( ! l )
				break;
		} else if ( ! (b->fill > b->done) )
			break;
	}
	return 0;
} /* lio_sout */
/*
	Control characters of the plain protocol, given as numbers rather than
	as character escapes. The macros are used for comparing input bytes,
	the static objects below provide the same bytes as addressable data
	for output via lio_sout.
*/
#define LF 10 /* LineFeed a.k.a. newline - '\n' isn't really well defined */
#define TAB 9 /* horizontal, that is */
#define VT 11 /* vertical, used as newline replacement */
static const char lf = LF;
static const char tab = TAB;
static const char vt = VT;
/*
	the plain protocol

	Stream handler translating between text lines and record fields.
	s is the stream of a connection (the Ios is the head of its Con),
	op the stream operation:

	LIO_SPUSH  parse the input between s->b.done and s->b.fill.
		Every line "tag TAB value LF" becomes a field of c->req, and the
		application is called with LSV_APPARG for each completed field.
		A line starting with TAB continues the previous field (binary mode),
		otherwise VT characters in a value are turned into newlines.
		Returns 1 when the request is complete (empty line, or end of input
		at the beginning of a line), 0 if more input is needed, the
		application's result if that is non-zero, or a negative error.
		c->prt tells between calls whether we're inside a line (1) or not (0).
	LIO_SFLUSH, LIO_SCLOSE  write the fields of the session's result record,
		unless output was already produced, and flush; on close an empty
		line is added to end the response.
		Returns 0, or -ERR_INVAL if the stream is not open for output.
	Other operations are passed to lio_stream.
*/
int svPlain ( Ios *s, int op )
{
	Field *f;
	Con *c = (Con*)s;
	switch ( op ) {
	case LIO_SPUSH: {
		int l = s->b.fill - s->b.done;
		unsigned char *b = s->b.c + s->b.done;
		unsigned char *end = b+l, *v, *p;
		if ( ! l ) { /* EOF: done */
			if ( ! c->prt ) /* ok */
				return 1;
			/* last field wasn't closed by LF */
			return sMsg( ERR_INVAL, "no EOL from %s", c->nam );
		}
		if ( c->prt ) {
			/* inside a field: make room for appending all of the input */
			RSPACE( c->req, l, !0 );
			/* like below: the record is gone if it could not be extended */
			if ( ! c->req )
				return -ERR_NOMEM;
		}
		/* add text lines */
		while ( b<end ) {
			int conti = 0;
			switch ( c->prt ) {
			case 0: /* at beginning of line -- start new field */
				if ( LF == *b ) /* empty line */
					return 1;
				if ( TAB != *b || !c->req->len )
					RADD( c->req, 0,0,end-b, !0 );
				else { /* binary mode continuation line */
					conti = 1;
					if ( ! c->bin ) {
						sMsg( LOG_INFO,
							"detected binary mode on con %s", c->nam );
						c->bin = 1;
					}
					RSPACE( c->req, end-b, !0 );
				}
				if ( ! c->req )
					return -ERR_NOMEM;
				c->prt = 1;
				/* fall through */
			case 1: /* add to last field */
				f = c->req->field + c->req->len-1;
				v = (unsigned char*)f->val;
				p = v + f->len;
				if ( conti ) {
					/* the leading TAB stands for the newline that ended
						the previous line */
					*p++ = LF;
					b++;
				}
				/* copy up to and excluding the LF */
				if ( c->bin ) {
					for ( ; b<end && LF != (*p = *b++); p++ )
						;
				} else
					for ( ; b<end && LF != (*p = *b++); p++ )
						if ( VT == *p ) /* convert VTABs */
							*p = LF; /* back to newlines */
				c->req->used += (p - v) - f->len;
				f->len = p - v;
				if ( LF == b[-1] ) {
					/* line complete: split off the leading tag number
						and the TAB following it */
					int ret = a2il( f->val, f->len, &f->tag );
					if ( ret ) {
						if ( ret < f->len && TAB == v[ret] )
							ret++;
						if ( ret < f->len )
							memmove( v, v+ret, f->len - ret );
						f->len -= ret;
					}
					ret = c->srv->app( c, LSV_APPARG );
					c->prt = 0;
					if ( ret )
						return ret;
				}
				sMsg( LOG_INFO, "prs from %s: [%2d] %3d = '%.*s'",
					c->nam, c->req->len-1, f->tag, f->len, f->val );
			}
		}
		return 0;
	} /* case LIO_SPUSH */
	case LIO_SFLUSH:
		if ( s->b.fill <= s->b.done )
			return 0;
		/* fall through */
	case LIO_SCLOSE:
		if ( !(LIO_OUT & s->file) )
			return -ERR_INVAL;
		/*
			this switch is not protected, since receiver doesn't care
		*/
		if ( c->stg < LSV_STGCOM )
			c->stg = LSV_STGCOM;
		/*
			send the result record only if nothing went out so far.
			The parentheses matter: without them the ! applies to s->pos
			alone and the buffer offset is added to the result.
		*/
		if ( ! (s->pos + s->b.done) && c->ses->res ) {
			char num[32];
			int n = c->ses->res->len;
			sMsg( LOG_INFO, "com %d fields to %s", c->ses->res->len, c->nam );
			for ( f = c->ses->res->field; n--; f++ ) {
				const char *v = f->val, *nl;
				int vl = f->len;
				int nlen = i2a( num, f->tag );
				num[nlen++] = TAB;
				lio_sout( s, &c->flt, num, nlen );
				/* embedded newlines: in binary mode send the line including
					its newline and mark the continuation with a TAB,
					else replace the newline with a VT */
				while ( vl && (nl = memchr(v,'\n',vl)) ) {
					int ll = 1+(nl-v);
					if ( c->bin ) {
						lio_sout( s, &c->flt, v, ll );
						lio_sout( s, &c->flt, &tab, 1 );
					} else {
						lio_sout( s, &c->flt, v, ll-1 );
						lio_sout( s, &c->flt, &vt, 1 );
					}
					v += ll;
					vl -= ll;
				}
				if ( vl )
					lio_sout( s, &c->flt, v, vl );
				lio_sout( s, &c->flt, &lf, 1 );
			}
		}
		/* whatever is in the stream's own buffer is dropped, not sent */
		if ( s->b.fill )
			s->b.done = s->b.fill = 0;
		/*
		{
			lio_sout( s, &c->flt, 0, 0 );
			lio_stdio( s, LIO_SFLUSH );
		}
		*/
		if ( LIO_SCLOSE == op ) /* empty line ends the response */
			lio_sout( s, &c->flt, &lf, 1 );
		lio_sout( s, &c->flt, 0, 0 ); /* finally flush */
		return 0;
	}
	return lio_stream( s, op ); /* hmm -- SPURGE ? */
} /* svPlain */
/*
	the echo application

	Application callback sending every request back as the response.
	For any non-zero task there is nothing to do, except that on LSV_APPGOT
	the request is flagged (c->grp) as not needing the main thread.
	For task 0 all fields of the request record are added to the result
	record of the session.
	Always returns 0.
*/
int svEcho ( Con *c, int task )
{
#ifdef LSVECHO_DELAY
	static Tm tm = { LLL(50) };
#endif
	Field *f;
	int i;
	if ( 0 != task ) { /* preparing: we don't care */
		if ( task == LSV_APPGOT )
			c->grp = 1; /* don't require mainthread */
		return 0;
	}
	/* run it */
	sMsg( LOG_INFO, "echo %d: %d fields to %s",
		svCur()->id, c->req->len, c->nam );
#ifdef LSVECHO_DELAY
	timeSleep( &tm );
#endif
	for ( f = c->req->field, i = c->req->len; i--; f++ ) {
		RADDF( c->ses->res, f, !0 );
		/*
			plain protocol actually does not support plain output ...
			sMsg( c->ses, 0, "%d = '%.*s'\n", f->tag, f->len, f->val );
		*/
	}
	return 0;
} /* svEcho */
#if 0
/*
	Sample driver, disabled: an echo server speaking the plain protocol.
	The optional first argument is the port to listen on.
	The handlers are named svPlain and svEcho in this file; the former
	names lsv_plain and lsv_echo used here would not link.
*/
int main ( int argc, const char **argv )
{
	Srv echo;
	memset( &echo, 0, sizeof(echo) );
	echo.prt = svPlain;
	echo.app = svEcho;
	cOpen(0);
	cLog( LOG_DEBUG, 0 );
	return svRun( &echo, 1<argc ? argv[1] : 0 );
}
#endif