Revision 337 (by dpavlin, 2004/06/10 19:22:40) new trunk for webpac v2
/*
	openisis - an open implementation of the CDS/ISIS database
	Version 0.8.x (patchlevel see file Version)
	Copyright (C) 2001-2003 by Erik Grziwotz, erik@openisis.org

	This library is free software; you can redistribute it and/or
	modify it under the terms of the GNU Lesser General Public
	License as published by the Free Software Foundation; either
	version 2.1 of the License, or (at your option) any later version.

	This library is distributed in the hope that it will be useful,
	but WITHOUT ANY WARRANTY; without even the implied warranty of
	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
	Lesser General Public License for more details.

	You should have received a copy of the GNU Lesser General Public
	License along with this library; if not, write to the Free Software
	Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

	see README for more information
EOH */

/*
	$Id: lsv.c,v 1.14 2003/06/11 14:53:26 kripke Exp $
	OpenIsis server
*/

#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <string.h> /* memcpy et al */
#include <signal.h>
#include <errno.h>


#include "lsv.h"



static Con *con[FD_SETSIZE]; /* connection per socket fd; usually 1024, but up to 1<<16 */
#ifdef LSVTHR
static pthread_mutex_t lsv_mutex_init = PTHREAD_MUTEX_INITIALIZER; /* copied to srv->mut */
static pthread_cond_t  lsv_cond_init  = PTHREAD_COND_INITIALIZER;  /* copied to each wrk todo */
static pthread_key_t   lsv_key_wrk; /* per thread current Wrk, read by svCur */
#else
static Wrk *lsv_wrk_single; /* the only worker, read by svCur */
#endif
/* set by the signal handler on SIGINT/SIGTERM.
	an object written from a signal handler must be volatile sig_atomic_t
	for the access to be well defined (C11 7.14.1.1)
*/
static volatile sig_atomic_t lsv_term;
/* last signal caught, reported and cleared by the receiver.
	NOTE(review): copying a struct inside a handler is not strictly
	async-signal-safe; kept for the si_pid diagnostics
*/
static siginfo_t lsv_sig;

/* printable names used in log messages.
	pdg is indexed by Con.pdg, stg by Con.stg; the entries must follow the
	numeric order of the LSV_PDG... / LSV_STG... constants (see lsv.h)
*/
static const char *const pdg[] = { "---", "NEW", "RED", "EOF" };
static const char *const stg[] = {
	"NEW", "CON", "INP", "SES", "WRK", "RUN", "COM", "DON", "RET"
};

/* SA_SIGINFO handler installed by svRun for SIGHUP, SIGINT, SIGTERM and SIGPIPE.
	SIGINT and SIGTERM request termination through lsv_term; every signal
	is recorded in lsv_sig so the receiver loop can report it.
*/
static void lsv_sighand ( int sig, siginfo_t *info, void *ucontext )
{
	(void)ucontext;
	switch ( sig ) {
	case SIGINT:
	case SIGTERM:
		lsv_term = !0;
		break;
	default:
		break;
	}
	if ( ! info ) { /* ??? no siginfo -- record what we know */
		lsv_sig.si_signo = sig;
		lsv_sig.si_pid = 0;
		return;
	}
	lsv_sig = *info;
}	/* lsv_sighand */


/* append connection c at the tail of queue q (linked through nxt).
	returns 1 if the queue already had a tail, 0 if c is now its only entry.
*/
static int lsv_enq ( Que *q, Con *c )
{
	int nonempty = 0 != q->tail;

	c->nxt = 0;
	q->len++;
	if ( nonempty )
		q->tail->nxt = c;
	else
		q->head = c;
	q->tail = c;
	return nonempty;
}	/* lsv_enq */


/* schedule connection c for a worker.
	with threads, c->grp selects the shared pool (any thread) instead of
	the main pool (worker 0). c is marked WRK, becomes the current
	connection of its session and is queued; with threads, one waiting
	worker of that pool is taken off the wait list and woken.
	both callers invoke this with srv->mut held (threaded mode).
*/
static void lsv_penq ( Srv *srv, Con *c )
{
	Pool *work = &srv->main;
#ifdef LSVTHR
	Wrk *sleeper;

	if ( c->grp )
		work = &srv->pool;
#endif
	srv->jobs++;
	c->stg = LSV_STGWRK;
	if ( c->ses )
		c->ses->cur = c;
	lsv_enq( &work->que, c );
	sMsg( LOG_DEBUG, "enq req %d from %s qlen %d",
		LIO_FD&c->ios.file, c->nam, work->que.len );
#ifdef LSVTHR
	sleeper = work->wait;
	if ( ! sleeper ) /* everybody busy -- someone will pick it up */
		return;
	work->wait = sleeper->nxt;
	sleeper->nxt = 0;
	srv->plen--;
	pthread_cond_signal( &sleeper->todo );
	sMsg( LOG_DEBUG, "woke %d of %d", sleeper->id, srv->plen );
#endif
}	/* lsv_penq */


/* remove and return the head of queue q, or 0 if q is empty.
	the returned connection's nxt link is cleared.
*/
static Con *lsv_deq ( Que *q )
{
	Con *first = q->head;

	if ( ! first )
		return 0;
	q->len--;
	q->head = first->nxt;
	first->nxt = 0;
	if ( first == q->tail ) /* it was the only one */
		q->tail = 0;
	return first;
}	/* lsv_deq */


