/* Revision 237 (by dpavlin, 2004/03/08 17:43:12) initial import of openisis 0.9.0 vendor drop */
/*
	openisis - an open implementation of the CDS/ISIS database
	Version 0.8.x (patchlevel see file Version)
	Copyright (C) 2001-2003 by Erik Grziwotz, erik@openisis.org

	This library is free software; you can redistribute it and/or
	modify it under the terms of the GNU Lesser General Public
	License as published by the Free Software Foundation; either
	version 2.1 of the License, or (at your option) any later version.

	This library is distributed in the hope that it will be useful,
	but WITHOUT ANY WARRANTY; without even the implied warranty of
	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
	Lesser General Public License for more details.

	You should have received a copy of the GNU Lesser General Public
	License along with this library; if not, write to the Free Software
	Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

	see README for more information
EOH */

/*
	$Id: lbt.c,v 1.35 2003/05/27 11:03:30 kripke Exp $
	the btree.
*/

#include <stdlib.h>
#include <string.h>

/* special
#if defined( __GNUC__ ) && defined ( alloca )
#include <alloca.h>
#endif
*/
#if defined( sparc ) || defined( __ppc__ )
#	define LDB_BIG_ENDIAN
#endif

#include "lio.h"
#include "luti.h"
#include "lbt.h"


/*
probably all relevant cases can be handled by the more
efficient approach of storing recoded key a la strxfrm
*/
#define LBT_USECMP 0


/* ************************************************************
	private types
*/

/*
	magic signatures: four ASCII characters packed into one integer,
	first character in the least significant byte
	(i.e. they read as text when stored little endian)
*/
enum {
	LBT_OPEN = ('O') | ('P' << 8) | ('E' << 16) | ('N' << 24),
	LBT_ISIS = ('I') | ('S' << 8) | ('I' << 16) | ('S' << 24)
};


/* tree depth limit and layout of the position field of a Dict entry */
enum {
	/* max depth of the tree */
	LBT_LEVELS  = 16,
	/* number of bits holding the position within a block */
	LBT_POSBITS = 13,
	/* mask selecting exactly those position bits */
	LBT_POSMASK = (1 << LBT_POSBITS) - 1,
	/* largest position that can be expressed */
	LBT_POSMAX  = LBT_POSMASK
};

/*
	One dictionary entry of a block: where the entry's bytes live in the
	block (pos), how long its key is (kln) and how much value data follows
	the key (vln).
	In leaves the bits of pos above LBT_POSMASK extend vln, so leaf code
	must go through the LBT_* accessor macros below.
*/
typedef struct Dict { /* dictionary of block entries */
	unsigned short pos; /* 13 bits position of entry, 2 bits val for leaves */
	unsigned char  vln; /* length of entry's value */
		/* for inner nodes, this is the length of qualifying
			second key segment (either 0 or bt->vsz <= 23).
			for leaves, this, together with higher bits of pos,
			is the number of values of bt->vsz each.
		*/
	unsigned char  kln; /* length of entry's key */
/*
	the following access macros are needed for leaves only
*/
/* position of the entry: pos with the bits above LBT_POSMASK stripped */
#define LBT_POS( d ) (LBT_POSMASK & (d)->pos)
/* value count: vln as low 8 bits, the bits of pos above the mask
	shifted down so that they continue at bit 8 */
#define LBT_VLN( d ) ((d)->vln | (~LBT_POSMASK & (d)->pos)>>(LBT_POSBITS-8))
/* add a to the position, leaving the bits above the mask untouched;
	the sum is truncated to LBT_POSBITS bits */
#define LBT_POSADD( d, a ) do { \
		(d)->pos = (~LBT_POSMASK & (d)->pos) | (LBT_POSMASK & ((d)->pos + (a))); \
	} while (0)
/* add a to the value count and store it back split over vln and
	the upper bits of pos; the position bits are preserved */
#define LBT_VLNADD( d, a ) do { \
		unsigned _vln = LBT_VLN( d ) + (a); \
		(d)->pos = (LBT_POSMASK & (d)->pos) \
			| (~LBT_POSMASK & ((_vln)<<(LBT_POSBITS-8))); \
		(d)->vln = (unsigned char)(_vln); \
	} while (0)
/* set position p and value count v in one go */
#define LBT_PV( d, p, v ) do { \
		(d)->pos = (LBT_POSMASK & (p)) | (~LBT_POSMASK & ((v)<<(LBT_POSBITS-8))); \
		(d)->vln = (unsigned char)(v); \
	} while (0)
/* set position p, value count v and key length k */
#define LBT_PVK( d, p, v, k ) do { \
		LBT_PV( d, p, v ); \
		(d)->kln = (unsigned char)(k); \
	} while (0)
} Dict;


/* guard space around block */
/*
	Unless NDEBUG is defined, every in-memory block is surrounded by
	LBT_GUARD bytes which chkblk compares against the guard[] template.
	guard[] has static storage and no initializer, so it is all zero;
	iniblk zeroes the whole block including both guard areas.
*/
#ifndef NDEBUG
#	define LBT_GUARD 64
/* template for guard space -- blank or pattern */
static unsigned char guard[LBT_GUARD];
#else
#	define LBT_GUARD 0
#endif


/*
	TODO: check the possible performance gain in findkey
	from separating disk block and administrative data
	and aligning disk blocks on PAGE boundaries in memory
*/
/*
	A cached block: administrative header (guard, lock/wait state, cache
	links) followed by the on-disk image which starts at member num.
	Only the bytes from LBT_BASE(b) on, bt->bsz of them, are read from
	and written to the file.
*/
typedef struct Block {
#if LBT_GUARD
	unsigned char  grd[LBT_GUARD];
#endif
	unsigned char  ird; /* is reading */
	unsigned char  wrd; /* #threads waiting for read */
	unsigned char  lck; /* is write locked */
	unsigned char  wlk; /* #threads waiting for write lock */
/*
	nonzero if any of the four state bytes above is != 0.
	The members are tested individually instead of reading them through
	an int lvalue, which would violate the effective type rule.
	Only the truth value is meaningful; the argument is evaluated
	several times, so pass an expression without side effects.
*/
#define LBT_BUSY( b ) ((b)->ird | (b)->wrd | (b)->lck | (b)->wlk)
	struct Block  *nhs; /* next in hash chain */
	struct Block  *nru; /* next (more) recently used */
	/* TODO: possibly double-link lru list, so we can unlink on every access ? */
	/* the disk block -- from here on bsz bytes are on disk */
/* offset of the disk image within the struct */
#define LBT_BOFF ((char *)&((Block*)0)->num - (char*)0)
/* address of the disk image of block b */
#define LBT_BASE( b ) ((unsigned char *)&(b)->num)
	unsigned       num; /* number of this block, 0 is root */
	unsigned       nxt; /* right sibling, "OPEN" or "ISIS" for root */
	/* typ,key,col are redundant in non-root blocks, maybe abused ... */
	unsigned char  typ; /* type: bsz, vsz, flags */
	unsigned char  key; /* max key length */
	unsigned char  col; /* collation */
	unsigned char  lev; /* level over bottom, 0 for leaves */
	unsigned short ent; /* 10 bits # entries */
#define LBT_ENT( b ) (b)->ent /* currently other bits arent abused */
	unsigned short stk; /* 13 bits offset of keyspace stack */
/* offset of dict RELATIVE to base */
#define LBT_DOFF ((char *)&((Block*)0)->dct - (char*)&((Block*)0)->num)
	Dict           dct[1];
}	Block;

/*
	LAYOUT:
	dct is an array of dictionary entries in increasing key order
	stk is growing downwards from block end towards dict
	position of entry i is b->stk <= LBT_POS(b->dct+i) < bt->bsz
	decreasing on i (i.e. stack space is in reverse ordering)

	at LBT_POS(b->dct+i), there are b->dct[i].kln bytes key
	for a leaf entry, the key is followed by
	LBT_VLN(b->dct+i) (between 0..1023 incl.) values
	of bt->vsz (between 8..23 incl) each
	for an inner entry, the key is followed by
	an optional bt->vsz qualifying value and then the 4 bytes block number

	SIZES:
	block size >= 1024
	block base overhead LBT_DOFF is 16 bytes
	avail size >= 1008
	Dict entry is 4 bytes
	max. used stack space per entry is:
	inner node: kln+4+bt->vsz <= 255+4+23 = 282
	new leaf entry (one value): kln+bt->vsz <= 278
	max total space for a new entry incl. dict is 286 < avail size/3
*/

/*
	header at the start of one allocation of cache blocks;
	the allocations are kept as a singly linked list
*/
struct Chunk {
	struct Chunk *nxt; /* chunk that was allocated before this one */
};
typedef struct Chunk Chunk;


/* limits and flags concerning the entries of a block */
enum {
	/* number of levels for lru list, i.e. element count of Idx.lru */
	LBT_LRULEV  = sizeof(((Idx*)0)->lru) / sizeof(*((Idx*)0)->lru),
	/* bits for # entries */
	LBT_ENTBITS = 10,
	/* mask selecting the entry number */
	LBT_ENTMASK = (1 << LBT_ENTBITS) - 1,
	/* largest entry number / count */
	LBT_ENTMAX  = LBT_ENTMASK,
	/* flag or'ed to a position: entry matched */
	LBT_MATCH   = 1 << 15,
	/* minimum cost of entry: 4 byte dict, 4 byte val */
	LBT_ENTSIZE = 8
};


/*
	State of a batch load (see lbt_batch / lbt_batchval / mktree):
	the block currently being filled, the values collected for the
	current key, and statistics which mktree logs at the end.
*/
typedef struct Batch {
	Block   *cur; /* block currently being filled */
	unsigned got; /* current for this key */
	unsigned tot; /* total for this key */
	unsigned let; /* bytes per block kept free: pctfree * bsz / 100 */
	/* stats */
	/* the counters below are only read in this part of the file, by the
		mktree log message; meanings are taken from its labels */
	unsigned keys; /* logged as "#keys" */
	unsigned keyl; /* logged as total length of keys */
	unsigned maxl; /* logged as max of key length */
	unsigned vals; /* logged as "#vals" */
	unsigned maxv; /* logged as max of values */
	unsigned span; /* logged as "spans" */
	unsigned free; /* logged as "free" */
	/* buffer */
	Key      key; /* key the buffered values belong to; mktree reuses it
		to remember the last key of the previous child */
	Val      val[LBT_ENTMAX]; /* values collected for key */
}	Batch;


/* how getblk should acquire a block; the values are ordered by strength */
typedef enum {
	LBT_RD = 0, /* read -- no lock */
	LBT_WR = 1, /* write */
	LBT_EX = 2  /* exclusive */
} lbt_get;


/*
	Search request and result of findkey.
	The caller sets bt and key; findkey clears and then fills the three
	arrays, indexed by level (0 is the leaf level), for the blocks it
	visited from the root downwards.
*/
typedef struct {	/* structure filled by findkey */
	Idx           *bt;	/* the tree */
	Key           *key; /* the key searched */
	Block         *b[LBT_LEVELS]; /* path to leaf, b[0] is leaf */
	unsigned       n[LBT_LEVELS]; /* block numbers of path */
	/* position as returned by keypos, possibly with LBT_MATCH set */
	unsigned short e[LBT_LEVELS]; /* entries in path */
}	FindKey;


/* usage counters, logged by getblk every 16K calls */
typedef struct {
	unsigned get;  /* calls of getblk */
	unsigned miss; /* getblk calls that did not find the block cached */
	unsigned cmp;  /* key comparisons done in keypos */
	unsigned key;  /* calls of findkey */
}	Stat;

/* buffer to print a value to (see prval).
	prval prints at most 23 bytes of a value:
	the lowest 11 bytes are printed as 4 shorts + one int < 4*5+10 = 30 digits+5sep
	higher 12 bytes are printed hex = 24 bytes.
	Together with one more separator and the terminating 0 this fits 64 bytes.
*/
typedef char PrVal[64];


/* ************************************************************
	private data
*/

/*
	global usage counters, shared by all trees.
	stat access is not guarded by memory barrier.
	may be wrong when running on multiple processors
	(the increments are marked "dirty" where they happen).
*/
static Stat stat;


/* ************************************************************
	private functions
*/

/*
	Read / write a 4 byte unsigned at an arbitrary address.
	Entries are packed byte-wise on the block's stack, so the address is
	in general not aligned for an unsigned. Dereferencing a casted
	pointer there is undefined behaviour on every platform (and a bus
	error on some), therefore the memcpy variant is used everywhere.
	The value is stored in the host's byte order, as before.
*/

/** @return the unsigned stored in the 4 bytes at m */
static unsigned GETINT ( const void *m )
{
	unsigned l = 0;
	memcpy( &l, m, 4 );
	return l;
}

/** store i in the 4 bytes at m */
static void SETINT ( void *m, unsigned i )
{
	memcpy( m, &i, 4 );
}