/* the worker running in the calling thread, as registered by lsv_wrk
	(thread specific key with threads, the single worker otherwise).
*/
Wrk *svCur ()
{
	Wrk *cur;
#ifdef LSVTHR
	cur = (Wrk*)pthread_getspecific( lsv_key_wrk );
#else
	cur = lsv_wrk_single;
#endif
	return cur;
}


/**
	worker main loop.
	in threaded mode, run forever, waiting for jobs, until LSV_SHUTDN is set.
	non-threaded, do the jobs that are available and return.

	every turn, inside the critical section (srv->mut when threaded), the
	job finished in the previous turn is handed back to the receiver
	(queue srv->recv, stage RET) and the next connection queued on its
	session, if any, is scheduled; then a new job is taken from this
	worker's pool (srv->pool for threads with id != 0, srv->main otherwise).
	outside the lock the job is run by srv->app( job, LSV_APPRUN ) and
	finished by srv->prt( &job->ios, LIO_SCLOSE ).
	@param arg the Wrk of this worker
	@return arg
*/
static void *lsv_wrk ( void *arg )
{
	Wrk *self = (Wrk*)arg;
	Srv *srv = self->srv;
	Con *job = 0;
	Pool *work =
#ifdef LSVTHR
		self->id ? &srv->pool :
#endif
		&srv->main;
#ifdef LSVTHR
	sigset_t blk;
	
	/* block signals, they should go to the receiver */
	sigemptyset( &blk );
	sigaddset( &blk, SIGHUP );
	sigaddset( &blk, SIGINT );
	sigaddset( &blk, SIGTERM );
	sigaddset( &blk, SIGPIPE );
	pthread_sigmask( SIG_BLOCK, &blk, 0 );

	pthread_setspecific( lsv_key_wrk, arg );
	if ( self->id ) /* secondary thread: notify app, paired with LSV_APPFIN below */
		srv->app( 0, LSV_APPINI );
#else
	lsv_wrk_single = self;
	if ( !self->ses )
#endif
	/* threaded: always; non-threaded: only if no session was set up */
	self->ses = sGet(); /* allocate session for thread */

	for (;/*ever*/;) {
		int ret;

		/* ****************************************
			critical section enter
		*/
#ifdef LSVTHR
		pthread_mutex_lock( &srv->mut );
#endif
		if ( job ) { /* from last turn: hand it back to the receiver */
			sMsg( LOG_DEBUG, "thr %d enq job %s pdg %s",
				self->id, job->nam, pdg[job->pdg] );
			job->stg = LSV_STGRET;
			lsv_enq( &srv->recv, job );
#ifdef LSVTHR
			if ( job->pdg ) /* wakeup receiver */
				write( srv->pip[1], &job->pdg, 1 );
#endif
			if ( job->ses->que ) { /* immediately reschedule queued job */
				Con *q = job->ses->que;
				job->ses->que = q->nxt;
				sMsg( LOG_VERBOSE, "dequeing con %s for ses '%s'",
					q->nam, job->ses->name );
				lsv_penq( srv, q );
			}
		}
		self->cur = 0;
		while ( !(job = lsv_deq( &work->que ))
			&& !(LSV_SHUTDN & srv->flg)
		) {
#ifndef LSVTHR
			return self;
#else
			self->nxt = work->wait;
			work->wait = self; /* pls pls lemme wrk */
			srv->plen++;
			self->waits++;
			sMsg( LOG_DEBUG, "thr %d waiting", self->id );
			/* wait releases the mutex */
			pthread_cond_wait( &self->todo, &srv->mut );
#endif
		}
		self->cur = job;
		if ( job )
			job->stg = LSV_STGRUN;
#ifdef LSVTHR
		pthread_mutex_unlock( &srv->mut );
#endif
		/* ****************************************
			critical section leave
		*/

		/* end of monday morning meeting blah blah -- start working */
		if ( ! job || (LSV_SHUTDN & srv->flg) )
			break;
		self->jobs++;
		if ( job->ses )
			sSet( job->ses );
		else
			job->ses = self->ses;
		job->ses->io[1] = &job->ios; /* app output goes to the connection */
		if ( job->ses->res )
			CLRREC( job->ses->res );
		sMsg( LOG_DEBUG, "thr %d run job %s ses '%s'.%d",
			self->id, job->nam, job->ses->name, job->ses->accnt );
		ret = srv->app( job, LSV_APPRUN );
		sMsg( LOG_DEBUG, "thr %d got %d buf %d",
			self->id, ret, LIO_SAVAIL( &job->ios ) );

		/*
		lock only in order to create SMP memory barrier.
		on a single processor system, the byte should be safely readable
		by receiver without this lock.
		TOD: check alternatives:
		since receiver is checking the stage of active connections
		only in rare cases, it might be more efficient to guard either
		this set or the processing up to here with a per thread mutex.
		*/
#ifdef LSVTHR
		pthread_mutex_lock( &srv->mut );
#endif
		job->stg = LSV_STGDON;
		job->ses->cur = 0;
#ifdef LSVTHR
		pthread_mutex_unlock( &srv->mut );
#endif

		ret = srv->prt( &job->ios, LIO_SCLOSE );
		job->ses->io[1] = 0;
		sMsg( LOG_DEBUG, "thr %d done job %s", self->id, job->nam );
		if ( job->ses != self->ses )
			sSet( self->ses ); /* reset session */
	}
#ifdef LSVTHR
	/* only secondary threads got LSV_APPINI above; non-threaded, id is 0 */
	if ( self->id )
		srv->app( 0, LSV_APPFIN );
#endif
	return self;
}	/* lsv_wrk */


/**
	the receiver loop, multiplexing all sockets with select.
	each turn it reports caught signals (and stops on SIGINT/SIGTERM),
	accepts a new connection on srv->lsn, then prepares every connection
	queued on srv->recv (new ones, ones returned by workers, closed ones)
	for its next request, and finally reads from all ready sockets,
	pushing the bytes through srv->prt( .., LIO_SPUSH ) and, once a request
	is complete, through app LSV_APPGOT (and LSV_APPSES for new sessions)
	before queueing it for a worker with lsv_penq.
	non-threaded, it runs the single worker itself at the end of each turn.
	on fatal errors or termination it leaves the loop; with threads it then
	sets LSV_SHUTDN and wakes all workers.
	@param arg the Srv
	@return arg
*/
static void *lsv_rcv ( void *arg )
{
	Srv *srv = (Srv*)arg;
	fd_set fd;
	int fdlen = 0;
	int pending = 0;
	int ret = 0, sel, i;
#ifdef LSVTHR
	int inlock = 0;
#endif

	FD_ZERO( &fd );
	FD_SET( srv->lsn, &fd );
	fdlen = srv->lsn+1;
#ifdef LSVTHR
	FD_SET( srv->pip[0], &fd );
	if ( fdlen < srv->pip[0]+1 )
		fdlen = srv->pip[0]+1;
#endif

	for (;/*ever*/;) {
		struct sockaddr_in npeer; /* of new connection */
		int nsock = -1; /* new socket */
		Con *c;
		fd_set ready = fd;
		Tm expire;

		/* select on read only; writing is done in workers and
			exceptions (i.e. telnet OOB data) are not used
		*/
#ifndef LSVTHR
		if ( srv->recv.len ) { /* instead of listening on pip */
			memset( &ready, 0, sizeof(ready) );
			sel = 0;
		} else
#endif
		if ( 0 > (sel = select( fdlen, &ready, 0, 0, 0 )) ) {
			if ( EINTR == errno ) {
				sMsg( LOG_DEBUG, "select woken by signal" );
			} else {
				ret = sMsg( LOG_SYSERR, "select" );
				goto done;
			}
		}
		if ( lsv_sig.si_signo ) {
			sMsg( LOG_DEBUG, "SIG%s (%d) from pid %d",
				SIGHUP==lsv_sig.si_signo ? "HUP" :
				SIGINT==lsv_sig.si_signo ? "INT" :
				SIGTERM==lsv_sig.si_signo ? "TERM" :
				SIGPIPE==lsv_sig.si_signo ? "PIPE" : "?",
				lsv_sig.si_signo, lsv_sig.si_pid );
			if ( 1 /*SIGHUP==lsv_sig.si_signo*/ ) { /* dump all connections */
				for ( i=FD_SETSIZE; i--; )
					if ( con[i] ) {
						c = con[i];
						sMsg( LOG_INFO,
							"con %3d peer %s stg %s prt %d app %d ios %d %s (0x%x)",
							i, c->nam, stg[c->stg], c->prt, c->app,
							c->ios.pos + c->ios.b.done,
							LIO_SISOPEN(&c->ios) ? "opn" : "clo", c->ios.file );
					}
			}
			if ( lsv_term ) /* even if it wasn't the last signal delivered */
				goto done;
			lsv_sig.si_signo = 0;
			/* must NOT access the set after interrupt,
				probably all fds are set
			*/
			continue;
		}
		timeUpd( &srv->tim );
		timeGtfm( srv->gtm, &srv->tim );
		/* sessions not used since this time are considered expired */
		expire.millis = srv->tim.millis - srv->sto*LLL(1000);

#ifdef LSVTHR
		pthread_mutex_lock( &srv->mut ); /* back in locked land */
		inlock = !0;
#endif

		/* -- we're the acceptor */
		if ( FD_ISSET( srv->lsn, &ready ) ) { /* new connection */
			unsigned /*socklen_t is broken*/ alen = sizeof(npeer);
			nsock = accept( srv->lsn, (struct sockaddr*)&npeer, &alen );
			if ( 0 > nsock ) {
				ret = sMsg( LOG_SYSERR, "accept" );
				goto done;
			}
			if ( FD_SETSIZE <= nsock ) {
				ret = sMsg( ERR_BADF,
					"socket %d >= FD_SETSIZE %d", nsock, FD_SETSIZE );
				goto done;
			}
			if ( sizeof(npeer) != alen ) {
				ret = sMsg( ERR_INVAL, "bad peer len %d", alen );
				goto done;
			}
			/* ok */
			/* setsockopt
				SO_SNDTIMEO fixed value on linux
				SO_OOBINLINE ... hmm, don't care
				SO_LINGER off
			*/
			/* prepare con
				NOTE(review): relies on mAlloc returning zeroed memory
				(stg NEW, req 0) -- confirm
			*/
			if ( !(c = con[nsock]) ) {
				c = con[nsock] = mAlloc( sizeof(*c) );
				c->ios.file = nsock; /* preliminary -- further setup below */
			}
			c->pdg = LSV_PDGNEW;
			switch ( c->stg ) {
			case LSV_STGCON: /* still in control of receiver */
			case LSV_STGINP:
				sMsg( ERR_IDIOT, "bad recv stage %d on new con", c->stg );
				c->stg = LSV_STGNEW;
				/* fallthrough */
			case LSV_STGNEW: /* clean idle socket -- pass to receiver */
				lsv_enq( &srv->recv, c );
				break;
			case LSV_STGSES: /* still in control of receiver */
			case LSV_STGWRK:
				sMsg( ERR_IDIOT, "bad worker stage %d on new con", c->stg );
				/* fallthrough */
			case LSV_STGRUN: /* worker will queue it */
				/* hmmm ... should have COM ... */
			case LSV_STGCOM:
			case LSV_STGDON:
			case LSV_STGRET: /* worker has queued it */
				pending++;
				break;
			}
			srv->conn++;
			FD_CLR( srv->lsn, &ready ); /* skip in fd check */
		} /* new connection */

#ifdef LSVTHR
		if ( FD_ISSET( srv->pip[0], &ready ) ) { /* pending events */
			char trash[LSV_NUMWRK];
			read( srv->pip[0], trash, sizeof(trash) );
			FD_CLR( srv->pip[0], &ready );
			sel--;
		}
#endif

		/* switching roles -- now we're the receiver */
		while ( (c = lsv_deq( &srv->recv )) ) { /* care for ready connections */
			int sock = LIO_FD & c->ios.file;
			sMsg( LOG_DEBUG, "deq %d from %s pdg %s",
				sock, c->nam, pdg[c->pdg] );
			if ( c->pdg && c->stg )
				pending--;
			if ( !(LIO_IN & c->ios.file) && (LIO_RDWR & c->ios.file) ) {
				/* cleanup closed socks */
				if ( c != con[sock] ) {
					ret = sMsg( ERR_IDIOT,
						"deq bad sock %d (0x%x)", sock, c->ios.file );
					goto done; /* PANIC time */
				}
				sMsg( LOG_DEBUG, "clos%s sock %d",
					(LIO_OUT & c->ios.file) ? "ing" : "ed", sock );
				if ( (LIO_OUT & c->ios.file) )
					lio_close( &c->ios.file, LIO_INOUT );
				c->ios.file &= ~LIO_RDWR; /* clear also, so we're not closing again */
				FD_CLR( sock, &fd );
				FD_CLR( sock, &ready ); /* probably an EOF to read */
				if ( fdlen == sock+1 )
					while ( fdlen
						/* && !FD_ISSET( fdlen-1, &fd ) risk of muting tmp. disabled */
						&& (!con[fdlen-1] || !LIO_SISOPEN(&con[fdlen-1]->ios))
					)
						fdlen--;
			}
			if ( LSV_PDGNEW == c->pdg ) { /* new connection */
				struct sockaddr_in peer; /* of new connection */
				if ( nsock == sock ) /* the one accepted just above */
					peer = npeer;
				else {
					unsigned alen = sizeof(peer);
					if ( getpeername( sock, (struct sockaddr*)&peer, &alen )
						|| sizeof(peer) != alen
					) {
						ret = sMsg( LOG_SYSERR, "getpeername alen %d", alen );
						goto done;
					}
				}
				c->srv = srv;
				c->ses = 0;
				{ /* name is "a.b.c.d:port" */
					unsigned host = (unsigned)ntohl( peer.sin_addr.s_addr );
					unsigned short port = (unsigned short)ntohs( peer.sin_port );
					char *p = c->nam;
					c->host = host;
					memset( c->nam, 0, sizeof(c->nam) );
					p += u2a( p, 0xff & (host >> 24) ); *p++ = '.';
					p += u2a( p, 0xff & (host >> 16) ); *p++ = '.';
					p += u2a( p, 0xff & (host >>  8) ); *p++ = '.';
					p += u2a( p, 0xff & (host      ) ); *p++ = ':';
					p += u2a( p, port );
					/* *p++ = '@'; p += u2a( p, sock ); */
				}
				c->con++; /* never mind overflow ... */
				/* NOTE(review): c->con was just incremented, so the +1 below
					looks off by one -- confirm intended numbering */
				sMsg( LOG_INFO,
					"connect %d on %d from %s", c->con+1, sock, c->nam );
				LIO_SINIT( &c->ios, srv->prt, c->nam,
					sock | LIO_INOUT|LIO_RDWR|LIO_SOCK );
				LIO_BINIT( &c->flt );
				FD_SET( sock, &fd );
				if ( fdlen < sock+1 )
					fdlen = sock+1;
				c->bin = 0; /* do not reset on every request */
			}
			if ( ! (LIO_IN & c->ios.file) )
				c->stg = LSV_STGNEW;
			else {
				c->stg = LSV_STGCON;
				if ( ! FD_ISSET( sock, &fd ) ) { /* was muted while busy */
					sMsg( LOG_DEBUG, "reenabling %s", c->nam );
					FD_SET( sock, &fd );
					if ( fdlen < sock+1 )
						fdlen = sock+1;
				}
			}
			c->grp = c->pdg = c->prt = c->app = 0;
			/* prepare for new request */
			c->ios.pos = 0;
			c->ios.b.done = c->ios.b.fill = 0;
			if ( c->req )
				CLRREC( c->req );
			else {
				/* this is done in the receiver rather than the acceptor
					since in multi-receiver model, receiver owns the req
				*/
				c->req = mAlloc( LSV_BUFSIZ );
				OPENISIS_INITREC( c->req, LSV_BUFSIZ, 64 );
			}
			if ( !(LSV_CONSES & srv->flg) )
				c->ses = 0;
		}	/* prepare connections */


		for ( i=fdlen; sel && i--; ) { /* read from the ready */
			Field *sid;
			int got;
			if ( ! FD_ISSET( i, &ready ) )
				continue;
			sel--;
			c = con[i];
			if ( ! c ) {
				ret = sMsg( ERR_IDIOT, "ran on empty con %d", i );
				goto done;
			}
			if ( LSV_STGINP < c->stg ) { /* owned by a worker: retry later */
				sMsg( LOG_DEBUG, "con %d busy stage %d!", i, c->stg );
				c->pdg = LSV_PDGRED;
				pending++;
				FD_CLR( i, &fd ); /* mute this */
				continue;
			}
			c->ios.b.done = 0;
			got = lio_read( &c->ios.file, c->ios.b.c, sizeof(c->ios.b.c) );
			if ( 0 >= got ) {
				if ( -ERR_EOF != got )
					goto err;
				sMsg( LOG_DEBUG, "EOF on 0x%x line %d from %s",
					c->ios.file, c->req->len, c->nam );
				FD_CLR( i, &fd );
				if ( ! c->req->len ) /* initial eof */
					goto eof; /* close immediately */
				/* lio_close( &c->ios.file, LIO_IN ); */
				c->ios.file &= ~LIO_IN; /* don't shut */
				c->pdg = LSV_PDGEOF; /* so it will be dequed and closed */
				pending++;
				got = 0; /* tell proto to close ... */
			}
			if ( ! c->req->len ) { /* stage should be CON */
				RADDS( c->req, -LSV_PEER, c->nam, 1 );
				sMsg( LOG_DEBUG, "req on %d from %s",
					LIO_FD&c->ios.file, c->nam );
			}
			c->stg = LSV_STGINP;
			c->ios.b.fill = got;
			got = srv->prt( &c->ios, LIO_SPUSH );
			sMsg( LOG_DEBUG, "got 0x%x from %s", got, c->nam );
			if ( ! got && (LIO_IN & c->ios.file) ) /* ok: more next time */
				continue;
			if ( 0 > got ) /* nok */
				goto err;
			got = c->srv->app( c, LSV_APPGOT );
			if ( 0 > got ) /* nok */
				goto err;
			if ( !(LSV_CONSES & srv->flg)
				&& c->req && (sid = rGet( c->req, -LSV_SID, 0 ))
			) {
				c->ses = cSesByName( (char*)sid->val, sid->len, &srv->tim, &expire );
				if ( ! c->ses ) { /* sad sad sad */
					sMsg( LOG_ERROR, "out of sessions" );
					goto err; /* drop connection */
				}
				if ( ! c->ses->accnt
					&& (! c->ses->prop || ! c->ses->prop->len) /* overflow paranoia */
				)
					got = c->srv->app( c, LSV_APPSES );
				if ( 0 > got ) /* nok */
					goto err;
			}
			c->tim = srv->tim;
			RADDS( c->req, -LSV_TIME, srv->gtm, 1 );
			if ( c->ses ) {
				sMsg( LOG_DEBUG, "req %s for ses '%s' %s",
					c->nam, c->ses->name, c->ses->cur ? "busy" : "idle" );
				c->ses->accnt++;
				if ( c->ses->cur || c->ses->que ) { /* oh - busy !? */
					Con **cpp = &c->ses->que;
					int l = 0;
					for ( ; *cpp; cpp = &(*cpp)->nxt )
						if ( 4 < ++l ) {
							sMsg( LOG_WARN,
								"too many con queing on ses '%s' - dropping %s",
								c->ses->name, c->nam );
							goto err;
						}
					sMsg( LOG_VERBOSE, "queing con %s for ses '%s'",
						c->nam, c->ses->name );
					c->nxt = 0;
					*cpp = c;
					c->stg = LSV_STGSES;
					continue;
				}
			}
			/* yo - got it */
			lsv_penq( srv, c );
			continue;
		err:
			sMsg( ERR_BADF, "err 0x%x from %s", got, c->nam );
		eof:
			lio_close( &c->ios.file, LIO_INOUT );
			c->stg = LSV_STGNEW;
			FD_CLR( i, &fd );
		} /* read from the ready */

		srv->turn++;
		srv->busy += srv->nwr - srv->plen;
#ifdef LSVTHR
		srv->wlen += srv->pool.que.len;

		sMsg( LOG_INFO,
			"\n==== %s\nturn %d: %d jobs for %d idle workers %d pending",
			srv->gtm, srv->turn, srv->pool.que.len, srv->plen, pending );
		for ( i=srv->nwr; i--; )
			if ( srv->wrk[i].cur )
				sMsg( LOG_INFO, "thr %d serving %d from %s",
					i, LIO_FD&srv->wrk[i].cur->ios.file, srv->wrk[i].cur->nam );

		inlock = 0;
		pthread_mutex_unlock( &srv->mut );
#else
		lsv_wrk( srv->wrk );
#endif
	} /* for(ever) */
done:
#ifdef LSVTHR
	/* set the flag and signal while holding srv->mut: workers test
		LSV_SHUTDN and enter pthread_cond_wait under this mutex, so doing
		it unlocked could slip in between a worker's test and its wait,
		and that worker would sleep forever (lost wakeup)
	*/
	if ( ! inlock )
		pthread_mutex_lock( &srv->mut );
	srv->flg |= LSV_SHUTDN;
	for ( i=srv->nwr; i--; )
		pthread_cond_signal( &srv->wrk[i].todo );
	pthread_mutex_unlock( &srv->mut );
#endif
	return srv;
}	/* lsv_rcv */