/**
	encode hit ifp into the v->len bytes at v->byt.
	The complete image is 11 bytes, most significant byte first:
	dbn (2), mfn (4), tag (2), occ (1), pos (2).
	A shorter value gets only the trailing v->len bytes of that image,
	a longer one is padded with leading zero bytes.
*/
static void mkval ( Val *v, Hit *ifp )
{
	unsigned char  img[11];
	unsigned char *dst = v->byt;
	unsigned       n = v->len;

	img[0]  = (unsigned char) (ifp->dbn >> 8);
	img[1]  = (unsigned char)  ifp->dbn;
	img[2]  = (unsigned char) (ifp->mfn >> 24);
	img[3]  = (unsigned char) (ifp->mfn >> 16);
	img[4]  = (unsigned char) (ifp->mfn >> 8);
	img[5]  = (unsigned char)  ifp->mfn;
	img[6]  = (unsigned char) (ifp->tag >> 8);
	img[7]  = (unsigned char)  ifp->tag;
	img[8]  = (unsigned char)  ifp->occ;
	img[9]  = (unsigned char) (ifp->pos >> 8);
	img[10] = (unsigned char)  ifp->pos;

	if ( n > sizeof(img) ) { /* zero fill the leading excess bytes */
		memset( dst, 0, n - sizeof(img) );
		dst += n - sizeof(img);
		n = sizeof(img);
	}
	if ( n ) /* the n least significant bytes of the image */
		memcpy( dst, img + sizeof(img) - n, n );
}	/* mkval */


/**
	decode the vsz bytes at c into hit ifp (the inverse of mkval).
	Only the trailing 11 bytes carry hit data; if vsz is less than 11,
	the bytes present are the low end of the 11 byte image and the
	missing leading parts read as 0.
	@return ifp
*/
static Hit *rdval ( Hit *ifp, unsigned char *c, unsigned vsz )
{
	unsigned char img[11];
	unsigned      n = vsz;

	if ( n > sizeof(img) ) { /* skip leading excess bytes */
		c += n - sizeof(img);
		n = sizeof(img);
	}
	/* right align what we have in a zeroed image */
	memset( img, 0, sizeof(img) );
	if ( n )
		memcpy( img + sizeof(img) - n, c, n );

	ifp->dbn = (unsigned short)img[0] << 8 | img[1];
	ifp->mfn = (unsigned int)img[2] << 24
		| (unsigned int)img[3] << 16
		| (unsigned int)img[4] << 8
		| img[5];
	ifp->tag = (unsigned short)img[6] << 8 | img[7];
	ifp->occ = img[8];
	ifp->pos = (unsigned short)img[9] << 8 | img[10];
	return ifp;
}	/* rdval */


/**
	format the value of vsz bytes at c as text into buf.
	An empty value prints as "#".
	At most 23 bytes are considered: what exceeds 11 bytes is printed as
	hex digits followed by '.', then the remaining (up to) 11 bytes are
	decoded by rdval and printed as decimal dbn, mfn, tag, occ and pos,
	each followed by '.'.
	@return buf
*/
static char *prval ( PrVal buf, unsigned char *c, unsigned vsz )
{
	char *d = buf;
	Hit h;
	if ( ! vsz ) {
		buf[0] = '#';
		buf[1] = 0;
		return buf;
	}
	if ( 23 < vsz )
		vsz = 23;
	if ( 11 < vsz ) {
		/* count vsz down to exactly 11, so that rdval below decodes all
			of the remaining 11 bytes. (a post-decrement in the loop
			condition would leave vsz at 10 and shift every field) */
		for ( ; 11 < vsz; vsz-- ) {
			*d++ = luti_hex[ *c >> 4 ];
			*d++ = luti_hex[ *c++ & 0xf ];
		}
		*d++ = '.';
	}
	rdval( &h, c, vsz );
	d += u2a( d, h.dbn ); *d++ = '.';
	d += u2a( d, h.mfn ); *d++ = '.';
	d += u2a( d, h.tag ); *d++ = '.';
	d += u2a( d, h.occ ); *d++ = '.';
	d += u2a( d, h.pos ); *d++ = '.';
	*d = 0;
	return buf;
}	/* prval */

/* for PRVAL f->key->val */
#define PRVAL( buf, v ) prval( buf, (v).byt, (v).len )


/* ************************************************************
	block I/O & cache mgmt
*/

/**
	reset b to an empty block number num on level lev of tree bt.
	Everything is zeroed -- header, disk image and trailing guard --
	then the signature is copied from the tree and the stack is set
	to start at the end of the block.
*/
static void iniblk ( Block *b, Idx *bt, unsigned num, unsigned char lev )
{
	size_t total = (size_t)(LBT_BOFF + bt->bsz + LBT_GUARD);

	memset( b, 0, total );
	/* empty stack: grows downwards from block end */
	b->stk = bt->bsz;
	b->lev = lev;
	b->num = num;
	/* signature of the tree */
	b->col = bt->col;
	b->key = bt->key;
	b->typ = bt->typ;
}	/* iniblk */


/**
	consistency check of block b of tree bt.
	Checks, in this order: the guard areas (if compiled in), block number
	and sibling link against the tree length, the signature, the entry
	count and stack offset, and for every entry that it lies exactly
	below its predecessor on the stack. Inner entries must have a value
	length of 0 or bt->vsz and a link below bt->len. In debug builds the
	ordering of adjacent keys is verified and, if tracing is enabled,
	the first and last entry are logged.
	@return 0 if the block looks sane, else the result of log_msg
*/
static int chkblk ( Block *b, Idx *bt )
{
#if 0
	(void)b; (void)bt;
#else
	int i;
	unsigned short eod = LBT_DOFF + b->ent*sizeof(Dict), siz, pos, vln, key;
	unsigned end = bt->bsz, tot = 0;
	unsigned char *base = LBT_BASE(b);
#if LBT_GUARD
	if ( memcmp( guard, b->grd, LBT_GUARD ) )
		return log_msg( ERR_TRASH, "OOPS! initial guard frobbed on blk %d", b->num );
	/* the guard behind the disk image */
	if ( memcmp( guard, LBT_BASE(b)+bt->bsz, LBT_GUARD ) )
		return log_msg( ERR_TRASH, "OOPS! final guard frobbed on blk %d", b->num );
#endif
	if ( b->num > bt->len || (b->num && b->nxt > bt->len) )
		return log_msg( ERR_TRASH,
			"bad num %d / nxt %d on lev %d", b->num, b->nxt, b->lev );
	if ( b->typ != bt->typ || b->key != bt->key || b->col != bt->col )
		return log_msg( ERR_TRASH,
			"bad sig 0x%02x,%d,%d in block %d want 0x%02x,%d,%d",
			b->typ, b->key, b->col, b->num, bt->typ, bt->key, bt->col );
	if ( b->ent > bt->bsz / LBT_ENTSIZE || b->stk > bt->bsz || b->stk < eod )
		return log_msg( ERR_TRASH,
			"bad ent/stk %d,%d in block %d eod %d sz %d",
			b->ent, b->stk, b->num, eod, bt->bsz );
	/* entries are stacked downwards: each must end where the previous starts */
	for ( i=0; i<b->ent; i++ ) {
		Dict *d = &b->dct[i];
		pos = LBT_POS( d );
		vln = LBT_VLN( d );
		key = d->kln;
		/* inner: key, vln bytes value, 4 bytes link; leaf: key, vln values */
		siz = key + (b->lev ? vln+4u : vln * bt->vsz);
		if ( pos < b->stk || end != (unsigned)pos + siz )
			return log_msg( ERR_TRASH,
				"bad pos/vln/key %d/%d/%d in entry %d of block %d"
				" stack %d sz %d end %d expected %d",
				pos, vln, key, i, b->num, b->stk, siz, pos+siz, end );
		if ( b->lev ) {
			unsigned lnk;
			if ( vln && vln != +bt->vsz )
				return log_msg( ERR_TRASH,
					"bad vln %d in entry %d of block %d vsz %d",
					vln, i, b->num, bt->vsz );
			lnk = GETINT( base + pos + key + vln );
			if ( lnk >= bt->len )
				return log_msg( ERR_TRASH,
					"bad link %u in entry %d of block %d",
					lnk, i, b->num );
		}
#if 1 && ! defined( NDEBUG )
		if ( i ) {
			/* this entry must sort after its left neighbour */
			unsigned short op = LBT_POS( d-1 ), okl = d[-1].kln;
			int mc = memcmp( base+pos, base+op, key<okl ? key : okl ), j;
			if ( 0 < mc || (!mc && 0 < (mc = (int)key - (int)okl)) )
				goto ok;
			j = 0;
			if ( !mc && b->lev && vln ) { /* compare values */
				unsigned ovln = d[-1].vln;
				if ( ! ovln || 0 < (j = memcmp( base+pos+key, base+op+okl, vln )) )
					goto ok;
			}
			return log_msg( ERR_TRASH,
				"bad key order entry %d of block %d lev %d"
				" mc %d j %d key %d okl %d vln %d",
				i, b->num, b->lev, mc, j, key, okl, vln
			);
		}
	ok:
#endif
		tot += siz;
		end = pos;
	}
	if ( tot > bt->bsz - b->stk )
		return log_msg( ERR_TRASH,
			"bad total entry size %d in block %d stack %d",
			tot, b->num, b->stk );
#ifndef NDEBUG
	if ( LOG_DO( LOG_TRACE ) ) {
		log_msg( LOG_TRACE, 
			"block %d lev %d sig 0x%02x,%d,%d ent %d eod %d stk %d",
			b->num, b->lev, b->typ, b->key, b->col, b->ent, eod, b->stk
		);
		for ( i=0; i<2; i++ ) {
			int j = i ? b->ent-1 : 0;
			Dict *d = &b->dct[j];
			if ( i >= b->ent ) /* if b->ent is 0 or 1, log no or one key */
				break;
			pos = LBT_POS( d );
			vln = LBT_VLN( d );
			key = d->kln;
			/* parenthesized: ?: binds weaker than +, so without the
				parens the condition would be (pos + b->lev) */
			end = pos + (b->lev ? vln+4u : vln * bt->vsz);
			/* start of value */
			if ( b->lev )
				log_msg( LOG_TRACE, "key %d '%.*s'/%d -> %d",
					j, key, base+pos, vln, GETINT(base+pos+key+vln) );
			else {
				Hit h;
				rdval( &h, base+pos+key, bt->vsz );
				log_msg( LOG_TRACE, "key %d '%.*s'[%d] = %d[%d,%d,%d]",
					j, key, base+pos, vln, h.mfn, h.tag, h.occ, h.pos );
			}
		}
	}
#endif
#endif
	return 0;
}	/* chkblk */


/**
	extend the cache of bt by one chunk of memory.
	cnt+1 fresh blocks are pushed on the front of the level 0 lru list,
	one additional fresh block is handed to the caller without being
	listed anywhere.
	@return the extra block, 0 if out of memory
*/
static Block *addchunk ( Idx *bt, unsigned cnt )
{
	/* size of a block in memory: header + disk image + trailing guard */
	unsigned blksz = LBT_BOFF + bt->bsz + LBT_GUARD;
	/* number of blocks going to the list: always at least one */
	unsigned nlist = cnt + 1;
	unsigned k;
	Chunk   *head;
	Block   *extra;
	char    *p = (char *)mAlloc( sizeof(Chunk) + (1 + nlist)*blksz );

	if ( ! p )
		return 0;
	/* prepend the chunk to the tree's chunk list */
	head = (Chunk *)p;
	head->nxt = bt->mem;
	bt->mem = head;
	bt->clen += nlist;
	p += sizeof(Chunk);
	/* blocks are pushed on the front, so with an empty list
		the first one pushed ends up as the tail */
	if ( ! bt->mru[0] )
		bt->mru[0] = (Block *)p;
	for ( k = 0; k < nlist; k++, p += blksz ) {
		Block *b = (Block *)p;
		iniblk( b, bt, 0, 0 );
		b->nru = bt->lru[0];
		bt->lru[0] = b;
	}
	/* the one behind the listed blocks is for the caller */
	extra = (Block *)p;
	iniblk( extra, bt, 0, 0 );
	return extra;
}	/* addchunk */


/**
	read block number num of tree bt from file into b and check it.
	@return 0 on success, the result of log_msg on a short read or if
		the block read carries a different number, else that of chkblk
*/
static int readblk ( Block *b, Idx *bt, unsigned num )
{
	unsigned char *dst = LBT_BASE(b);
	int nread = lio_pread( &bt->fd, dst, bt->bsz, num*bt->bsz );

	log_msg( LOG_VERBOSE, "reading block %u got %u", num, nread );
	if ( (unsigned)nread != bt->bsz )
		return log_msg( LOG_SYSERR, "\twhen reading bt block %d", num );
	if ( num != b->num )
		return log_msg( ERR_TRASH, "bad num %d in block %d", b->num, num );
	return chkblk( b, bt );
} /* readblk */


/** release the write lock of b and wake up those waiting for it, if any */
static void unlock ( Block *b )
{
	b->lck = 0;
	if ( ! b->wlk ) /* nobody is waiting */
		return;
	(void)LIO_WAKE( b->num );
}

/*
	release b if it is locked; unlock() is only called when somebody is
	waiting for the lock.
	The argument is fully parenthesized, so any pointer expression may be
	passed. It is evaluated more than once: no side effects, please.
*/
#define ULK( b ) do { \
	if ( (b)->lck ) { (b)->lck = 0; if ( (b)->wlk ) unlock( b ); } \
	} while (0)