/**
	set up and run the server until it shuts down.
	installs handlers for SIGHUP, SIGINT, SIGTERM and SIGPIPE, listens on
	the TCP port given by addr (parsed with a2i; 8080 if addr is 0) on any
	address (loopback only with LSV_LOCALHOST_ONLY), prepares the workers
	and runs the receiver.
	threaded, the receiver gets its own thread, workers 1..nwr-1 get new
	threads and the calling thread serves as worker 0; otherwise the
	calling thread runs the receiver, which drives the single worker.
	@return 0, or the sMsg result of the setup step that failed
*/
int svRun ( Srv *srv, const char *addr )
{
	static const int yes = !0;
	struct sigaction siga;
	struct sockaddr_in sa; /* server address */
	int ret = 0, i;

	/* assume we're the main-thread
		-- try to make sure we get the controlling session
	*/
	sGet();

	/* set defaults */
	if ( ! srv->sto )
		srv->sto = 30*60; /* 30 minutes session timeout */
#ifndef LSVTHR
	srv->nwr = 1;
#else
	if ( ! srv->nwr )
		srv->nwr = LSV_NUMWRK/2; /* half the max */
	else if ( LSV_NUMWRK < srv->nwr )
		srv->nwr = LSV_NUMWRK; /* max */

	srv->mut = lsv_mutex_init;
#endif
	/* prepare signals; all four are blocked while the handler runs */
	memset( &siga, 0, sizeof(siga) );
	siga.sa_sigaction = lsv_sighand;
	sigemptyset( &siga.sa_mask );
	sigaddset( &siga.sa_mask, SIGHUP );
	sigaddset( &siga.sa_mask, SIGINT );
	sigaddset( &siga.sa_mask, SIGTERM );
	sigaddset( &siga.sa_mask, SIGPIPE );
	siga.sa_flags = SA_SIGINFO;
	sigaction( SIGHUP, &siga, 0 );
	sigaction( SIGINT, &siga, 0 );
	sigaction( SIGTERM, &siga, 0 );
	sigaction( SIGPIPE, &siga, 0 );
	
	/* prepare socket */
	srv->lsn = socket( PF_INET, SOCK_STREAM, IPPROTO_TCP/*6*/ );
	if ( 0 > srv->lsn ) /* nothing to close yet */
		return sMsg( LOG_SYSERR, "socket" );
	/* fcntl( srv->lsn, O_NONBLOCK ); not needed */
	memset( &sa, 0, sizeof(sa) ); /* keep sin_zero and padding clear */
	sa.sin_family = AF_INET;
	sa.sin_port = htons( addr ? a2i( addr, -1 ) : 8080 );
	sa.sin_addr.s_addr =
#ifdef LSV_LOCALHOST_ONLY
		htonl(INADDR_LOOPBACK);
#else
		INADDR_ANY;
#endif
	if ( setsockopt( srv->lsn, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes) ) ) {
		ret = sMsg( LOG_SYSERR, "setsockopt" );
		goto inierr;
	}
	if ( bind( srv->lsn, (struct sockaddr*)&sa, sizeof(sa) ) ) {
		ret = sMsg( LOG_SYSERR, "bind on port %u", ntohs(sa.sin_port) );
		goto inierr;
	}
	if ( listen( srv->lsn, 64/*backlog*/ ) ) {
		ret = sMsg( LOG_SYSERR, "listen" );
		goto inierr;
	}