/**
	check block b and write it to its position in the file of bt.
	A block failing chkblk is not written (and not unlocked).
	If ulk is set, the block is unlocked after the write was attempted,
	no matter whether it succeeded.
	@return 0 on success, else the result of log_msg
*/
static int writeblk ( Block *b, Idx *bt, int ulk )
{
	int nwritten;

	if ( chkblk( b, bt ) ) /* refuse to write a broken block */
		return log_msg( ERR_TRASH, "\tbefore writing bt block %d", b->num );
	nwritten = lio_pwrite( &bt->fd, LBT_BASE(b), bt->bsz, b->num*bt->bsz );
	log_msg( LOG_VERBOSE, "writing block %u got %u", b->num, nwritten );
	if ( ulk ) {
		ULK( b );
	}
	if ( (unsigned)nwritten == bt->bsz )
		return 0;
	return log_msg( LOG_SYSERR, "\twhen writing bt block %d", b->num );
} /* writeblk */

/* write and unlock */
#define WRULK( b, bt ) writeblk( b, bt, 1 )


/* get a new block: asking for block number bt->len makes getblk extend the tree */
#define NEWBLK( bt, lev, lock ) getblk( bt, bt->len, lev, lock )
/**
	get block #num.
	1) locate block #num in cache.
	2) if it is found:
		if it is READING, or we want to lock and it is locked,
		incr # waiters, wait for it.
	3) if it is not found:
		locate a free cache entry, extending the cache if necessary,
		set the num, set the READING flag, go read it.
		after reading, if there are waiters, wake them up.
		since we are the first to get a hold on the block,
		we may write lock it, if we want.
	4) return the block.

	@param bt   the tree
	@param num  block number; 0 is the root, bt->len creates a new block
	@param lev  level expected for (or given to a new) block
	@param lock LBT_RD, LBT_WR or LBT_EX
	@return the block, or 0 on any failure (a message has been logged
		in most cases; running out of memory in addchunk is silent here)
*/
static Block *getblk ( Idx *bt, unsigned num,
	unsigned char lev, lbt_get lock )
{
	Block *b;
	stat.get++; /* dirty */
	if ( !(stat.get & 0x3fff) ) { /* every 16K */
		/* misses as of the previous report, to log the recent rate */
		static unsigned lmiss;
		log_msg( LOG_INFO,
			"searched %i keys in %u blks cache %d %d%%"
			" depth %u cmp/get/miss %u/%u/%u %d%% (%d %d%%)",
			stat.key, bt->len, bt->clen, bt->clen*100 / bt->len,
			bt->root->lev, stat.cmp, stat.get,
			stat.miss, stat.miss*100 / stat.get,
			(stat.miss - lmiss), (stat.miss - lmiss)*100 / 0x4000
		);
		lmiss = stat.miss;
	}
startover: /* only used on absurd congestion */
	b = 0;
	LOG_DBG( LOG_DEBUG, "get bt block %u %u %d", num, lev, lock );
	/* the root is kept apart, all other blocks are looked up by hash */
	if ( ! num )
		b = bt->root;
	else if ( num < bt->len ) {
		b = bt->hash[ num % bt->hlen ];
		while ( b && num != b->num )
			b = b->nhs;
	}
	/* NOTE: the else further down belongs to this if, the while is its body */
	if ( b )
		while ( b->ird || (lock && b->lck) ) {
			/* remember what we are waiting for, to undo exactly that count */
			int wrd = b->ird;
			int wlk = lock && b->lck;
			LOG_DBG( LOG_VERBOSE, "lck on block %u %u r%d l%d",
				num, lev, b->ird, b->lck );
			if ( LBT_EX <= lock ) {
				log_msg( LOG_ERROR, "concurrent mod on EX lock" );
				return 0;
			}
			if ( ! lio_lock ) {
				log_msg( LOG_FATAL, "bad MT environment: concurrent mod, no lock" );
				return 0;
			}
			/* the waiter counts saturate at 0xff */
			if ( wrd && 0xff != b->wrd )
				b->wrd++;
			if ( wlk && 0xff != b->wlk )
				b->wlk++;
			(void)LIO_WAIT( num ); /* zzzzz .... sleep tight */
			/* some hours later */
			if ( num != b->num ) {
				log_msg( LOG_WARN, "heavy congestion: block %d has gone", num );
				goto startover;
			}
			if ( wrd && b->wrd )
				b->wrd--;
			if ( wlk && b->wlk )
				b->wlk--;
		}
	else { /* not found or new block */
		/* find free block */
		int i = 0, llev, err = 0;
		stat.miss++; /* dirty */
		LOG_DBG( LOG_DEBUG, "bt block %u %u not found", num, lev );
		/* take the first block that is not busy, lowest lru level first */
		for ( ; i < (int)LBT_LRULEV; i++ ) {
			/* check lru list for level i */
			Block **bp = &bt->lru[i];
			while ( (b = *bp) && LBT_BUSY(b) )
				bp = &b->nru;
			if ( ! b ) /* try next level */
				continue;
			*bp = b->nru; /* dequeue */
			if ( bt->mru[i] == b ) /* was the tail */
				bt->mru[i] = 0;
			break;
		}
		if ( ! b ) {
			log_msg( LOG_WARN, "no free blocks in cache" );
			if ( !(b = addchunk( bt, 64 )) )
				return 0;
			/* TODO: should we also extend the hash ? */
		} else if ( b->num ) { /* got block -- should be hashed somewhere */
			Block **bp = &bt->hash[ b->num % bt->hlen ];
			while ( *bp && *bp != b )
				bp = &(*bp)->nhs;
			if ( *bp == b )
				*bp = b->nhs; /* dehash */
			/* else ... hmmm should have found it */
		}
		assert( ! LBT_BUSY( b ) );
		iniblk( b, bt, num, lev );
		/* hash it, so waiters can find it */
		b->nhs = bt->hash[ b->num % bt->hlen ];
		bt->hash[ b->num % bt->hlen ] = b;
		if ( num == bt->len ) { /* extend */
			b->lev = lev;
			bt->len++;
			/* TODO: should be configurable */
			/* NOTE(review): the extra block returned by addchunk is
				dropped here, it is on no lru list -- confirm intended */
			if ( bt->len > bt->clen << 5 && bt->clen < 0x4000 )
				addchunk( bt, bt->clen / 4 );
		} else {
			/* mark as being read, then read outside of the global lock */
			b->ird = 1;
			(void)LIO_RELE();
			err = readblk( b, bt, num );
			(void)LIO_LOCK();
			b->ird = 0;
			if ( ! err && lev != b->lev )
				err = log_msg( ERR_TRASH, "bad lev %d in block %d want %d",
					b->lev, b->num, lev );
			if ( err ) {
				Block **bp = &bt->hash[ num % bt->hlen ];
				while ( *bp && *bp != b )
					bp = &(*bp)->nhs;
				if ( *bp == b )
					*bp = b->nhs; /* dehash */
				b->lev = 0; /* add to level 0 lru list */
				b->num = 0; /* tell others that it isn't the expected block */
			}
		}
		/* add to it's levels lru list */
		/* levels beyond the number of lists share the last list */
		llev = LBT_LRULEV > b->lev ? b->lev : LBT_LRULEV-1;
		if ( bt->mru[ llev ] )
			bt->mru[ llev ]->nru = b;
		else 
			bt->lru[ llev ] = b;
		(bt->mru[ llev ] = b)->nru = 0;
		if ( b->wrd )
			(void)LIO_WAKE( num );
		if ( err ) {
			b->wrd = b->wlk = 0; /* mark block unused */
			return 0;
		}
	}	/* got free block */
	/* only LBT_WR sets the lock flag here; for LBT_EX the callers seen
		in this file set and clear lck themselves */
	if ( LBT_WR == lock ) {
		if ( b->lck ) {
			log_msg( LOG_FATAL, "bt consi: unexpected lock on %u", num );
			return 0;
		}
		b->lck = 1;
	}
	return b;
}	/* getblk */


/** binsearch position for key.
	position is either the index of the matching entry (0..b->ent-1)
	or the index (0..b->ent) where key should be inserted.
	Keys are compared bytewise, on a common prefix the shorter one is
	less. In leaves only the key counts. In inner nodes the qualifying
	value is a second key segment: an entry without value sorts before
	any search key with the same key bytes that carries a value.
	With LBT_USECMP the function takes the tree as additional first
	parameter and uses its cmp function, if set; call it via KEYPOS,
	which hides that difference.
	@return position for key (0..b->ent), |ifyed with LBT_MATCH, if found
*/
static unsigned short keypos (
#if LBT_USECMP
	Idx *bt,
#define KEYPOS( bt, b, key ) keypos( bt, b, key )
#else
#define KEYPOS( bt, b, key ) keypos( b, key )
#endif
	Block *b, Key *key )
{
	unsigned char *base = LBT_BASE( b );
	unsigned char *kbt = key->byt;
	unsigned char  kln = key->len;
	unsigned short l = 0, r = b->ent;
	Dict          *d = b->dct;
	/* binsearch invariant:
		key is < all entries i >= r (we do know relation at r)
		key is > all entries i < l (don't know relation at l)
	*/
	while ( l < r ) { /* invariant: l <= keypos < r */
		unsigned short i = (l + r) >> 1; /* l <= i < r */
		unsigned short pos = LBT_POS(&d[i]);
		unsigned char cln = d[i].kln;
		/* compare on the common length first ... */
		int cmp =
#if LBT_USECMP
			bt->cmp ? bt->cmp( kbt, base+pos, kln<cln ? kln : cln ) :
#endif
			memcmp( kbt, base+pos, kln<cln ? kln : cln );
		stat.cmp++; /* dirty */
		/* ... then let the length decide */
		if ( ! cmp && !(cmp = (int)kln - cln) ) {
			/* match on main key -- what about the value ? */
			if ( ! b->lev /* ignore on leaves -- no duplicate keys here */
				|| (!d[i].vln && !key->val.len) /* no value */
			)
				return LBT_MATCH | i;
			/* so we're on an inner node */
			if ( ! d[i].vln ) /* common case */
				cmp = 1;
			else if ( ! key->val.len )
				cmp = -1;
			else {
				unsigned char vln = LBT_VLN(&d[i]);
				int c = key->val.len < d[i].vln ? key->val.len : vln;
				/* the return below is the body of this if */
				if ( !(cmp = memcmp( key->val.byt, base+pos+cln, c ))
					&& !(cmp = (int) key->val.len - vln)
				)
				return LBT_MATCH | i;
			}
		}
		if ( cmp > 0 )
			l = i+1;
		else
			r = i;
	}
	return l; /* == r */
}	/* keypos */


/** locate a (leaf) value by binary search.
	b is a sorted array of n values of sz bytes each, v is the value
	looked for; values are compared bytewise.
	@return the index of v with LBT_MATCH or'ed in, if it is present,
		else the index (0..n) where it would have to be inserted
*/
static unsigned short valpos ( unsigned char *b,
	unsigned short n, unsigned char sz, unsigned char *v )
{
	/* v is greater than everything below lo
		and less than everything from hi on */
	unsigned short lo = 0, hi = n;

	while ( lo < hi ) {
		unsigned short mid = (lo + hi) >> 1; /* lo <= mid < hi */
		int c = memcmp( v, b + mid*sz, sz );
		if ( c > 0 )
			lo = mid + 1;
		else if ( c < 0 )
			hi = mid;
		else
			return LBT_MATCH | mid;
	}
	return lo; /* == hi */
}	/* valpos */


#ifndef NDEBUG
/**
	debugging aid: log a search path with log level lev.
	Logs the key searched (and its decoded value, if any), then for every
	level that has a block in f->b, from the top down, the entry recorded
	in f->e and its left neighbour.
	" LY!" marks a block whose number is not the link found in the level
	above.
	With NDEBUG this is a macro that does nothing.
	NOTE(review): pos and vln of the entries are read directly, without
	LBT_POS / LBT_VLN, also on level 0, and the entry index is used
	even if it equals b->ent -- confirm this is acceptable for a
	diagnostic.
*/
static void TRACEKEY ( unsigned lev, FindKey *f )
{
	int i;
	unsigned n = 0; /* link followed from the level above */
	Hit h;
	PrVal pv;

	if ( LOG_DONT( lev ) )
		return;
	log_msg( lev, "key '%.*s' vln %d val %s",
		f->key->len, f->key->byt, f->key->val.len,
		PRVAL( pv, f->key->val ) );
	if ( f->key->val.len ) {
		rdval( &h, f->key->val.byt, f->key->val.len );
		log_msg( lev, "\tdb %d mfn %d tag %d occ %d pos %d", 
			h.dbn, h.mfn, h.tag, h.occ, h.pos );
	}
	for ( i=LBT_LEVELS; i--; ) {
		Block *b = f->b[i];
		unsigned on = n, e;
		Dict *d;
		if ( ! b  )
			continue;
		/* strip the match flag to get the entry index */
		d = b->dct + (e = (LBT_ENTMASK & f->e[i]));
		if ( i )
			n = GETINT( LBT_BASE(b)+d->pos+d->kln+d->vln );
		log_msg( lev, "lev %d n %d e 0x%x = %d %c '%.*s' val %d %s -> %d%s",
			i, f->n[i], f->e[i], e, (LBT_MATCH & f->e[i]) ? 'M' : '-',
			d->kln, LBT_BASE(b)+d->pos, d->vln,
			prval( pv, LBT_BASE(b)+d->pos+d->kln, d->vln ),
			n, on == b->num ? "" : " LY!"
		);
		if ( d->vln ) {
			rdval( &h, LBT_BASE(b)+d->pos+d->kln, d->vln );
			log_msg( lev, "\tdb %d mfn %d tag %d occ %d pos %d", 
				h.dbn, h.mfn, h.tag, h.occ, h.pos );
		}
		/* the left neighbour, if there is one */
		if ( e ) {
			d--;
			log_msg( lev, "\tleft '%.*s' val %d %s",
				d->kln, LBT_BASE(b)+d->pos, d->vln,
				prval( pv, LBT_BASE(b)+d->pos+d->kln, d->vln )
			);
			if ( d->vln ) {
				rdval( &h, LBT_BASE(b)+d->pos+d->kln, d->vln );
				log_msg( lev, "\tdb %d mfn %d tag %d occ %d pos %d", 
					h.dbn, h.mfn, h.tag, h.occ, h.pos );
			}
		}
	}
}	/* TRACEKEY */
#else
#define TRACEKEY( l, f ) (void)(f)
#endif


/**
	search f->key in tree f->bt, descending from the root to a leaf.
	On every level the block, its number and the position found by
	keypos are recorded in f->b, f->n and f->e. If the key is beyond the
	last entry of a non-root block that has a right sibling, the sibling
	is checked and taken instead, if the key does not sort before its
	first entry (Lehmann/Yao).
	Inner blocks are fetched with LBT_RD, the leaf with lock.
	@param f    bt and key must be set, the rest is filled in
	@param lock how to get the leaf
	@return 0 on success, in which case f->e[0] is the position in the
		leaf f->b[0]; else the result of log_msg. A block that could not
		be fetched by getblk is reported as an error, the slot of its
		level in f->b stays 0.
*/
static int findkey ( FindKey *f, lbt_get lock )
{
	int i;
	Block *b = f->bt->root;
	/* entry followed in the parent: key bytes, key length, value length */
	unsigned char *pkey = 0, pkln = 0, pvln = 0;
	if ( b->lev >= LBT_LEVELS )
		return log_msg( ERR_TRASH, "tree is too deep: %u", b->lev );
	stat.key++;
	for ( i=LBT_LEVELS; i--; ) { /* paranoia */
		f->b[i] = 0;
		f->n[i] = f->e[i] = 0;
	}
	for ( i=b->lev;; ) {
		unsigned short e;
		unsigned  n;
		Dict *d;
#ifndef NDEBUG
		chkblk( b, f->bt );
#endif
		f->n[i] = (f->b[i] = b)->num;
		f->e[i] = e = KEYPOS( f->bt, b, f->key );
		LOG_DBG( LOG_DEBUG, "keypos %x", e );
#ifndef NDEBUG
		if ( pkey && b->ent ) {
			/* compare parent key against our first key
				pkey must be equal for inner node and not greater for leave
			*/
			int cmp; /* this - pkey >= 0 */
			unsigned char *ent, ln;
			d = b->dct;
			ent = LBT_BASE(b)+LBT_POS(d);
			ln = pkln < d->kln ? pkln : d->kln;
			cmp = ln ? memcmp( ent, pkey, ln ) : 0;
			if ( ! cmp )
				cmp = (int)d->kln - (int)pkln;
			if ( ! cmp ) {
				if ( ! d->vln || ! pvln )
					cmp = (int)d->vln - (int)pvln;
				else
					cmp = memcmp( ent+d->kln, pkey+pkln, f->bt->vsz );
			}
			if ( cmp && (i || 0 > cmp) ) {
				Hit h;
				PrVal pv;
				TRACEKEY( ERR_TRASH, f );
				if ( d->vln ) {
					rdval( &h, ent+d->kln, d->vln );
					log_msg( ERR_TRASH, "\tfirst val db %d mfn %d tag %d occ %d pos %d", 
						h.dbn, h.mfn, h.tag, h.occ, h.pos );
				}
				return log_msg( ERR_TRASH, "\tbad first '%.*s' val %d %s",
					d->kln, ent, d->vln, prval( pv, ent+d->kln, d->vln ) );
			}
		}
#endif
		/*
			here goes the Lehmann/Yao race detection:
			If we ran on right boundary, follow right link
		*/
		while ( !(LBT_MATCH & e) && b->ent == e && b->nxt && b->num ) {
			Block *s;
			unsigned se;
			b->lck = LBT_WR; /* hold b while checking s */
			s = getblk( f->bt, b->nxt, i, i ? LBT_RD : lock );
			/* restore the lock state b had before we held it */
			if ( i || ! lock )
				ULK( b );
			else
				b->lck = lock;
			if ( ! s ) /* getblk failed; do not touch the sibling */
				return log_msg( ERR_TRASH,
					"\twhen getting right sibling %u of block %u",
					b->nxt, b->num );
			se = KEYPOS( f->bt, s, f->key );
			if ( !se ) { /* lower than first */
				ULK( s );
				break;
			}
			LOG_DBG( LOG_DEBUG, "LYao %d -> %d lev %d for '%.*s' got 0x%x",
				b->num, s->num, i, f->key->len, f->key->byt, se );
			/* trade b for s */
			ULK( b );
			f->n[i] = (f->b[i] = b = s)->num;
			e = se;
		}
		f->e[i] = e;
		if ( ! i ) /* reached the leaf */
			return 0;
		/* no exact match in an inner node: the child to follow is the
			one of the entry left of the insert position */
		if ( !(LBT_MATCH & e) ) {
			if ( e )
				f->e[i] = --e;
			else {
				TRACEKEY( ERR_TRASH, f );
				return log_msg( ERR_IDIOT,
					"ran on left bound of block %u ent %u",
					b->num, b->ent );
			}
		}
		d = b->dct + (LBT_ENTMASK & e);
		/* direct access ok in inner node */
		pkey = LBT_BASE(b)+d->pos;
		pkln = d->kln;
		pvln = d->vln;
		/* the link follows key and qualifying value */
		n = GETINT( pkey+pkln+pvln );
		i--;
		b = getblk( f->bt, n, i, i ? LBT_RD : lock );
		if ( ! b ) /* getblk failed; nothing to descend into */
			return log_msg( ERR_TRASH,
				"\twhen getting block %u on level %d", n, i );
	}
	return -1; /* go away */
}	/* findkey */


/**
	finish a batch load: build the inner levels over the chain of leaves.
	Logs the batch statistics, then repeatedly walks one level from left
	to right along the nxt links (the leaf chain starts at block 1) and
	creates one entry per non-empty child in new blocks of the level
	above, until a level consists of a single block. That block is copied
	to the root and written as block 0.
	On success the batch structure is freed and bt->bat reset; on the
	error returns visible here it is left in place.
	@return 0 on success, else the result of log_msg
*/
static int mktree ( Idx *bt )
{
	Batch *ba = bt->bat;
	unsigned char lev = 0; /* the level to be indexed */
	unsigned left = 1; /* block on child level */
	/* stats */
	unsigned size = bt->len * bt->bsz;
	unsigned sval = ba->vals * bt->vsz;
	unsigned sove = bt->len * LBT_DOFF + sizeof(Dict)*(ba->keys+ba->span);
	unsigned iblk = 0;  /* inner blocks created */
	unsigned ikey = 0;  /* inner entries created */
	unsigned ikeyl = 0; /* total key length of inner entries */
	unsigned ipfx = 0;  /* bytes saved by storing prefixes only */
	unsigned ispn = 0;  /* inner entries qualified by a value */

	log_msg( LOG_INFO,
		"end batch:\n"
		"\t#blks %u total len %u free %u (%.1f%%) overhead %u (%.1f%%)\n"
		"\t#keys %u total len %u (%.1f%%) avg %.1f max %u\n"
		"\t#vals %u total len %u (%.1f%%) avg %.1f max %u spans %u",
		bt->len, size, ba->free, ba->free*100./size, sove, sove*100./size,
		ba->keys, ba->keyl, ba->keyl*100./size, (float)ba->keyl/ba->keys, ba->maxl,
		ba->vals, sval, sval*100./size, (float)ba->vals/ba->keys, ba->maxv, ba->span
	);

	/* build tree */
	for (;;) { /* levels */
		unsigned  len = 0; /* current is n'th block on level */
		Block        *b = 0;
		unsigned  have = 0; /* room in current block b */
		unsigned char*base = 0; /* of current block b */
		Dict         *d = 0; /* of current block b */
		unsigned next = left; /* child */

		/* the first block created below will be the leftmost of the new level */
		left = bt->len;
		/* get child c and create child pointer in block b */
		while ( next ) {
			unsigned char  vln = 0;
			unsigned char *key;
			unsigned char  kln;
			unsigned  need;
			Block *c = getblk( bt, next, lev, LBT_EX );
			if ( ! c )
				return log_msg( ERR_NOMEM, "no child block" );
			next = c->nxt;
			if ( ! c->ent ) /* OOPS empty node */
				continue;
			key = LBT_BASE(c)+LBT_POS(c->dct);
			kln = c->dct[0].kln;
			if ( ! b ) /* the leftmost child on this level */
				kln = 0; /* always 0 key on left */
			else if ( lev )
				vln = LBT_VLN(c->dct);
			else if ( ba->key.len == kln /* compare key to last of prev. */
				&& !memcmp( ba->key.byt, key, kln ) /* which was saved in ba->key */
			) { /* key spans blocks */
				LOG_DBG( LOG_DEBUG, "found span key '%.*s' child %u",
					kln, key, c->num );
				vln = bt->vsz; /* qualify with value */
			} else {
#if 1
				/* the shortest prefix of the child's first key that still
					differs from the last key of the previous child */
				unsigned char x = 0;
				/* find prefix key */
				if ( kln > ba->key.len )
					kln = ba->key.len;
				while ( x < kln && key[x] == ba->key.byt[x] )
					x++;
				if ( x < kln ) /* they differ at [x] */
					kln = x+1;
				else /* equal on the shorter length */
					kln++;
				if ( kln > c->dct[0].kln )
					return log_msg( ERR_TRASH,
						"bad key '%.*s' on level %u matching previous of length %u",
						c->dct[0].kln, key, lev, ba->key.len );
				LOG_DBG( LOG_DEBUG, "prefix on key '%.*s' child %u save %u",
					kln, key, c->num, c->dct[0].kln - kln );
				ipfx += c->dct[0].kln - kln;
#endif
			}
			if ( vln )
				ispn++;
			ikeyl += kln;
			/* dict entry + key + qualifying value + link */
			need = sizeof(Dict) + kln + vln + 4;
			if ( need > have ) {
				/* no room (or no block yet): close b and start a new one */
				if ( b ) {
					log_msg( LOG_VERBOSE,
						"level %u block %u is full %u ent stack %u",
						lev, b->num, b->ent, b->stk );
					b->nxt = bt->len; /* link */
					b->lck = 0;
					if ( WRULK( b, bt ) ) goto nowrite;
					len++;
				}
				c->lck = LBT_EX; /* paranoia */
				b = NEWBLK( bt, lev+1, LBT_EX );
				c->lck = 0;
				if ( ! b )
					return log_msg( ERR_NOMEM, "no free block" );
				if ( b->ent || b->stk != bt->bsz )
					return log_msg( ERR_TRASH,
						"new block %u is not empty: ent %u stk %u",
						b->num, b->ent, b->stk );
				base = LBT_BASE( b );
				b->lck = LBT_EX; /* lock */
				have = bt->bsz - LBT_DOFF; /* - ba->let; TODO: should we ??? */
				d = b->dct;
				len++;
				iblk++;
			}
			/* push the link first, key and value go below it:
				the entry reads key, value, link */
			SETINT( base+(b->stk -= 4), c->num );
			d->kln = kln;
			d->vln = vln;
			if ( kln += vln ) /* key + consecutive qualifying value */
				memcpy( base+(b->stk -= kln), key, kln );
			d->pos = b->stk;
			d++;
			b->ent++;
			ikey++;
			have -= need;
			/* save the last dance for me */
			/* i.e. remember the last key of this child in ba->key */
			memcpy( ba->key.byt, LBT_BASE(c)+LBT_POS(&c->dct[c->ent-1]),
				ba->key.len = c->dct[c->ent-1].kln );
		}
		if ( !len )
			return log_msg( ERR_TRASH, "level %u was empty", lev );
		/* close level */
		log_msg( LOG_INFO, "closing level %u with %u blocks %u..%u",
			lev, len, left, b->num );
		b->lck = 0;
		/* a level of a single block is the root: move it to block 0 */
		if ( 1 == len ) {
			memcpy( bt->root, b, LBT_BOFF + bt->bsz );
			b = bt->root;
			b->num = 0;
		}
		if ( WRULK( b, bt ) ) goto nowrite;
		lev++;
		if ( 2 > len )
			break;
	}
	log_msg( LOG_INFO,
		"\tused %u inner blocks %u keys %u bytes avg %.1f\n"
		"\t%u #spans (%.1f%%) %u pfx (- %.1f%%)",
		iblk, ikey, ikeyl, (float)ikeyl/ikey,
		ispn, ispn*100.*bt->vsz/ikeyl, ipfx, ipfx*100./(ipfx+ikeyl) );

	mFree( bt->bat );
	bt->bat = 0;
	return 0;
nowrite:
	return log_msg( LOG_SYSERR, "\twhen writing batch block" );
}	/* mktree */