#ifdef LSVTHR
	/* prepare pending event pipe */
	if ( pipe( srv->pip ) ) {
		ret = sMsg( LOG_SYSERR, "pipe" );
		goto inierr;
	}

	if ( pthread_key_create( &lsv_key_wrk, 0 ) ) {
		ret = sMsg( LOG_SYSERR, "key_create" );
		goto inierr;
	}
#endif
	/* setup workers */
	memset( srv->wrk, 0, sizeof(srv->wrk) );
	for ( i=srv->nwr; i--; ) {
		srv->wrk[i].id  = i;
		srv->wrk[i].srv = srv;
		srv->wrk[i].nxt = 0;
		srv->wrk[i].ses = cSession(0);
		memcpy( srv->wrk[i].ses->name, "wrk", 3 );
		u2a( srv->wrk[i].ses->name+3, i );
#ifdef LSVTHR
		srv->wrk[i].todo = lsv_cond_init;
		if ( i ) /* worker 0 is run by this thread below */
			pthread_create( &srv->wrk[i].thr, 0, lsv_wrk, srv->wrk+i );
#endif
	}
#ifndef LSVTHR
	lsv_rcv( srv );
#else
	/* start the server thread */
	pthread_create( &srv->rcv, 0, lsv_rcv, srv );
	/* run main worker */
	srv->wrk[0].thr = pthread_self();
	lsv_wrk( srv->wrk );

	/* cleanup: worker 0 is this very thread, joining it would deadlock */
	pthread_join( srv->rcv, 0 );
	for ( i=srv->nwr; --i > 0; )
		pthread_join( srv->wrk[i].thr, 0 );