/* ************************************************************
	package data
*/



/* ************************************************************
	package functions
*/

/**
	start a batch load into tree bt.
	Allocates and zeroes the batch state, truncates the index file,
	resets the root and the tree length and gets the first block to
	be filled.
	@param bt      the tree
	@param pctfree percentage of each block to leave free, at most 50
		(larger values are silently reduced to 50)
	@return 0 on success, else the result of log_msg. If either the
		batch state or the first block can not be obtained, bt->bat is
		left 0, so no batch is in progress.
*/
int lbt_batch ( Idx *bt, unsigned char pctfree )
{
	bt->bat = (Batch*)mAlloc( sizeof(Batch) );
	if ( ! bt->bat )
		return log_msg( ERR_NOMEM, "\twhen getting batch" );
	memset( bt->bat, 0, sizeof(Batch) );
	if ( pctfree > 50 )
		pctfree = 50;
	bt->bat->let = pctfree * bt->bsz / 100;
	lio_trunc( &bt->fd, 0 );
	iniblk( bt->root, bt, 0, 0 );
	/* TODO: kill hash */
	bt->len = 1;
	bt->bat->cur = NEWBLK( bt, 0, LBT_EX );
	if ( ! bt->bat->cur ) {
		/* without a current block the batch is unusable:
			lbt_batchval dereferences it unconditionally */
		mFree( bt->bat );
		bt->bat = 0;
		return log_msg( ERR_NOMEM, "\twhen getting first batch block" );
	}
	return 0;
}	/* lbt_batch */


/*
	feed one key/value to the batch loader, or finish the batch.
	Unless key is the batch's own key buffer, it is compared (memcmp,
	then length) against the last key; a smaller key is rejected.
	Values for the current key are collected in the batch buffer and
	flushed to the current leaf when the key changes, the buffer is
	full (LBT_ENTMAX) or the batch ends (key->val.len == 0).
	If a leaf has no room, it is linked to a fresh block and written.
	On end of batch the last leaf is written and mktree builds
	the inner levels.
	Returns 0 on success, else a non-zero error
	(including failures of the final write and of mktree).
*/
int lbt_batchval ( Idx *bt, Key *key )
{
	int cmp = 0;
	Batch *ba = bt->bat;
	if ( key != &ba->key ) {
		int l = key->len < ba->key.len ? key->len : ba->key.len; /* min */
		if ( ! (cmp = memcmp(key->byt, ba->key.byt, l)) )
			cmp = (int)key->len - (int)ba->key.len;
		if ( 0 > cmp )
			return log_msg( LOG_ERROR, "batch key '%.*s' < last '%.*s'",
				key->len, key->byt, ba->key.len, ba->key.byt );
	}
	/* flush ? */
	while ( ! key->val.len /* end */
		|| LBT_ENTMAX == ba->got /* full */
		|| cmp /* next key */
	) {
		Block *b = ba->cur;
		/* free bytes in b, minus the reserve we were asked to keep */
		unsigned have = b->stk - LBT_DOFF - b->ent*sizeof(Dict) - ba->let;
		unsigned need = sizeof(Dict) + ba->key.len; /* entry w/o values */
		unsigned want = need + ba->got * bt->vsz; /* entry with all values */
		unsigned take = ba->got; /* number of values to store in b */
		if ( have < want ) { /* you can't always get ... */
			if ( have <= need )
				take = 0;
			else {
				take = (have - need) / bt->vsz;
				if ( 256 > take * bt->vsz ) /* not worth spanning */
					take = 0;
			}
		}
#if 0 /* prefix key turbo ??? -- doesn't seem to work all too well */
		if ( have < (bt->bsz >> 1) ) { /* ready to waste half a block ? */
			Dict *d = &b->dct[ b->ent-1 ];
			unsigned char *last = LBT_BASE(b)+LBT_POS(d);
			if ( ba->key.byt[0] != last[0]
				|| ba->key.byt[1] != last[1]
				|| ba->key.byt[2] != last[2]
			)
				take = 0;
		}
#endif
		LOG_DBG( LOG_DEBUG, "batch '%.*s' got %u need %u want %u have %u take %u",
			ba->key.len, ba->key.byt, ba->got, need, want, have, take );
		/* we have enough room in b for take entries */
		if ( take ) {
			unsigned char *base = LBT_BASE( b );
			unsigned char *s = base + b->stk;
			int i = take;
			Dict *d = &b->dct[ b->ent++ ];
			/* data stack grows downwards: values first, then the key below */
			while ( i-- )
				memcpy( s -= bt->vsz, ba->val[i].byt, bt->vsz );
			memcpy( s -= ba->key.len, ba->key.byt, ba->key.len );
			b->stk = (unsigned short) (s - base);
			LBT_PV( d, b->stk, take );
			d->kln = ba->key.len;
			if ( ba->got > take ) /* shift rest down; ranges may overlap */
				memmove( ba->val, ba->val+take, sizeof(ba->val[0])*(ba->got - take) );
			ba->got -= take;
		}
		if ( ! ba->got )
			break;
		if ( take ) /* key spans blocks */
			ba->span++;
		ba->free += b->stk - LBT_DOFF - b->ent*sizeof(Dict); /* wasted */
		b->nxt = bt->len; /* link */
		LOG_DBG( LOG_DEBUG, "batch: %u entries left, getting block %u",
			ba->got, bt->len );
		if ( WRULK( b, bt ) )
			return log_msg( LOG_SYSERR, "\twhen writing batch block" );
		/* so couldn't flush all, thus we need another block */
		b = ba->cur = NEWBLK( bt, 0, LBT_EX );
		if ( ! b )
			return log_msg( ERR_NOMEM, "\twhen getting batch block" );
		/* will continue on end or keychange */
		if ( b->ent || b->stk < bt->bsz )
			return log_msg( ERR_TRASH, "new block %u is not empty: ent %u stk %u",
				b->num, b->ent, b->stk );
		LOG_DBG( LOG_VERBOSE, "new block %u has %u ent %u stk, will keep %u",
			b->num, b->ent, b->stk, ba->let );
	}
	if ( cmp || ! key->val.len ) /* done with key: stats */
		if ( ba->maxv < ba->tot )
			ba->maxv = ba->tot;
	if ( key->val.len ) {
		if ( cmp ) {	/* set new key */
			if ( ba->got )
				return log_msg( ERR_IDIOT, "batch not empty on new key" );
			ba->tot = 0;
			memcpy( ba->key.byt, key->byt, ba->key.len = key->len );
			ba->keys++;
			ba->keyl += key->len;
			if ( ba->maxl < key->len )
				ba->maxl = key->len;
		}
		/* add new val */
		/* LOG_HEX( key->val.byt, bt->vsz ); */
		memcpy( ba->val[ ba->got++ ].byt, key->val.byt, bt->vsz );
		ba->vals++;
		ba->tot++;
		return 0;
	}
	/* end of batch: write last leaf, then build the inner levels */
	if ( WRULK( ba->cur, bt ) )
		return log_msg( LOG_SYSERR, "\twhen writing batch block" );
	return mktree( bt );
}	/* lbt_batchval */


/* TODO: this function is an awful mess and needs complete rewrite, KR
*/
/*
	add the value key->val for key to the index.
	- if bt->key is set, key->len is truncated to it (the caller's key
	  is modified)
	- the leaf is located by findkey with LBT_WR
	- nothing is added if the key exists and either already holds the
	  value or is a "stopword" (exactly one value, all bytes zero)
	- if the leaf lacks space, a new sibling block s is created and the
	  upper entries (possibly including the new one) are moved there;
	  one entry's values may be split between both blocks
	- for every sibling created, a link is inserted in the parent block,
	  which may in turn be split; splitting the root (block 0) moves its
	  contents to a new block and leaves the root with two children
	- finally, if all went well and the last block handled is not a leaf,
	  the key is searched again as a consistency check
	Returns 0 on success, else a non-zero code as set by
	WRULK, chkblk or log_msg.
	NOTE(review): the error exits use LOG_OTO, which presumably assigns
	ret before jumping to done -- confirm against its definition.
*/
int lbt_add ( Idx *bt, Key *key )
{
	int ret = 0;
	unsigned short e, i = 0;
	Block *b, *s = 0; /* mainblock, sibling */
	Dict *d;
	unsigned char *base, *sbase, *src, *v, *stk;
	FindKey f;
	int hadkey, split = 0, insibling = 0;
	unsigned need;
	PrVal pv;

	f.bt  = bt;
	f.key = key;
	/* enforce max key length */
	if ( bt->key && key->len > bt->key )
		key->len = bt->key;
	if ( findkey( &f, LBT_WR ) ) {
		if ( f.b[0] )
			ULK( f.b[0] );
		b = 0;
		LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
	}
	e = (LBT_ENTMASK & f.e[0]); /* entry position in leaf */
	b = f.b[0];
	base = LBT_BASE(b);
	d = b->dct + e;
	if ( ! (hadkey = LBT_MATCH & f.e[0]) ) /* new key */
		need = key->len + sizeof(Dict) + bt->vsz;
	else {
		unsigned vln = LBT_VLN(d);
		v = base+LBT_POS(d)+d->kln; /* start of values */
		if ( 1 == vln ) { /* check stopword -- value entirely zeroed */
			int l = bt->vsz;
			unsigned char *C = v;
			while ( l-- && !*C++ ) /* huh huh, he said "C++" */
				;
			/* l reaches -1 only if all vsz bytes were zero */
			if ( -1 == l ) {
				LOG_DBG( LOG_VERBOSE, "key '%.*s' is a stopword", key->len, key->byt );
				goto done;
			}
		}
		i = valpos( v, vln, bt->vsz, key->val.byt );
		if ( LBT_MATCH & i ) {
			LOG_DBG( LOG_DEBUG,
				"adding key '%.*s'/%s: already there pos %u",
				key->len, key->byt, PRVAL( pv, key->val ), i & ~LBT_MATCH );
			goto done; /* value already there */
		}
		need = bt->vsz;
	}
	LOG_DBG( LOG_DEBUG,
		"adding key '%.*s'/%s in block %d had %d ent %d stk %d need %d",
		key->len, key->byt, PRVAL( pv, key->val ),
		b->num, hadkey, b->ent, b->stk, need );

	/* free space is the gap between the dictionary and the data stack */
	if ( need > b->stk - LBT_DOFF - b->ent*sizeof(Dict) ) {
		/* now that's bad: not enough space
			create new block s, move last entries there
			starting from the end, entries (including the new one) are collected,
			until their combined size reaches more than half a block.
			If the combined size then is more than 2/3 a block,
			and the last entry alone takes more than 1/3 a block, it is split.
			TODO: check existing sibling for free space
		*/
		unsigned half  = (bt->bsz - LBT_DOFF) >> 1;
		unsigned tot   = 0; /* bytes going to the sibling */
		unsigned mv = b->ent; /* first entry to move */

		LOG_DBG( LOG_DEBUG, "splitting block %d need %d have %d (stk %d ent %d)",
			b->num, need, b->stk - LBT_DOFF - b->ent*sizeof(Dict), b->stk, b->ent );
		if ( need > (bt->bsz - LBT_DOFF) / 3 )
			LOG_OTO( done, (ERR_IDIOT, "OOPS! excessive need for %d bytes", need) );
		/* figure out split position */
		if ( !hadkey && b->ent == e ) { /* new key goes behind last entry */
			tot = need;
			insibling = !0;
		}
		d = b->dct+mv;
		while ( d--, mv-- ) {
			unsigned sz = sizeof(Dict)+d->kln+LBT_VLN(d)*bt->vsz;
			if ( half > (tot += sz) ) { /* sz fits */
				if ( e == mv && !hadkey ) {
					/*
						check size of new entry.
						In case of hadkey, we don't bother for the bt->vsz bytes.
					*/
					if ( half < (tot + need) ) {
						if ( (half - tot) > need/2 ) { /* more balanced if we move */
							tot += need;
							insibling = !0;
						}
						break;
					}
					tot += need;
					insibling = !0;
				}
				if ( mv )
					continue; /* else croak on ! mv below */
			}
			/* exceeded half -- maybe we move entry mv anyway */
			if ( tot > (bt->bsz - LBT_DOFF)*2/3 ) {
				if ( tot-sz < (bt->bsz - LBT_DOFF)/3 )
					split = !0;
				else { /* undo -- don't move this entry */
					tot -= sz;
					d++; mv++;
				}
			} else if ( ! mv )
				LOG_OTO( done, (ERR_IDIOT, "OOPS! didn't find end tot %d bytes", tot) );
			break;
		}	/* while mv-- */
		if ( mv < e || (mv == e && hadkey && !split) )
			insibling = !0;
		/* now mv is the first entry to move */
		s = NEWBLK( bt, 0, LBT_WR );
		LOG_DBG( LOG_DEBUG,
			"split: move %d of %d to new blk %d e %d tot %d insibling %d split %d",
			mv, b->ent, s->num, e, tot, insibling, split );
		/* chain s between b and b's former successor */
		s->nxt = b->nxt;
		b->nxt = s->num;
		sbase = LBT_BASE(s);
		if ( split ) {
			/* now total includes full size of entry mv.
				if we included a new entry, it might even exceed bsz.
				calculate sizes for both blocks w/o mv's values.
			*/
			unsigned esz = sizeof(Dict)+d->kln; /* entry's base cost */
			unsigned vln = LBT_VLN(d);
			unsigned keep, diff, mvval, kpval, mvsiz, pos = LBT_POS(d);
			/*
				from currently used space, substract tot (including all
				values and possibly new key "need"), but add entry base cost.
				that should be the size remaining if we where moving
				all of the values but keep an entry and key.
				maybe slightly biased if need is bt->vsz (the hadkey case).
			*/
			keep = bt->bsz - b->stk + b->ent*sizeof(Dict) + need - tot + esz;
			tot -= vln * bt->vsz; /* undo entries */
			/* difference of both sides, in units of values */
			diff = (keep > tot ? keep - tot : tot - keep) / bt->vsz;
			if ( diff > vln )
				LOG_OTO( done, (ERR_IDIOT,
					"OOPS! got diff %d vln %d entry %d tot %d keep %d",
					diff, vln, mv, tot, keep) );
			mvval = (vln - diff)/2;
			if ( keep > tot )
				mvval += diff;
			kpval = vln - mvval;
			/* both parts must hold at least one value */
			if ( ! mvval || ! kpval ) {
				LOG_DBG( LOG_WARN, "bad mv/kpval %d/%d keep %d tot %d vln %d diff %d",
					mvval, kpval, keep, tot, vln, diff );
				if ( ! mvval )
					kpval = vln - (mvval = 1);
				else
					mvval = vln - (kpval = 1);
			}
			tot += mvsiz = mvval*bt->vsz;
			keep += kpval*bt->vsz;
			src = base+pos;
			LOG_DBG( LOG_DEBUG,
				"split: entry %d '%.*s' mv %d of %d values, i %d keep %d tot %d",
				mv, d->kln, src, mvval, vln, i, keep, tot );
			if ( keep > (bt->bsz-LBT_DOFF) || tot > (bt->bsz-LBT_DOFF) )
				LOG_OTO( done, (ERR_IDIOT,
					"OOPS! got diff %d vln %d mvval %d entry %d tot %d keep %d",
					diff, vln, mvval, mv, tot, keep) );
			/* SIGH -- do it */
			/* the upper mvval values go to s */
			memcpy( sbase + (s->stk -= mvsiz),
				src+d->kln+kpval*bt->vsz, mvsiz );
			if ( e == mv && hadkey && i > kpval ) {
				/* new value belongs to the moved part: insert it in s */
				insibling = !0;
				s->stk -= bt->vsz;
				i -= kpval;
				memmove( sbase+s->stk, sbase+s->stk+bt->vsz, i*bt->vsz );
				memcpy( sbase+s->stk+i*bt->vsz, key->val.byt, bt->vsz );
				mvval++;
			}
			/* s gets its own copy of the key as first entry */
			memcpy( sbase + (s->stk -= d->kln), src, d->kln );
			LBT_PVK( s->dct, s->stk, mvval, d->kln );
			s->ent = 1;
			/* close the gap in b: shift key and kept values up */
			memmove( src + mvsiz, src, d->kln+kpval*bt->vsz );
			pos += mvsiz;
			LBT_PV( d, pos, kpval );
			d++; mv++;
		}
		/* now entries mv..b->ent are moved, mv > 0 */
		b->stk = LBT_POS(b->dct+mv-1); /* pre-adjust stack */
		if ( mv < e ) { /* move entries mv..e-1 */
			/* used stack space is from pos of entry e-1 to end of entry mv
				(might differ from pos of mv-1 due to split)
			*/
			unsigned from = LBT_POS(b->dct+e-1), nent = e-mv,
				sz = LBT_POS(b->dct+mv)+b->dct[mv].kln
					+LBT_VLN(b->dct+mv)*bt->vsz - from;
			int adj = (s->stk -= sz) - from; /* entry position adjustment */
			memcpy( sbase + s->stk, base+from, sz );
			memcpy( s->dct+s->ent, b->dct+mv, nent*sizeof(Dict) );
			d = s->dct + s->ent;
			s->ent += nent;
			for ( ; nent--; d++ )
				LBT_POSADD( d, adj );
		}
		if ( insibling && mv <= e ) { /* now mv or insert e, unless done on split */
			d = b->dct + e;
			if ( hadkey ) { /* mv existing entry e >= mv, add new val */
				/* i is valpos */
				unsigned vln = LBT_VLN(d);
				src = base+LBT_POS(d)+d->kln;
				/* build downwards: values from i, new value, values before i, key */
				if ( i < vln ) {
					unsigned sz = (vln - i)*bt->vsz;
					memcpy( sbase + (s->stk -= sz), src + i*bt->vsz, sz );
				}
				memcpy( sbase + (s->stk -= bt->vsz), key->val.byt, bt->vsz );
				if ( i ) {
					unsigned sz = i*bt->vsz;
					memcpy( sbase + (s->stk -= sz), src, sz );
				}
				memcpy( sbase + (s->stk -= d->kln), base+LBT_POS(d), d->kln );
				LBT_PVK( s->dct + s->ent, s->stk, vln+1, d->kln );
				e++; /* e is moved */
			} else {
				memcpy( sbase + (s->stk -= bt->vsz), key->val.byt, bt->vsz );
				memcpy( sbase + (s->stk -= key->len), key->byt, key->len );
				LBT_PVK( s->dct + s->ent, s->stk, 1, key->len );
			}
			s->ent++;
		} /* mv or insert e */
		if ( e < b->ent && mv < b->ent ) { /* move entries max(e,mv) to end */
			unsigned fst = mv < e ? e : mv;
			/* used stack space is from pos of entry b->ent-1 (the original b->stk)
				to end of entry fst (might differ from pos of fst-1 due to split)
			*/
			unsigned from = LBT_POS(b->dct+b->ent-1), nent = b->ent-fst,
				sz = LBT_POS(b->dct+fst)+b->dct[fst].kln
					+LBT_VLN(b->dct+fst)*bt->vsz - from;
			int adj = (s->stk -= sz) - from; /* entry position adjustment */
			LOG_DBG( LOG_DEBUG, "split: entries %d..%d sz %d from %d to %d",
				fst, b->ent-1, sz, from, s->stk );
			memcpy( sbase + s->stk, base+from, sz );
			memcpy( s->dct+s->ent, b->dct+fst, nent*sizeof(Dict) );
			d = s->dct + s->ent;
			s->ent += nent;
			for ( ; nent--; d++ ) {
#if !defined(NDEBUG)
				unsigned p = LBT_POS(d);
#endif
				LBT_POSADD( d, adj );
#if !defined(NDEBUG)
				LOG_DBG( LOG_DEBUG, "new ent %d p/v/k %d/%d/%d o %d",
					d - s->dct, LBT_POS(d), LBT_VLN(d), d->kln, p );
#endif
			}
		}
		b->ent = mv; /* kill entries */
		LOG_DBG( LOG_DEBUG,
			"split: blks %d/%d with %d/%d entries, s starts '%.*s'",
			b->num, s->num, b->ent, s->ent, s->dct[0].kln, sbase+LBT_POS(s->dct) );
	}	/* new sibling */

	if ( ! insibling ) {
		/* now we do have enough space */
		d = b->dct + e;
		stk = base + b->stk; /* bottom of data stack */
		if ( hadkey ) { /* insert value at i */
			unsigned char *val = base + LBT_POS(d)+d->kln+i*bt->vsz;
			need = bt->vsz;
#if 0
		LOG_DBG( LOG_DEBUG,
			"in entry %d '%.*s' val %d: memmove( %p %p %d) val %p need %d",
			e, d->kln, base+LBT_POS(d), i,
			stk-need, stk, (val-stk)+need, val, need );
#endif
			/* shift everything below the insert position down by one value */
			memmove( stk - need, stk, (val - stk)+need );
			memcpy( val - need, key->val.byt, bt->vsz );
			LBT_VLNADD( d, 1 );
		} else { /* insert key, value and dict entry */
			/* take dict and data space from entry e */
			unsigned char *nu;
			need = key->len + bt->vsz;
			if ( e == b->ent )
				nu = base + b->stk - need;
			else {
				/* at end of next entry, i.e. end of block or beg of previous */
				nu = base + LBT_POS(d) + d->kln + LBT_VLN(d)*bt->vsz;
				memmove( stk - need, stk, nu - stk );
				nu -= need;
				memmove( d+1, d, (b->ent - e)*sizeof(Dict) );
			}
			memcpy( nu, key->byt, key->len );
			memcpy( nu + key->len, key->val.byt, bt->vsz );
			LBT_PV( d, nu-base, 1 );
			d->kln = key->len;
			d++;
			e++;
			b->ent++;
		}
		/*
			need is set to the needed (i.e. additionally used) data space,
			e is the first dict entry to redirect need bytes lower
		*/
		b->stk -= need;
		for ( ; e < b->ent; e++, d++ )
			LBT_POSADD( d, -need );
	}	/* ! insibling */
	/* link each new sibling into its parent, one level per turn */
	while ( s ) { /* created sibling */
		unsigned num = s->num;
		unsigned char *e0 = LBT_BASE(s) + LBT_POS(s->dct);
		unsigned l = s->lev + 1; /* parent's level */
		Block *ins; /* block to take the new link */
		Key k;

		/* the link key is the sibling's first key ... */
		memcpy( k.byt, e0, k.len = s->dct[0].kln );
		k.val.len = 0;
		/* ... qualified by its first value where needed */
		if ( s->lev ? s->dct[0].vln : split )
			memcpy( k.val.byt, e0+k.len, k.val.len = bt->vsz );
		LOG_DBG( LOG_DEBUG, "split: had %d/%d on lev %d key '%.*s' vln %d %s",
			b->num, num, s->lev, k.len, k.byt, k.val.len, PRVAL( pv, k.val ) );

		/* get parent */
		if ( f.b[l]->num == f.n[l] && ! LBT_BUSY(f.b[l]) )
			f.b[l]->lck = LBT_WR;
		else
			f.b[l] = getblk( bt, f.n[l], l, LBT_WR );
		LOG_DBG( LOG_DEBUG,
			"split: got parent %d lev %d", f.b[l]->num, f.b[l]->lev );
		/* MT TODO: again L/Y link hunt */
		/* ok, locked the parent -- release s and b */
		if ( (ret = WRULK( s, bt )) || (ret = WRULK( b, bt )) )
			break;
		s = 0;
		ins = b = f.b[l];
		base = LBT_BASE(b);
		e = KEYPOS( bt, b, &k );
		LOG_DBG( LOG_DEBUG, "split: got parent pos %d", e );
		if ( LBT_MATCH & e ) /* !??? */
			LOG_OTO( done, (ERR_TRASH, "OOPS! found key in parent" ) );
		if ( ! e ) {
			TRACEKEY( ERR_TRASH, &f );
			LOG_OTO( done, (ERR_TRASH, "OOPS! parent insert at 0 lev %d", l ) );
		}
		/* an inner entry holds key, optional value and a 4 byte block number */
		need = sizeof(Dict) + k.len + 4 + k.val.len;
		if ( need > b->stk - LBT_DOFF - b->ent*sizeof(Dict) ) {
			/* parent is full, too: split it at about half of the used space */
			unsigned half = ((bt->bsz - b->stk)+b->ent*sizeof(Dict)+need) / 2;
			unsigned mv = b->ent, tot = 0, nent, sz;
			int adj;
			if ( e == b->ent ) {
				tot = need;
				ins = 0;
			}
			for ( d = b->dct+mv; d--, mv--; ) {
				sz = sizeof(Dict) + d->kln + 4 + d->vln;
				if ( half > (tot += sz) ) {
					if ( e == mv ) {
						if ( half > tot+need || tot+need-half < half-tot ) {
							tot += need;
							ins = 0; /* new link goes to sibling */
						} else
							break;
						if ( half <= tot )
							break;
					}
					if ( mv )
						continue;
				}
				if ( tot - half > sz/2 ) { /* don't move this */
					tot -= sz;
					d++; mv++;
				} else if ( ! mv ) /* !??? */
					mv = 1;
				break;
			}
			/* split here a little bit simpler */
			s = NEWBLK( bt, l, LBT_WR );
			s->nxt = b->nxt;
			b->nxt = s->num;
			/* straight move entries mv..b->ent-1 to sibling */
			sbase = LBT_BASE(s);
			d = b->dct + mv;
			sz = b->dct[mv-1].pos - b->stk;
			nent = b->ent - mv;
			LOG_DBG( LOG_DEBUG,
				"split: parent lev %d split %d to new %d mv %d"
				" '%.*s'/%s nent %d sz %d",
				l, b->num, s->num, mv, d->kln, base+d->pos,
				prval( pv, base+d->pos+d->kln, d->vln ), nent, sz );
			memcpy( sbase + (s->stk -= sz), base + b->stk, sz );
			adj = s->stk - b->stk;
			b->stk += sz;
			memcpy( s->dct, d, nent*sizeof(Dict) );
			b->ent = mv;
			s->ent = nent;
			for ( d = s->dct; nent--; d++ )
				d->pos += adj;
			if ( ! ins ) {
				ins = s;
				e -= mv; /* position relative to sibling */
			}
			if ( (ret = chkblk( s, bt )) )
				break;
		}
		/* now insert link in block ins at pos e */ {
			unsigned isz = k.len + 4 + k.val.len, nent = ins->ent - e;
			unsigned char *ibase = LBT_BASE(ins);
			/* data of the new entry ends where the previous entry starts */
			unsigned end = e ? ins->dct[e-1].pos : bt->bsz;
			LOG_DBG( LOG_DEBUG,
				"split: ins parent %d lev %d pos %d key '%.*s' -> %d vln %d %s",
				ins->num, ins->lev, e, k.len, k.byt, num, k.val.len,
				PRVAL( pv, k.val ) );
			memmove( ibase + ins->stk - isz, ibase + ins->stk, end - ins->stk );
			ins->stk -= isz;
			/* NOTE(review): block number is stored in host byte order here,
				while other places use SETINT -- confirm both agree */
			memcpy( ibase + (end -= 4), &num, 4 );
			if ( k.val.len )
				memcpy( ibase + (end -= k.val.len), k.val.byt, k.val.len );
			memcpy( ibase + (end -= k.len), k.byt, k.len );
			/* shift dict entries e.. one slot up, adjusting positions */
			for ( d = ins->dct+ins->ent; nent--; d-- ) {
				*d = d[-1];
				d->pos -= isz;
			}
			d->pos = end;
			d->vln = k.val.len;
			d->kln = k.len;
			ins->ent++;
			if ( (ret = chkblk( ins, bt )) )
				break;
		}
		if ( ! s ) /* write b below or in next turn */
			break;
		if ( ! b->num ) { /* root split */
			Block *bb = NEWBLK( bt, l, LBT_WR );
			LOG_DBG( LOG_DEBUG, "root split!" );
			/* cp all but the num */
			memcpy( LBT_BASE(bb)+4, LBT_BASE(b)+4, bt->bsz-4 );
			b->nxt = s->nxt; /* restore root's special nxt */
			s->nxt = 0;
			b->stk = bt->bsz; /* empty the root */
			/* add two childs bb and s */
			SETINT( base + (b->stk -= 4), bb->num );
			LBT_PVK( b->dct, b->stk, 0, 0 );
			SETINT( base + (b->stk -= 4), s->num );
			e0 = LBT_BASE(s) + LBT_POS(s->dct);
			if ( s->dct[0].vln )
				memcpy( base + (b->stk -= bt->vsz), e0+s->dct[0].kln, bt->vsz );
			memcpy( base + (b->stk -= s->dct[0].kln), e0, s->dct[0].kln );
			LBT_PVK( b->dct+1, b->stk, s->dct[0].vln, s->dct[0].kln );
			b->ent = 2;
			b->lev++; /* increase tree depth */
			if ( (ret = WRULK( s, bt )) || (ret = WRULK( bb, bt )) )
				break;
			s = 0;
			break;
		}
	}
	if ( b && !ret )
		ret = WRULK( b, bt );
done:
	if ( s )
		ULK( s );
	if ( b )
		ULK( b );
	/* NOTE(review): b is 0 after a failed findkey; the b->lev below is only
		safe if ret is non-zero in that case -- confirm LOG_OTO sets ret */
	/* after touching inner blocks, verify the key can still be found */
	if ( !ret && b->lev && findkey( &f, LBT_RD ) )
		ret = log_msg( ERR_TRASH, "could not find key after add" );
	return ret;
}	/* lbt_add */