#endif
	for ( i=srv->nwr; i--; )
		sMsg( LOG_INFO, "worker %d had %d jobs %d waits",
			i, srv->wrk[i].jobs, srv->wrk[i].waits );
	/* turn is 0 if the receiver stopped before its first turn */
	sMsg( LOG_INFO,
		"server had %d jobs on %d connections %d turns"
		" avg queue len %.02f workers %.02f",
		srv->jobs, srv->conn, srv->turn,
		srv->turn ? (double)srv->wlen/srv->turn : 0.0,
		srv->turn ? (double)srv->busy/srv->turn : 0.0 );
inierr:
	close( srv->lsn );
	return ret;
}	/* svRun */


/** helper, probably should go to lio.
	append l bytes from c to buffer b (the stream's own buffer s->b if b
	is 0), writing to s->file whenever the data does not fit.
	with c == 0, just write out everything still buffered in b.
	bytes written are accounted in s->pos.
	returns 0 on success, a negative result of lio_write, -ERR_BUSY if
	lio_write wrote nothing, or -ERR_IDIOT if it reported more bytes than
	were buffered (the buffer is then discarded).
*/
int lio_sout ( Ios *s, Buf *b, const char *c, unsigned l )
{
	Buf *buf = b ? b : &s->b;
	const char *src = c;
	unsigned left = l;
	int more;

	do {
		/* compact: once more than half full, drop the already written head */
		if ( buf->done && buf->fill > LIO_BUFSIZ/2 ) {
			if ( buf->fill > buf->done )
				memmove( buf->c, buf->c + buf->done, buf->fill - buf->done );
			s->pos += buf->done;
			buf->fill -= buf->done;
			buf->done = 0;
		}
		/* append as much of the caller's data as fits */
		if ( src && left && LIO_BUFSIZ > buf->fill ) {
			unsigned room = LIO_BUFSIZ - buf->fill;
			unsigned chunk = room < left ? room : left;
			memcpy( buf->c + buf->fill, src, chunk );
			buf->fill += chunk;
			src += chunk;
			left -= chunk;
			if ( 0 == left )
				return 0; /* everything buffered */
		}
		/* could not buffer everything (or flushing): write pending bytes */
		if ( buf->done < buf->fill ) {
			int wr = lio_write( &s->file, buf->c + buf->done, buf->fill - buf->done );
			if ( wr < 0 )
				return wr;
			if ( 0 == wr )
				return -ERR_BUSY;
			buf->done += wr;
			if ( buf->fill < buf->done ) {
				buf->fill = buf->done = 0;
				return -ERR_IDIOT;
			}
		}
		/* all written: rewind */
		if ( buf->fill <= buf->done ) {
			s->pos += buf->done;
			buf->fill = buf->done = 0;
		}
		more = src ? 0 != left : buf->fill > buf->done;
	} while ( more );
	return 0;
}	/* lio_sout */