/*
	delete the value key->val of key from the index.
	If bt->key is set, key->len is truncated to it first, the same way
	lbt_add stores keys (the caller's key is modified).
	It is not an error if the key or the value is not found.
	Removing the sole value of a key removes the whole entry.
	Blocks are never merged or released.
	Returns 0 on success (including "nothing to delete"),
	else a non-zero error code.
*/
int lbt_del ( Idx *bt, Key *key )
{
	int ret = 0;
	unsigned short e, n, i, mv;
	Block *b;
	Dict *d;
	unsigned char *base, *v;
	FindKey f;
	PrVal pv;

	f.bt  = bt;
	f.key = key;
	/* keys are stored truncated by lbt_add, so look for the truncated key */
	if ( bt->key && key->len > bt->key )
		key->len = bt->key;
	LOG_DBG( LOG_DEBUG, "deleting key '%.*s'/%s",
		key->len, key->byt, PRVAL( pv, key->val ) );
	if ( findkey( &f, LBT_WR ) )
		LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
	if ( !(LBT_MATCH & f.e[0]) ) /* key not found - ok */
		goto done;
	LOG_DBG( LOG_DEBUG, "key found" );
	e = (LBT_ENTMASK & f.e[0]);
	b = f.b[0];
	d = b->dct + e;
	base = LBT_BASE(b);
	v = base+LBT_POS(d)+d->kln; /* start of values */
	n = LBT_VLN(d);
	if ( ! n )
		goto done;
	i = valpos( v, n, bt->vsz, key->val.byt );
	LOG_DBG( LOG_DEBUG, "val %sfound pos %u of %u",
		(LBT_MATCH & i) ? "" : "not ", i & ~LBT_MATCH, n );
	if ( !(LBT_MATCH & i) ) /* val not found - ok */
		goto done;
	i &= ~LBT_MATCH;
	LOG_DBG( LOG_DEBUG,
		"del blk %d e %d i/n %d/%d", b->num, e, i, n );
	if ( 1 < n ) {
		v += i*bt->vsz; /* end of stack area to move: start of this value */
		mv = bt->vsz; /* amount to move: one value size */
		LBT_VLNADD( d, -1 );
	} else { /* sole value -- rm whole entry */
		v = base+LBT_POS(d); /* start of key */
		mv = d->kln+bt->vsz;
		for ( i=b->ent - e - 1; i--; d++ ) /* move the i entries after d down */
			*d = d[1];
		d = b->dct + e; /* reset */
		b->ent--;
	}
	/* close the gap: shift all data below v up by mv bytes */
	memmove( base+b->stk+mv, base+b->stk, v-(base+b->stk) );
	b->stk += mv;
	/* entries from e on had data in the shifted area */
	for ( ; e++ < b->ent; d++ )
		LBT_POSADD( d, mv );
	ret = WRULK( b, bt ); /* report write failure to caller */
done:
	if ( f.b[0] )
		ULK( f.b[0] );
	return ret;
}	/* lbt_del */



/*
	loop over the leaf entries, starting at l->key
	(or at the first leaf, block 1, if l->key is empty).
	The mode (IDXMODE bits of l->flg) selects the end condition:
	IDXPF   while l->key is a prefix of the entry's key
	IDXEQ   while the entry's key equals l->key
	IDXUPTO while the entry's key is less than l->to
	IDXINCL while the entry's key is not greater than l->to
	If l->cb is set, it is called for every value of every entry;
	a non-zero result stops the loop and is returned.
	At the end, the callback is called once more with key and hit 0.
	Without callback, every entry is looked up by findkey
	as an internal consistency check and a summary is logged.
	Returns 0, the callback's non-zero result or a negative/non-zero error.
	A missing block or a non-leaf in the leaf chain gives -ERR_TRASH.
*/
int lbt_loop ( Idx *bt, DXLoop *l )
{
	int ret = 0;
	unsigned i = 0;
	int mode = IDXMODE & l->flg;
	Block *b = 0;
	Key *cmp = &l->key;
	if ( ! cmp->len ) /* start at 1st leaf */
		b = getblk( bt, 1, 0, LBT_RD );
	else { /* locate key */
		FindKey f;
		f.bt  = bt;
		f.key = cmp;
		if ( findkey( &f, LBT_RD ) )
			LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
		b = f.b[0];
		i = (unsigned)(LBT_ENTMASK & f.e[0]);
		if ( !(LBT_MATCH & f.e[0]) && IDXEQ == mode )
			goto done;
	}
	if ( IDXUPTO <= mode ) /* range modes compare against the upper bound */
		cmp = &l->to;
	for ( ;; i=0 ) { /* blocks */
		unsigned char *base;
		Dict *d;
		if ( ! b || b->lev ) {
			/* leave via done, so the block is unlocked and cb is notified */
			ret = -ERR_TRASH;
			goto done;
		}
		base = LBT_BASE( b ); /* only after b is known to be valid */
		b->lck = LBT_EX; /* lock */
		d = b->dct + i;
		for ( ; i<b->ent; i++, d++ ) {
			unsigned short pos = LBT_POS( d );
			unsigned short vln = LBT_VLN( d );
			unsigned short kln = d->kln;
			unsigned short j;
			unsigned char *v = base+pos+kln; /* start of values */
			Key key;
			Hit hit;
			/* compare over the common length only */
			int diff = memcmp( cmp->byt, base+pos,
				kln <= cmp->len ? kln : cmp->len );
			switch ( mode ) {
			case IDXPF:
				if ( diff || kln < cmp->len )
					goto moan;
				break;
			case IDXEQ:
				if ( diff || kln != cmp->len )
					goto done;
				break;
			case IDXUPTO:
				if ( 0 >= diff )
					goto done;
				break;
			case IDXINCL:
				if ( 0 > diff || (!diff && kln > cmp->len) )
					goto done;
				break;
			default: /* unused */
				moan:
				log_msg( LOG_INFO,
					"loop ends on key '%.*s'(%d) %d %d",
					kln, base+pos, kln, diff, cmp->len );
				goto done;
			}
			memcpy( key.byt, base+pos, key.len = kln );
			if ( l->cb )
				for ( j=0; j<vln; j++, v += bt->vsz ) {
					rdval( &hit, v, bt->vsz );
					ret = l->cb( l->me, &key, &hit );
					if ( ret )
						goto done;
				}
			else { /* internal check */
				FindKey f;
				f.bt  = bt;
				f.key = &key;
				/* look up key with its first value: must lead here again */
				memcpy( key.val.byt, v, key.val.len = bt->vsz );
				if ( findkey( &f, LBT_RD ) )
					LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
				if ( !(LBT_MATCH & f.e[0])
					|| b != f.b[0]
					|| i != (unsigned)(LBT_ENTMASK & f.e[0])
				) {
					log_msg( ERR_TRASH,
						"OOPS! want %u(%p).%u got %u(%p).%u (match %x)",
						b->num, b, i, f.b[0]->num, f.b[0], LBT_ENTMASK & f.e[0], f.e[0] );
					rdval( &hit, v, bt->vsz );
					log_msg( ERR_TRASH,
						"\twhen looking for '%.*s'/%u.%u.%u.%u",
						key.len, key.byt, hit.mfn, hit.tag, hit.occ, hit.pos );
					if ( f.b[0]->ent <= (LBT_ENTMASK & f.e[0]) )
						log_msg( ERR_TRASH, "\tI found nothing" );
					else {
						Dict *dd = f.b[0]->dct + (LBT_ENTMASK & f.e[0]);
						unsigned char *bb = LBT_BASE(f.b[0])+LBT_POS(dd);
						rdval( &hit, bb+dd->kln, bt->vsz );
						log_msg( ERR_TRASH,
							"\tI found '%.*s'/%u.%u.%u.%u",
							dd->kln, bb, hit.mfn, hit.tag, hit.occ, hit.pos );
					}
				} else
					log_msg( LOG_TRACE, "key ok" );
			}
		}
		ULK( b );
		/* follow the leaf chain */
		if ( ! b->nxt || !(b = getblk(bt, b->nxt, 0, LBT_RD)) )
			break;
	}
done:
	if ( b )
		ULK( b );
	if ( l->cb )
		l->cb( l->me, 0, 0 ); /* signal end of loop */
	else
		log_msg( LOG_INFO,
			"checked %i keys in %u blks of %dK depth %u cmp/get/miss %u/%u/%u",
			stat.key, bt->len, bt->bsz>>10, bt->root->lev, stat.cmp, stat.get, stat.miss );
	return ret;
}	/* lbt_loop */