/* control characters of the plain protocol, spelled as numbers since the
	value of '\n' is platform dependent */
#define LF 10  /* line feed: ends a field line */
#define TAB 9  /* horizontal tab: follows the tag, starts a binary continuation line */
#define VT 11  /* vertical tab: stands for an embedded newline in text mode */
static const char lf = LF, tab = TAB, vt = VT;

/* the plain protocol.
	LIO_SPUSH parses the bytes s->b.c[done..fill) into fields of c->req,
	one field per line: a leading number, if any, becomes the tag and an
	optional TAB after it is dropped; the line ends with LF. in text mode
	VT in a value stands for an embedded newline; in binary mode a line
	starting with TAB continues the previous field's value. every
	completed line is passed to app( c, LSV_APPARG ).
	returns 0 while more input is needed, 1 at the empty line or a clean
	EOF ending the request, negative on errors, or app's non-zero result.
	LIO_SFLUSH / LIO_SCLOSE write the session's result record in the same
	format (only if nothing was output before), discard plain output in
	s->b, and on close add the terminating empty line.
	everything else is passed to lio_stream.
*/
int svPlain ( Ios *s, int op )
{
	Field *f;
	Con *c = (Con*)s; /* NOTE(review): relies on ios being the first member of Con */
	switch ( op ) {
	case LIO_SPUSH: {
		int l = s->b.fill - s->b.done;
		unsigned char *b = s->b.c + s->b.done;
		unsigned char *end = b+l, *v, *p;
		if ( ! l ) { /* EOF: done */
			if ( ! c->prt ) /* ok */
				return 1;
			/* last field wasn't closed by LF */
			return sMsg( ERR_INVAL, "no EOL from %s", c->nam );
		}
		if ( c->prt ) { /* inside a field: make room to append this chunk */
			RSPACE( c->req, l, !0 );
			if ( ! c->req )
				return -ERR_NOMEM;
		}
		/* add text lines */
		while ( b<end ) {
			int conti = 0;
			switch ( c->prt ) {
			case 0: /* at beginning of line -- start new field */
				if ( LF == *b ) /* empty line */
					return 1;
				if ( TAB != *b || !c->req->len )
					RADD( c->req, 0,0,end-b, !0 );
				else { /* binary mode continuation line */
					conti = 1;
					if ( ! c->bin ) {
						sMsg( LOG_INFO,
							"detected binary mode on con %s", c->nam );
						c->bin = 1;
					}
					RSPACE( c->req, end-b, !0 );
				}
				if ( ! c->req )
					return -ERR_NOMEM;
				/* 1: first line of a field, 2: continuation line --
					kept in c->prt since a line may span several pushes */
				c->prt = conti ? 2 : 1;
				/* fallthrough */
			case 1: /* add to last field */
			case 2:
				f = c->req->field + c->req->len-1;
				v = (unsigned char*)f->val;
				p = v + f->len;
				if ( conti ) { /* the leading TAB becomes the embedded newline */
					*p++ = LF;
					b++;
				}
				if ( c->bin ) {
					for ( ; b<end && LF != (*p = *b++); p++ )
						;
				} else
					for ( ; b<end && LF != (*p = *b++); p++ )
						if ( VT == *p ) /* convert VTABs */
							*p = LF; /* back to newlines */
				c->req->used += (p - v) - f->len;
				f->len = p - v;
				if ( LF == b[-1] ) { /* line complete */
					int ret;
					if ( 1 == c->prt ) {
						/* split off the tag -- only on a field's first line;
							a continuation appends to an already stripped value,
							which must not be parsed for a tag again */
						ret = a2il( f->val, f->len, &f->tag );
						if ( ret ) {
							if ( ret < f->len && TAB == v[ret] )
								ret++;
							if ( ret < f->len )
								memmove( v, v+ret, f->len - ret );
							f->len -= ret;
						}
					}
					ret = c->srv->app( c, LSV_APPARG );
					c->prt = 0;
					if ( ret )
						return ret;
				}
				sMsg( LOG_INFO, "prs from %s: [%2d] %3d = '%.*s'",
					c->nam, c->req->len-1, f->tag, f->len, f->val  );
			}
		}
		return 0;
	}	/* case LIO_SPUSH */
	case LIO_SFLUSH:
		if ( s->b.fill <= s->b.done )
			return 0;
		/* fallthrough */
	case LIO_SCLOSE:
		if ( !(LIO_OUT & s->file) )
			return -ERR_INVAL;
		/*
			this switch is not protected, since receiver doesn't care
		*/
		if ( c->stg < LSV_STGCOM )
			c->stg = LSV_STGCOM;
		/* write the result record only if nothing was output yet
			(note the parentheses: ! binds tighter than +) */
		if ( ! (s->pos + s->b.done) && c->ses->res ) {
			char num[32];
			int n = c->ses->res->len;
			sMsg( LOG_INFO, "com %d fields to %s", c->ses->res->len, c->nam );
			for ( f = c->ses->res->field; n--; f++ ) {
				const char *v = f->val, *nl;
				int vl = f->len;
				int nlen = i2a( num, f->tag );
				num[nlen++] = TAB;
				lio_sout( s, &c->flt, num, nlen );
				/* escape embedded newlines: LF TAB in binary, VT in text mode */
				while ( vl && (nl = memchr(v,LF,vl)) ) {
					int ll = 1+(nl-v);
					if ( c->bin ) {
						lio_sout( s, &c->flt, v, ll );
						lio_sout( s, &c->flt, &tab, 1 );
					} else {
						lio_sout( s, &c->flt, v, ll-1 );
						lio_sout( s, &c->flt, &vt, 1 );
					}
					v += ll;
					vl -= ll;
				}
				if ( vl )
					lio_sout( s, &c->flt, v, vl );
				lio_sout( s, &c->flt, &lf, 1 );
			}
		}
		/* plain output in s->b is not supported by this protocol: drop it */
		if ( s->b.fill )
			s->b.done = s->b.fill = 0;
		/*
		{
			lio_sout( s, &c->flt, 0, 0 ); 
			lio_stdio( s, LIO_SFLUSH );
		}
		*/
		if ( LIO_SCLOSE == op ) /* empty line terminates the response */
			lio_sout( s, &c->flt, &lf, 1 );
		lio_sout( s, &c->flt, 0, 0 ); /* finally flush */
		return 0;
	}
	return lio_stream( s, op ); /* hmm -- SPURGE ? */
}	/* svPlain */