/*
	mimic the plain old ldb_search.
	Works in one of two ways:
	- with rec: collect the matching terms (keys) as fields of rec,
	  allocating key bytes downwards from the end of the record buffer
	  until it meets the field dictionary. If rec already has fields on
	  entry, the search continues after the last field's term.
	  Returns the number of terms collected.
	- with post only: merge the postings of the matching keys into post
	  according to post->mode (AND marks existing postings, OR merges new
	  ones) honouring post->tag, post->near, post->skp and post->cut.
	  Requires a value size of 8. Returns 0.
	With post, LDB_PFX in post->mode requests a prefix match;
	without post, a trailing '$' on key does (and is stripped from key).
	Either rec or post must be given.
	Returns a non-zero error code from log_msg on bad arguments or
	when the key lookup fails, and -ERR_TRASH if the leaf chain is broken.
*/
int lbt_search ( Idx *bt, Key *key, LdbPost *post, Rec *rec )
{
	int ret = 0;
	unsigned i = 0;
	int cont = rec && rec->len; /* continuing previous terms search */
	char *stk = rec ? ((char*)rec + rec->bytes) : 0; /* term stack */
	int prefix;
	Block *b = 0;
	Key start;
	/* really just an alias; post may be 0 when called with rec only */
	LdbP * const p = post ? post->p : 0;

	/* check for prefix match */
	if ( post ) {
		if ( 8 != bt->vsz )
			return log_msg( ERR_INVAL, "bad legacy search on vsz %d", bt->vsz );
		prefix = LDB_PFX & post->mode;
	} else {
		if ( ! rec )
			return log_msg( ERR_INVAL, "lbt_search needs rec or post" );
		if ( (prefix = key->len && '$' == key->byt[key->len - 1]) )
			key->len--;
	}

	if ( ! cont )
		start = *key;
	else { /* restart at the last term found previously */
		memset( &start, 0, sizeof(start) );
		start.len = 255 < rec->field[rec->len-1].len
			? 255 : rec->field[rec->len-1].len;
		memcpy( start.byt, rec->field[rec->len-1].val, start.len );
		rec->len = 0;
	}

	if ( ! start.len ) /* start at 1st leaf */
		b = getblk( bt, 1, 0, LBT_RD );
	else { /* locate key */
		FindKey f;
		f.bt  = bt;
		f.key = &start;
		if ( findkey( &f, LBT_RD ) )
			LOG_OTO( ouch, ( ERR_TRASH, "could not find key" ) );
		b = f.b[0];
		i = (unsigned)(LBT_ENTMASK & f.e[0]);
		if ( !(LBT_MATCH & f.e[0]) && ! prefix )
			goto done;
	}

	for ( ;; i=0 ) { /* blocks */
		unsigned char *base;
		Dict *d;
		if ( ! b || b->lev ) {
			/* leave via ouch, so a block we hold gets unlocked */
			ret = -ERR_TRASH;
			goto ouch;
		}
		base = LBT_BASE( b ); /* only after b is known to be valid */
		b->lck = LBT_EX; /* lock */
		d = b->dct + i;
		for ( ; i<b->ent; i++, d++ ) { /* entries */
			unsigned char *ent = base + LBT_POS( d ); /* start of key */
			unsigned short kln = d->kln;
			/* stop at the first entry not matching key (as prefix or exactly) */
			if ( memcmp( key->byt, ent, kln <= key->len ? kln : key->len )
				|| (prefix ? kln < key->len : kln != key->len )
			)
				goto done;
			if ( rec ) { /* record term */
				Field *f = rec->field + rec->len; /* field to assign */
				/** append term to record, unless it's the same as previous */
				if ( rec->len ) {
					if ( kln == f[-1].len && !memcmp( f[-1].val, ent, kln ) )
						continue;
				} else if ( cont ) {
					if ( kln == start.len && !memcmp( start.byt, ent, kln ) )
						continue;
				}
				stk -= kln;
				if ( stk < (char*)(f+1) ) /* stack reached field dict */
					goto done;
				memcpy( stk, ent, kln );
				f->tag = 0;
				f->val = stk;
				f->len = kln;
				rec->len++;
				continue;
			}
			{ /* more locals */
			LdbP merge[1024]; /* new postings of this entry, descending */
			unsigned short vln, k, m=0;
			unsigned char *c;
			int f = post->fil-1; /* highest pos to consider in given postings */
			if ( !(vln = LBT_VLN( d )) )
				continue;
			ent += kln; /* set to start of values */
			c = ent + 8*(vln-1); /* last value */
			/* collect postings */
			for ( k=vln; k--; c-=8 ) {
				/* loop backwards (for the fun of it) postings in segment */
				int prow, ptag, ppos;
				LdbP e; /* the entry */
				LdbP samerow; /* highest possible entry w/ same row as e */
#if defined( LDB_BIG_ENDIAN )
				/* the 8 bytes of a posting are BIG ENDIAN ! */
				memcpy(e.bytes,c,8);
#else
				e.bytes[0] = c[7];	e.bytes[1] = c[6];
				e.bytes[2] = c[5];	e.bytes[3] = c[4];
				e.bytes[4] = c[3];	e.bytes[5] = c[2];
				e.bytes[6] = c[1];	e.bytes[7] = c[0];
#endif
				prow = LDBP_ROW( &e );
				ptag = LDBP_TAG( &e );
				ppos = LDBP_POS( &e );
				LOG_DBG( LOG_VERBOSE, "post %d.%hd pos %06x", prow, ptag, ppos );
				if ( !prow )
					continue;
				if ( (post->cut && prow >= post->cut)
					|| (post->tag && post->tag != ptag)
				)
					continue;
				if ( prow < post->skp ) /* quickly bail out on skip condition */
					break;
				LDBP_SETROWTOP( &samerow, &e ); /* for mfn comparison */
				/* sweep down to postings for the same row as e ... */
				while ( f >= 0 && LDBP_GT( p+f, &samerow ) )
					f--;
				if ( LDB_AND & post->mode ) {
					int l;
					/* loop postings for same row, mark all (that are near enough) */
					LDBP_SETROWBOT( &samerow, &e ); /* for mfn comparison */
					/* NOTE: postings for row are GT than bottom even if marked */
					for ( l = f; l>=0 && LDBP_GT( p+l, &samerow ); l-- ) {
						if ( post->near ) {
							int dist;
							if ( ptag != LDBP_TAG( p+l ) ) continue;
							if ( LDB_NEAR_G != post->near ) {
								dist = LDBP_POS( p+l ) - LDBP_POS( &e );
								if ( dist < 0 ) dist = -dist;
								if ( 0 < post->near
									? post->near < dist
									: -post->near != dist /* exact $$$$ */
								) continue;
							}
						}
						LDBP_SETMARK( p+l );
					}
				} else {	/* OR mode */
					int add;
					if ( ! post->near ) /* add if row not found: ignore details */
						add = 0 > f || prow > LDBP_ROW( p+f );
					else {	/* add if no exact match */
						int l;
						/* NOTE: we don't use mark bit in OR mode, do we ? */
						for ( l = f; l>=0 && LDBP_GT( p+l, &e ); l-- )
							;
						add = 0 > l || LDBP_GT( &e, p+l );
					}
					if ( add )
						merge[ m++ ] = e;
				}
			}	/* for postings in segment */
			if ( m ) { /* merge in the merge buffer */
				LdbP *mm = merge;
				/* fill from the top, taking the greater of both sources */
				for ( k = post->fil += m; m && k--; ) {
					LdbP src;
					if ( k < m || LDBP_GT( mm, &p[k-m] ) ) {
						src = *mm++;
						m--;
						LOG_DBG( LOG_DEBUG, "merging %d at %d", LDBP_ROW(&src), k );
					} else
						src = p[k-m];
					if ( k < post->len )
						p[k] = src;
					else { /* set cut */
						int row = LDBP_ROW( &src );
						if ( row < post->cut || !post->cut )
							post->cut = row;
					}
				}
				if ( post->fil > post->len )
					post->fil = post->len;
				if ( post->cut ) /* postings for cut row are unreliable */
					while ( post->fil && post->cut <= LDBP_ROW(p+post->fil-1) )
						post->fil--;
			} /* if ( m ) */
			} /* more locals */
		}	/* for entries in block */
		ULK( b );
		/* follow the leaf chain */
		if ( ! b->nxt || !(b = getblk(bt, b->nxt, 0, LBT_RD)) )
			break;
	}
done:
	if ( rec )
		ret = rec->len;
ouch:
	if ( b )
		ULK( b );
	return ret;
}	/* lbt_search */


/*
	initialize the index on the already opened file bt->fd.
	For an existing file, the root header is read and typ, key and col
	are taken from it (a warning is logged where bt asked for something
	else). For an empty file, defaults are applied and a root linking
	to the first leaf (block 1) is created and written.
	Value size and block size are derived from bt->typ:
	vsz is 8 + the low 4 bits, bsz is 1K << bits 4..5.
	Also allocates the block cache and the block hash.
	Returns 0 on success, else the result of log_msg.
*/
int lbt_init ( Idx *bt )
{
	int ret = 0, blkbits, i;
	Block *l;
	int size = lio_size( bt->fd );

	bt->root = 0;
	bt->hash = 0;
	bt->mem = 0;
	for ( i=LBT_LRULEV; i--; )
		bt->lru[i] = bt->mru[i] = 0;

	if ( 0x3ff & size ) /* not on 1K boundary */
		return log_msg( LOG_ERROR, "bt has bad size %dK + %d",
			size >> 10, (int)size & 0x3ff );
	if ( size ) { /* read root header */
		Block b;
		unsigned want = sizeof(b) - LBT_BOFF;
		int got = lio_pread( &bt->fd, LBT_BASE(&b), want, 0 );
		if ( want != (unsigned)got )
			return log_msg( LOG_SYSERR, "could not get root head" );
		if ( b.num )
			return log_msg( ERR_TRASH, "root has num %d", b.num );
		/*if ( LBT_ISIS != b.nxt ) log_msg( LOG_WARN, "magic is 0x%08x", b.nxt );*/
		/* the file's settings win over what the caller asked for */
		if ( bt->typ && bt->typ != b.typ )
			log_msg( LOG_WARN, "typ want 0x%02x got 0x%02x", bt->typ, b.typ );
		bt->typ = b.typ;
		if ( bt->key && bt->key != b.key )
			log_msg( LOG_WARN, "key want %d got %d", bt->key, b.key );
		bt->key = b.key;
		if ( bt->col && bt->col != b.col )
			log_msg( LOG_WARN, "col want %d got %d", bt->col, b.col );
		bt->col = b.col;
		bt->dpt = b.lev;
		/*
		b.ent
		b.stk
		*/
	} else { /* new index */
		bt->dpt = 1;
		if ( ! bt->key )
			bt->key = 30;
	}

	bt->vsz = 8+(0xf & bt->typ);
	blkbits = 10 + (3 & (bt->typ >> 4));
	bt->bsz = 1 << blkbits;
	if ( size % bt->bsz )
		return log_msg( ERR_TRASH, "size 0x%08x %% blksize %u", size, bt->bsz );
	bt->len = size >> blkbits;
	if ( 2 > bt->len && !(LBT_WRITE & bt->flg) )
		return log_msg( LOG_SYSERR, "attempt to read empty index" );

	bt->hlen = 16 + bt->len / 32;	/* initial cache size */
	if ( bt->hlen < 1000 ) bt->hlen = 1000;
	if ( !(bt->root = addchunk( bt, bt->hlen )) )
		return log_msg( LOG_SYSERR, "\twhen getting bt cache" );
	/* with the minimum of 1000 above, this lower bound cannot trigger */
	if ( bt->hlen < 256 ) /* the hash */
		bt->hlen = 256;
	if ( !(bt->hash = (Block**)mAlloc( bt->hlen * sizeof(Block*) )) )
		return log_msg( LOG_SYSERR, "\twhen getting bt hash" );
	memset( bt->hash, 0, bt->hlen * sizeof(Block*) );

	if ( size && readblk( bt->root, bt, 0 ) )
		return log_msg( ERR_TRASH, "could not read root" );
	if ( ! bt->len ) {
		/* init: link root to leaf */
		SETINT( LBT_BASE(bt->root)+(bt->root->stk -= 4), 1 );
		LBT_PVK( bt->root->dct, bt->root->stk, 0, 0 );
		bt->root->ent = 1;
		bt->len = 1;
	}
	if ( !(l = getblk( bt, 1, 0, LBT_EX )) )
		return log_msg( ERR_TRASH, "could not read 1st leaf" );
	if ( ! size ) {
		log_msg( LOG_INFO, "creating index bsz %u ksz %u vsz %u",
			bt->bsz, bt->key, bt->vsz );
		bt->root->lev = 1;
		if ( WRULK( bt->root, bt ) || WRULK( l, bt ) )
			return log_msg( LOG_SYSERR, "\twhen writing base" );
	}

	return ret;
}	/* lbt_init */


/*
	release everything held by the index:
	frees the chain of memory chunks and the block hash,
	closes the file and wipes the Idx structure.
*/
void lbt_close ( Idx *bt )
{
	Chunk *cur = bt->mem;

	while ( cur ) {
		Chunk *follow = cur->nxt; /* fetch link before cur is gone */
		mFree( cur );
		cur = follow;
	}
	mFree( bt->hash );
	lio_close( &bt->fd, LIO_INOUT );
	memset( bt, 0, sizeof(*bt) );
}	/* lbt_close */


/* ************************************************************
	public functions
*/


/*
	prepare the value val for hit:
	with a hit, val gets the index' value size and is filled by mkval;
	without, val is marked empty (length 0).
*/
void cXMkVal ( Idx *bt, Val *val, Hit *hit )
{
	if ( ! hit ) {
		val->len = 0;
		return;
	}
	val->len = bt->vsz;
	mkval( val, hit );
}

/*
	add (or delete) a hit for key.
	In batch mode everything goes to lbt_batchval, where a missing key
	means the batch's current key.
	Otherwise a missing key is a no-op returning 0, a hit with
	dbn 0xffff requests deletion of the value, and anything else
	is added by lbt_add (without hit, key->val is used as is).
*/
int cXAdd ( Idx *bt, Key *key, Hit *hit )
{
	if ( bt->bat ) { /* batch mode */
		Key *k = key ? key : &bt->bat->key;
		cXMkVal( bt, &k->val, hit );
		return lbt_batchval( bt, k );
	}
	if ( ! key )
		return 0;
	if ( hit ) {
		cXMkVal( bt, &key->val, hit );
		if ( 0xffff == hit->dbn ) /* deletion marker */
			return lbt_del( bt, key );
	}
	return lbt_add( bt, key );
}	/* cXAdd */