/* the echo application.
	for any task other than running (task != 0) it only prepares: on
	LSV_APPGOT it sets c->grp so the request may go to the thread pool
	instead of the main thread. when run (task 0), it copies every field
	of the request into the session's result record.
*/
int svEcho ( Con *c, int task )
{
#ifdef LSVECHO_DELAY
	static Tm tm = { LLL(50) };
#endif
	Field *fld;
	int left;

	if ( 0 != task ) { /* preparing: we don't care */
		if ( task == LSV_APPGOT )
			c->grp = 1; /* don't require mainthead */
		return 0;
	}
	/* run it */
	sMsg( LOG_INFO, "echo %d: %d fields to %s",
		svCur()->id, c->req->len, c->nam );
#ifdef LSVECHO_DELAY
	timeSleep( &tm );
#endif
	/* the plain protocol does not support plain output, so echo
		through the result record */
	fld = c->req->field;
	left = c->req->len;
	while ( left-- ) {
		RADDF( c->ses->res, fld, !0 );
		fld++;
	}
	return 0;
}	/* svEcho */

#if 0
/* minimal standalone echo server, kept for reference (not compiled):
	serves svEcho over svPlain on the port given as first argument
	(svRun defaults to 8080)
*/
int main ( int argc, const char **argv )
{
	Srv echo;

	memset( &echo, 0, sizeof(echo) );
	echo.prt = svPlain;
	echo.app = svEcho;
	cOpen(0);
	cLog( LOG_DEBUG, 0 );
	return svRun( &echo, 1<argc ? argv[1] : 0 );
}
#endif