/[webpac]/trunk2/openisis/lbt.c
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /trunk2/openisis/lbt.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 337 - (hide annotations)
Thu Jun 10 19:22:40 2004 UTC (19 years, 9 months ago) by dpavlin
File MIME type: text/plain
File size: 64211 byte(s)
new trunk for webpac v2

1 dpavlin 237 /*
2     openisis - an open implementation of the CDS/ISIS database
3     Version 0.8.x (patchlevel see file Version)
4     Copyright (C) 2001-2003 by Erik Grziwotz, erik@openisis.org
5    
6     This library is free software; you can redistribute it and/or
7     modify it under the terms of the GNU Lesser General Public
8     License as published by the Free Software Foundation; either
9     version 2.1 of the License, or (at your option) any later version.
10    
11     This library is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14     Lesser General Public License for more details.
15    
16     You should have received a copy of the GNU Lesser General Public
17     License along with this library; if not, write to the Free Software
18     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19    
20     see README for more information
21     EOH */
22    
23     /*
24     $Id: lbt.c,v 1.35 2003/05/27 11:03:30 kripke Exp $
25     the btree.
26     */
27    
28     #include <stdlib.h>
29     #include <string.h>
30    
31     /* special
32     #if defined( __GNUC__ ) && defined ( alloca )
33     #include <alloca.h>
34     #endif
35     */
36     #if defined( sparc ) || defined( __ppc__ )
37     # define LDB_BIG_ENDIAN
38     #endif
39    
40     #include "lio.h"
41     #include "luti.h"
42     #include "lbt.h"
43    
44    
45     /*
46     probably all relevant cases can be handled by the more
47     efficient approach of storing recoded key a la strxfrm
48     */
49     #define LBT_USECMP 0
50    
51    
52     /* ************************************************************
53     private types
54     */
55    
/*
	Magic words. Written out byte by byte so that, stored little endian,
	the four characters appear in reading order ("OPEN", "ISIS").
*/
enum {
	LBT_OPEN = ('N'<<24) | ('E'<<16) | ('P'<<8) | 'O',
	LBT_ISIS = ('S'<<24) | ('I'<<16) | ('S'<<8) | 'I'
};


/* tree depth and in-block addressing limits */
enum {
	LBT_LEVELS  = 16,                     /* maximum depth of the tree */
	LBT_POSBITS = 13,                     /* bits used for an offset within a block */
	LBT_POSMASK = (1 << LBT_POSBITS) - 1, /* 0x1fff, selects the offset bits */
	LBT_POSMAX  = LBT_POSMASK             /* largest representable offset */
};
69    
/*
	Dictionary entry: describes one key stored in a block.

	pos  low LBT_POSBITS bits: offset of the entry relative to the block base.
	     On leaves the bits above them hold the high part of the value count.
	vln  inner node: length of the qualifying second key segment
	     (either 0 or bt->vsz <= 23).
	     leaf: low 8 bits of the number of values (of bt->vsz bytes each).
	kln  length of the entry's key.
*/
typedef struct Dict {
	unsigned short pos;
	unsigned char vln;
	unsigned char kln;
} Dict;

/*
	Access macros, needed for leaves only, where pos and vln share
	the bits of the value count.
*/
/* offset of the entry within the block */
#define LBT_POS( d ) (LBT_POSMASK & (d)->pos)
/* full value count: vln plus the bits kept above the offset in pos */
#define LBT_VLN( d ) \
	((d)->vln | ((~LBT_POSMASK & (d)->pos) >> (LBT_POSBITS-8)))
/* move the entry by a bytes, leaving the value count bits alone */
#define LBT_POSADD( d, a ) do { \
	(d)->pos = (~LBT_POSMASK & (d)->pos) \
		| (LBT_POSMASK & ((d)->pos + (a))); \
	} while (0)
/* change the value count by a, leaving the offset alone */
#define LBT_VLNADD( d, a ) do { \
	unsigned lbt_newvln = LBT_VLN( d ) + (a); \
	(d)->pos = (LBT_POSMASK & (d)->pos) \
		| (~LBT_POSMASK & ((lbt_newvln)<<(LBT_POSBITS-8))); \
	(d)->vln = (unsigned char)(lbt_newvln); \
	} while (0)
/* set offset p and value count v */
#define LBT_PV( d, p, v ) do { \
	(d)->pos = (LBT_POSMASK & (p)) \
		| (~LBT_POSMASK & ((v)<<(LBT_POSBITS-8))); \
	(d)->vln = (unsigned char)(v); \
	} while (0)
/* set offset p, value count v and key length k */
#define LBT_PVK( d, p, v, k ) do { \
	LBT_PV( d, p, v ); \
	(d)->kln = (unsigned char)(k); \
	} while (0)
102    
103    
104     /* guard space around block */
#ifdef NDEBUG
	/* release build: no guard space */
# define LBT_GUARD 0
#else
	/* debug build: LBT_GUARD bytes of guard space around each block */
# define LBT_GUARD 64
/* reference content the guard areas are compared against -- blank or pattern */
static unsigned char guard[LBT_GUARD];
#endif
112    
113    
114     /*
115     TODO: check the possible performance gain in findkey
116     from separating disk block and administrative data
117     and aligning disk blocks on PAGE boundaries in memory
118     */
119     typedef struct Block {
120     #if LBT_GUARD
121     unsigned char grd[LBT_GUARD];
122     #endif
123     unsigned char ird; /* is reading */
124     unsigned char wrd; /* #threads waiting for read */
125     unsigned char lck; /* is write locked */
126     unsigned char wlk; /* #threads waiting for write lock */
127     #define LBT_BUSY( b ) (*(int*)&(b)->ird) /* any of 1st 4 bytes != 0 */
128     struct Block *nhs; /* next in hash chain */
129     struct Block *nru; /* next (more) recently used */
130     /* TODO: possibly double-link lru list, so we can unlink on every access ? */
131     /* the disk block -- from here on bsz bytes are on disk */
132     #define LBT_BOFF ((char *)&((Block*)0)->num - (char*)0)
133     #define LBT_BASE( b ) ((unsigned char *)&(b)->num)
134     unsigned num; /* number of this block, 0 is root */
135     unsigned nxt; /* right sibling, "OPEN" or "ISIS" for root */
136     /* typ,key,col are redundant in non-root blocks, maybe abused ... */
137     unsigned char typ; /* type: bsz, vsz, flags */
138     unsigned char key; /* max key length */
139     unsigned char col; /* collation */
140     unsigned char lev; /* level over bottom, 0 for leaves */
141     unsigned short ent; /* 10 bits # entries */
142     #define LBT_ENT( b ) (b)->ent /* currently other bits arent abused */
143     unsigned short stk; /* 13 bits offset of keyspace stack */
144     /* offset of dict RELATIVE to base */
145     #define LBT_DOFF ((char *)&((Block*)0)->dct - (char*)&((Block*)0)->num)
146     Dict dct[1];
147     } Block;
148    
149     /*
150     LAYOUT:
151     dct is an array of dictionary entries in increasing key order
152     stk is growing downwards from block end towards dict
153     position of entry i is b->stk <= LBT_POS(b->dct+i) < bt->bsz
154     decreasing on i (i.e. stack space is in reverse ordering)
155    
156     at LBT_POS(b->dct+i), there are b->dct[i].kln bytes key
157     for a leaf entry, the key is followed by
158     LBT_VLN(b->dct+i) (between 0..1023 incl.) values
159     of bt->vsz (between 8..23 incl) each
160     for an inner entry, the key is followed by
161     4 bytes block number and optionally bt->vsz qualifying value
162    
163     SIZES:
164     block size >= 1024
165     block base overhead LBT_DOFF is 16 bytes
166     avail size >= 1008
167     Dict entry is 4 bytes
168     max. used stack space per entry is:
169     inner node: kln+4+bt->vsz <= 255+4+23 = 282
170     new leaf entry (one value): kln+bt->vsz <= 278
171     max total space for a new entry incl. dict is 286 < avail size/3
172     */
173    
/* header of a piece of block memory; chunks form a singly linked list */
typedef struct Chunk Chunk;
struct Chunk {
	Chunk *nxt; /* next chunk in the list */
};
177    
178    
179     enum {
180     /* number of levels for lru list */
181     LBT_LRULEV = sizeof(((Idx*)0)->lru)/sizeof(((Idx*)0)->lru[0]),
182     LBT_ENTBITS = 10, /* bits for # enries */
183     LBT_ENTMASK = 0x03ff,
184     LBT_ENTMAX = LBT_ENTMASK,
185     LBT_MATCH = 0x8000, /* entry matched */
186     LBT_ENTSIZE = 8 /* minimum cost of entry: 4 byte dict, 4 byte val */
187     };
188    
189    
190     typedef struct Batch {
191     Block *cur;
192     unsigned got; /* current for this key */
193     unsigned tot; /* total for this key */
194     unsigned let;
195     /* stats */
196     unsigned keys;
197     unsigned keyl;
198     unsigned maxl;
199     unsigned vals;
200     unsigned maxv;
201     unsigned span;
202     unsigned free;
203     /* buffer */
204     Key key;
205     Val val[LBT_ENTMAX];
206     } Batch;
207    
208    
/*
	Access mode requested from getblk.
	The numeric order matters: code tests "lock" for non-zero
	and compares against LBT_EX.
*/
typedef enum {
	LBT_RD = 0, /* read -- no lock */
	LBT_WR = 1, /* write */
	LBT_EX = 2  /* exclusive */
} lbt_get;
214    
215    
216     typedef struct { /* structure filled by findkey */
217     Idx *bt; /* the tree */
218     Key *key; /* the key searched */
219     Block *b[LBT_LEVELS]; /* path to leaf, b[0] is leaf */
220     unsigned n[LBT_LEVELS]; /* block numbers of path */
221     unsigned short e[LBT_LEVELS]; /* entries in path */
222     } FindKey;
223    
224    
/* usage counters */
typedef struct {
	unsigned get;  /* calls of getblk */
	unsigned miss; /* getblk calls that did not find the block cached */
	unsigned cmp;  /* key comparisons done by keypos */
	unsigned key;  /* calls of findkey */
} Stat;
231    
/*
	Buffer a value is printed to by prval.
	The lowest 11 bytes are printed as 4 shorts and one int, i.e. less
	than 4*5+10 = 30 digits plus 5 separators;
	the higher 12 bytes are printed in hex, taking 24 characters.
*/
typedef char PrVal[64];
237    
238    
239     /* ************************************************************
240     private data
241     */
242    
243     /*
244     stat access is not guarded by memory barrier.
245     may be wrong when running on multiple processors
246     */
247     static Stat stat;
248    
249    
250     /* ************************************************************
251     private functions
252     */
253    
/*
	Read/write a 4 byte unsigned at an arbitrary address.
	Integers inside a block follow keys and values of any length, so their
	addresses are not aligned in general. Dereferencing such an address as
	unsigned* is a bus error on some machines (sparc) and undefined behaviour
	everywhere, so the bytes are always moved with memcpy.
*/

/** @return the unsigned stored in the 4 bytes at m (host byte order) */
static unsigned GETINT ( const void *m )
{
	unsigned l = 0;
	memcpy( &l, m, 4 );
	return l;
}

/** store i in the 4 bytes at m (host byte order) */
static void SETINT ( void *m, unsigned i )
{
	memcpy( m, &i, 4 );
}
269    
270     static void mkval ( Val *v, Hit *ifp )
271     {
272     unsigned char *c = v->byt;
273     switch ( v->len ) {
274     default: memset( c, 0, v->len-11 ); c += v->len-11;
275     case 11: *c++ = (unsigned char) (ifp->dbn >> 8);
276     case 10: *c++ = (unsigned char) ifp->dbn;
277     case 9: *c++ = (unsigned char) (ifp->mfn >> 24);
278     case 8: *c++ = (unsigned char) (ifp->mfn >> 16);
279     case 7: *c++ = (unsigned char) (ifp->mfn >> 8);
280     case 6: *c++ = (unsigned char) ifp->mfn;
281     case 5: *c++ = (unsigned char) (ifp->tag >> 8);
282     case 4: *c++ = (unsigned char) ifp->tag;
283     case 3: *c++ = (unsigned char) ifp->occ;
284     case 2: *c++ = (unsigned char) (ifp->pos >> 8);
285     case 1: *c++ = (unsigned char) ifp->pos;
286     case 0: ;
287     }
288     } /* mkval */
289    
290    
291     static Hit *rdval ( Hit *ifp, unsigned char *c, unsigned vsz )
292     {
293     ifp->mfn = ifp->tag = ifp->occ = ifp->pos = ifp->dbn = 0;
294     switch ( vsz ) {
295     default: c += vsz-11;
296     case 11: ifp->dbn = (unsigned short)*c++ << 8;
297     case 10: ifp->dbn |= *c++;
298     case 9: ifp->mfn = (unsigned int)*c++ << 24;
299     case 8: ifp->mfn |= (unsigned int)*c++ << 16;
300     case 7: ifp->mfn |= (unsigned int)*c++ << 8;
301     case 6: ifp->mfn |= *c++;
302     case 5: ifp->tag = (unsigned short)*c++ << 8;
303     case 4: ifp->tag |= *c++;
304     case 3: ifp->occ = *c++;
305     case 2: ifp->pos = (unsigned short)*c++ << 8;
306     case 1: ifp->pos |= *c++;
307     case 0: ;
308     }
309     return ifp;
310     } /* rdval */
311    
312    
313     static char *prval ( PrVal buf, unsigned char *c, unsigned vsz )
314     {
315     char *d = buf;
316     Hit h;
317     if ( ! vsz ) {
318     buf[0] = '#';
319     buf[1] = 0;
320     return buf;
321     }
322     if ( 23 < vsz )
323     vsz = 23;
324     if ( 11 < vsz ) {
325     while ( 11 < vsz-- ) {
326     *d++ = luti_hex[ *c >> 4 ];
327     *d++ = luti_hex[ *c++ & 0xf ];
328     }
329     *d++ = '.';
330     }
331     rdval( &h, c, vsz );
332     d += u2a( d, h.dbn ); *d++ = '.';
333     d += u2a( d, h.mfn ); *d++ = '.';
334     d += u2a( d, h.tag ); *d++ = '.';
335     d += u2a( d, h.occ ); *d++ = '.';
336     d += u2a( d, h.pos ); *d++ = '.';
337     *d = 0;
338     return buf;
339     } /* prval */
340    
341     /* for PRVAL f->key->val */
342     #define PRVAL( buf, v ) prval( buf, v.byt, v.len )
343    
344    
345     /* ************************************************************
346     block I/O & cache mgmt
347     */
348    
349     static void iniblk ( Block *b, Idx *bt, unsigned num, unsigned char lev )
350     {
351     memset( b, 0, LBT_BOFF + bt->bsz + LBT_GUARD );
352     b->num = num;
353     b->typ = bt->typ;
354     b->key = bt->key;
355     b->col = bt->col;
356     b->lev = lev;
357     b->stk = bt->bsz;
358     } /* iniblk */
359    
360    
361     static int chkblk ( Block *b, Idx *bt )
362     {
363     #if 0
364     (void)b; (void)bt;
365     #else
366     int i;
367     unsigned short eod = LBT_DOFF + b->ent*sizeof(Dict), siz, pos, vln, key;
368     unsigned end = bt->bsz, tot = 0;
369     unsigned char *base = LBT_BASE(b);
370     #if LBT_GUARD
371     if ( memcmp( guard, b->grd, LBT_GUARD ) )
372     return log_msg( ERR_TRASH, "OOPS! initial guard frobbed on blk %d", b->num );
373     if ( memcmp( guard, LBT_BASE(b)+bt->bsz, LBT_GUARD ) )
374     return log_msg( ERR_TRASH, "OOPS! initial guard frobbed on blk %d", b->num );
375     #endif
376     if ( b->num > bt->len || (b->num && b->nxt > bt->len) )
377     return log_msg( ERR_TRASH,
378     "bad num %d / nxt %d on lev %d", b->num, b->nxt, b->lev );
379     if ( b->typ != bt->typ || b->key != bt->key || b->col != bt->col )
380     return log_msg( ERR_TRASH,
381     "bad sig 0x%02x,%d,%d in block %d want 0x%02x,%d,%d",
382     b->typ, b->key, b->col, b->num, bt->typ, bt->key, bt->col );
383     if ( b->ent > bt->bsz / LBT_ENTSIZE || b->stk > bt->bsz || b->stk < eod )
384     return log_msg( ERR_TRASH,
385     "bad ent/stk %d,%d in block %d eod %d sz %d",
386     b->ent, b->stk, b->num, eod, bt->bsz );
387     for ( i=0; i<b->ent; i++ ) {
388     Dict *d = &b->dct[i];
389     pos = LBT_POS( d );
390     vln = LBT_VLN( d );
391     key = d->kln;
392     siz = key + (b->lev ? vln+4u : vln * bt->vsz);
393     if ( pos < b->stk || end != (unsigned)pos + siz )
394     return log_msg( ERR_TRASH,
395     "bad pos/vln/key %d/%d/%d in entry %d of block %d"
396     " stack %d sz %d end %d expected %d",
397     pos, vln, key, i, b->num, b->stk, siz, pos+siz, end );
398     if ( b->lev ) {
399     unsigned lnk;
400     if ( vln && vln != +bt->vsz )
401     return log_msg( ERR_TRASH,
402     "bad vln %d in entry %d of block %d vsz %d",
403     vln, i, b->num, bt->vsz );
404     lnk = GETINT( base + pos + key + vln );
405     if ( lnk >= bt->len )
406     return log_msg( ERR_TRASH,
407     "bad link %u in entry %d of block %d",
408     lnk, i, b->num );
409     }
410     #if 1 && ! defined( NDEBUG )
411     if ( i ) {
412     unsigned short op = LBT_POS( d-1 ), okl = d[-1].kln;
413     int mc = memcmp( base+pos, base+op, key<okl ? key : okl ), j;
414     if ( 0 < mc || (!mc && 0 < (mc = (int)key - (int)okl)) )
415     goto ok;
416     j = 0;
417     if ( !mc && b->lev && vln ) { /* compare values */
418     unsigned ovln = d[-1].vln;
419     if ( ! ovln || 0 < (j = memcmp( base+pos+key, base+op+okl, vln )) )
420     goto ok;
421     }
422     return log_msg( ERR_TRASH,
423     "bad key order entry %d of block %d lev %d"
424     " mc %d j %d key %d okl %d vln %d",
425     i, b->num, b->lev, mc, j, key, okl, vln
426     );
427     }
428     ok:
429     #endif
430     tot += siz;
431     end = pos;
432     }
433     if ( tot > bt->bsz - b->stk )
434     return log_msg( ERR_TRASH,
435     "bad total entry size %d in block %d stack %d",
436     tot, b->num, b->stk );
437     #ifndef NDEBUG
438     if ( LOG_DO( LOG_TRACE ) ) {
439     log_msg( LOG_TRACE,
440     "block %d lev %d sig 0x%02x,%d,%d ent %d eod %d stk %d",
441     b->num, b->lev, b->typ, b->key, b->col, b->ent, eod, b->stk
442     );
443     for ( i=0; i<2; i++ ) {
444     int j = i ? b->ent-1 : 0;
445     Dict *d = &b->dct[j];
446     if ( i >= b->ent ) /* if b->ent is 0 or 1, log no or one key */
447     break;
448     pos = LBT_POS( d );
449     vln = LBT_VLN( d );
450     key = d->kln;
451     end = pos + b->lev ? vln+4u : vln * bt->vsz;
452     /* start of value */
453     if ( b->lev )
454     log_msg( LOG_TRACE, "key %d '%.*s'/%d -> %d",
455     j, key, base+pos, vln, GETINT(base+pos+key+vln) );
456     else {
457     Hit h;
458     rdval( &h, base+pos+key, bt->vsz );
459     log_msg( LOG_TRACE, "key %d '%.*s'[%d] = %d[%d,%d,%d]",
460     j, key, base+pos, vln, h.mfn, h.tag, h.occ, h.pos );
461     }
462     }
463     }
464     #endif
465     #endif
466     return 0;
467     } /* chkblk */
468    
469    
470     static Block *addchunk ( Idx *bt, unsigned cnt )
471     {
472     unsigned ibsz = LBT_BOFF + bt->bsz + LBT_GUARD; /* internal block size */
473     /* increase cnt so it's > 0 and add one extra to return */
474     char *mem = (char *)mAlloc( sizeof(Chunk) + (1 + ++cnt)*ibsz );
475     Chunk *chk = (Chunk *)mem;
476     if ( ! mem )
477     return 0;
478     /* chain the chunk */
479     chk->nxt = bt->mem;
480     bt->mem = chk;
481     mem += sizeof(Chunk);
482     bt->clen += cnt;
483     /* add cnt blocks to level 0 lru list */
484     if ( ! bt->mru[0] ) /* level 0 cache is empty */
485     bt->mru[0] = (Block *)mem;
486     for ( ; cnt--; mem += ibsz ) {
487     Block *b = (Block *)mem;
488     iniblk( b, bt, 0 , 0 ); /* initialise empty block */
489     b->nru = bt->lru[0];
490     bt->lru[0] = b;
491     }
492     iniblk( (Block *)mem, bt, 0 , 0 );
493     return (Block *)mem;
494     } /* addchunk */
495    
496    
497     static int readblk ( Block *b, Idx *bt, unsigned num )
498     {
499     int got = lio_pread( &bt->fd, LBT_BASE(b), bt->bsz, num*bt->bsz );
500     log_msg( LOG_VERBOSE, "reading block %u got %u", num, got );
501     if ( bt->bsz != (unsigned)got )
502     return log_msg( LOG_SYSERR, "\twhen reading bt block %d", num );
503     if ( b->num != num )
504     return log_msg( ERR_TRASH, "bad num %d in block %d", b->num, num );
505     return chkblk( b, bt );
506     } /* readblk */
507    
508    
509     static void unlock ( Block *b )
510     {
511     b->lck = 0;
512     if ( b->wlk )
513     (void)LIO_WAKE( b->num );
514     }
515    
516     #define ULK( b ) do { \
517     if ( (b)->lck ) { (b)->lck = 0; if ( b->wlk ) unlock( b ); } \
518     } while (0)
519    
520    
521     static int writeblk ( Block *b, Idx *bt, int ulk )
522     {
523     int got;
524     if ( chkblk( b, bt ) ) /* don't write no shize */
525     return log_msg( ERR_TRASH, "\tbefore writing bt block %d", b->num );
526     got = lio_pwrite( &bt->fd, LBT_BASE(b), bt->bsz, b->num*bt->bsz );
527     log_msg( LOG_VERBOSE, "writing block %u got %u", b->num, got );
528     if ( ulk )
529     ULK( b );
530     if ( bt->bsz != (unsigned)got )
531     return log_msg( LOG_SYSERR, "\twhen writing bt block %d", b->num );
532     return 0;
533     } /* writeblk */
534    
535     #define WRULK( b, bt ) writeblk( b, bt, 1 )
536    
537    
538     #define NEWBLK( bt, lev, lock ) getblk( bt, bt->len, lev, lock )
539     /**
540     get block #num.
541     1) locate block #num in cache.
542     2) if it is found:
543     if it is READING, or we want to lock and it is locked,
544     incr # waiters, wait for it.
545     3) if it is not found:
546     locate a free cache entry, extending the cache if necessary,
547     set the num, set the READING flag, go read it.
548     after reading, if there are waiters, wake them up.
549     since we are the first to get a hold on the block,
550     we may write lock it, if we want.
551     4) return the block.
552     */
553     static Block *getblk ( Idx *bt, unsigned num,
554     unsigned char lev, lbt_get lock )
555     {
556     Block *b;
557     stat.get++; /* dirty */
558     if ( !(stat.get & 0x3fff) ) { /* every 16K */
559     static unsigned lmiss;
560     log_msg( LOG_INFO,
561     "searched %i keys in %u blks cache %d %d%%"
562     " depth %u cmp/get/miss %u/%u/%u %d%% (%d %d%%)",
563     stat.key, bt->len, bt->clen, bt->clen*100 / bt->len,
564     bt->root->lev, stat.cmp, stat.get,
565     stat.miss, stat.miss*100 / stat.get,
566     (stat.miss - lmiss), (stat.miss - lmiss)*100 / 0x4000
567     );
568     lmiss = stat.miss;
569     }
570     startover: /* only used on absurd congestion */
571     b = 0;
572     LOG_DBG( LOG_DEBUG, "get bt block %u %u %d", num, lev, lock );
573     if ( ! num )
574     b = bt->root;
575     else if ( num < bt->len ) {
576     b = bt->hash[ num % bt->hlen ];
577     while ( b && num != b->num )
578     b = b->nhs;
579     }
580     if ( b )
581     while ( b->ird || (lock && b->lck) ) {
582     int wrd = b->ird;
583     int wlk = lock && b->lck;
584     LOG_DBG( LOG_VERBOSE, "lck on block %u %u r%d l%d",
585     num, lev, b->ird, b->lck );
586     if ( LBT_EX <= lock ) {
587     log_msg( LOG_ERROR, "concurrent mod on EX lock" );
588     return 0;
589     }
590     if ( ! lio_lock ) {
591     log_msg( LOG_FATAL, "bad MT environment: concurrent mod, no lock" );
592     return 0;
593     }
594     if ( wrd && 0xff != b->wrd )
595     b->wrd++;
596     if ( wlk && 0xff != b->wlk )
597     b->wlk++;
598     (void)LIO_WAIT( num ); /* zzzzz .... sleep tight */
599     /* some hours later */
600     if ( num != b->num ) {
601     log_msg( LOG_WARN, "heavy congestion: block %d has gone", num );
602     goto startover;
603     }
604     if ( wrd && b->wrd )
605     b->wrd--;
606     if ( wlk && b->wlk )
607     b->wlk--;
608     }
609     else { /* not found or new block */
610     /* find free block */
611     int i = 0, llev, err = 0;
612     stat.miss++; /* dirty */
613     LOG_DBG( LOG_DEBUG, "bt block %u %u not found", num, lev );
614     for ( ; i < (int)LBT_LRULEV; i++ ) {
615     /* check lru list for level i */
616     Block **bp = &bt->lru[i];
617     while ( (b = *bp) && LBT_BUSY(b) )
618     bp = &b->nru;
619     if ( ! b ) /* try next level */
620     continue;
621     *bp = b->nru; /* dequeue */
622     if ( bt->mru[i] == b ) /* was the tail */
623     bt->mru[i] = 0;
624     break;
625     }
626     if ( ! b ) {
627     log_msg( LOG_WARN, "no free blocks in cache" );
628     if ( !(b = addchunk( bt, 64 )) )
629     return 0;
630     /* TODO: should we also extend the hash ? */
631     } else if ( b->num ) { /* got block -- should be hashed somewhere */
632     Block **bp = &bt->hash[ b->num % bt->hlen ];
633     while ( *bp && *bp != b )
634     bp = &(*bp)->nhs;
635     if ( *bp == b )
636     *bp = b->nhs; /* dehash */
637     /* else ... hmmm should have found it */
638     }
639     assert( ! LBT_BUSY( b ) );
640     iniblk( b, bt, num, lev );
641     /* hash it, so waiters can find it */
642     b->nhs = bt->hash[ b->num % bt->hlen ];
643     bt->hash[ b->num % bt->hlen ] = b;
644     if ( num == bt->len ) { /* extend */
645     b->lev = lev;
646     bt->len++;
647     /* TODO: should be configurable */
648     if ( bt->len > bt->clen << 5 && bt->clen < 0x4000 )
649     addchunk( bt, bt->clen / 4 );
650     } else {
651     b->ird = 1;
652     (void)LIO_RELE();
653     err = readblk( b, bt, num );
654     (void)LIO_LOCK();
655     b->ird = 0;
656     if ( ! err && lev != b->lev )
657     err = log_msg( ERR_TRASH, "bad lev %d in block %d want %d",
658     b->lev, b->num, lev );
659     if ( err ) {
660     Block **bp = &bt->hash[ num % bt->hlen ];
661     while ( *bp && *bp != b )
662     bp = &(*bp)->nhs;
663     if ( *bp == b )
664     *bp = b->nhs; /* dehash */
665     b->lev = 0; /* add to level 0 lru list */
666     b->num = 0; /* tell others that it isn't the expected block */
667     }
668     }
669     /* add to it's levels lru list */
670     llev = LBT_LRULEV > b->lev ? b->lev : LBT_LRULEV-1;
671     if ( bt->mru[ llev ] )
672     bt->mru[ llev ]->nru = b;
673     else
674     bt->lru[ llev ] = b;
675     (bt->mru[ llev ] = b)->nru = 0;
676     if ( b->wrd )
677     (void)LIO_WAKE( num );
678     if ( err ) {
679     b->wrd = b->wlk = 0; /* mark block unused */
680     return 0;
681     }
682     } /* got free block */
683     if ( LBT_WR == lock ) {
684     if ( b->lck ) {
685     log_msg( LOG_FATAL, "bt consi: unexpected lock on %u", num );
686     return 0;
687     }
688     b->lck = 1;
689     }
690     return b;
691     } /* getblk */
692    
693    
694     /** binsearch position for key.
695     position is either the index of the matching entry (0..b->ent-1)
696     or the index (0..b->ent) where key should be inserted.
697     @return position for key (0..b->ent), |ifyed with LBT_MATCH, if found
698     */
699     static unsigned short keypos (
700     #if LBT_USECMP
701     Idx *bt,
702     #define KEYPOS( bt, b, key ) keypos( bt, b, key )
703     #else
704     #define KEYPOS( bt, b, key ) keypos( b, key )
705     #endif
706     Block *b, Key *key )
707     {
708     unsigned char *base = LBT_BASE( b );
709     unsigned char *kbt = key->byt;
710     unsigned char kln = key->len;
711     unsigned short l = 0, r = b->ent;
712     Dict *d = b->dct;
713     /* binsearch invariant:
714     key is < all entries i >= r (we do know relation at r)
715     key is > all entries i < l (don't know relation at l)
716     */
717     while ( l < r ) { /* invariant: l <= keypos < r */
718     unsigned short i = (l + r) >> 1; /* l <= i < r */
719     unsigned short pos = LBT_POS(&d[i]);
720     unsigned char cln = d[i].kln;
721     int cmp =
722     #if LBT_USECMP
723     bt->cmp ? bt->cmp( kbt, base+pos, kln<cln ? kln : cln ) :
724     #endif
725     memcmp( kbt, base+pos, kln<cln ? kln : cln );
726     stat.cmp++; /* dirty */
727     if ( ! cmp && !(cmp = (int)kln - cln) ) {
728     /* match on main key -- what about the value ? */
729     if ( ! b->lev /* ignore on leaves -- no duplicate keys here */
730     || (!d[i].vln && !key->val.len) /* no value */
731     )
732     return LBT_MATCH | i;
733     /* so we're on an inner node */
734     if ( ! d[i].vln ) /* common case */
735     cmp = 1;
736     else if ( ! key->val.len )
737     cmp = -1;
738     else {
739     unsigned char vln = LBT_VLN(&d[i]);
740     int c = key->val.len < d[i].vln ? key->val.len : vln;
741     if ( !(cmp = memcmp( key->val.byt, base+pos+cln, c ))
742     && !(cmp = (int) key->val.len - vln)
743     )
744     return LBT_MATCH | i;
745     }
746     }
747     if ( cmp > 0 )
748     l = i+1;
749     else
750     r = i;
751     }
752     return l; /* == r */
753     } /* keypos */
754    
755    
756     /** find position for (leaf) val.
757     search v in array b of n values of size sz
758     @return position for value (0..n), |ifyed with LBT_MATCH, if found
759     */
760     static unsigned short valpos ( unsigned char *b,
761     unsigned short n, unsigned char sz, unsigned char *v )
762     {
763     unsigned short l = 0, r = n;
764     /* binsearch invariant:
765     v is < all values i >= r (we do know relation at r)
766     v is > all values i < l (don't know relation at l)
767     */
768     while ( l < r ) { /* invariant: l <= valpos < r */
769     unsigned short i = (l + r) >> 1; /* l <= i < r */
770     int cmp = memcmp( v, b+i*sz, sz );
771     if ( ! cmp )
772     return LBT_MATCH | i;
773     if ( cmp > 0 )
774     l = i+1;
775     else
776     r = i;
777     }
778     return l; /* == r */
779     } /* valpos */
780    
781    
782     #ifndef NDEBUG
783     static void TRACEKEY ( unsigned lev, FindKey *f )
784     {
785     int i;
786     unsigned n = 0;
787     Hit h;
788     PrVal pv;
789    
790     if ( LOG_DONT( lev ) )
791     return;
792     log_msg( lev, "key '%.*s' vln %d val %s",
793     f->key->len, f->key->byt, f->key->val.len,
794     PRVAL( pv, f->key->val ) );
795     if ( f->key->val.len ) {
796     rdval( &h, f->key->val.byt, f->key->val.len );
797     log_msg( lev, "\tdb %d mfn %d tag %d occ %d pos %d",
798     h.dbn, h.mfn, h.tag, h.occ, h.pos );
799     }
800     for ( i=LBT_LEVELS; i--; ) {
801     Block *b = f->b[i];
802     unsigned on = n, e;
803     Dict *d;
804     if ( ! b )
805     continue;
806     d = b->dct + (e = (LBT_ENTMASK & f->e[i]));
807     if ( i )
808     n = GETINT( LBT_BASE(b)+d->pos+d->kln+d->vln );
809     log_msg( lev, "lev %d n %d e 0x%x = %d %c '%.*s' val %d %s -> %d%s",
810     i, f->n[i], f->e[i], e, (LBT_MATCH & f->e[i]) ? 'M' : '-',
811     d->kln, LBT_BASE(b)+d->pos, d->vln,
812     prval( pv, LBT_BASE(b)+d->pos+d->kln, d->vln ),
813     n, on == b->num ? "" : " LY!"
814     );
815     if ( d->vln ) {
816     rdval( &h, LBT_BASE(b)+d->pos+d->kln, d->vln );
817     log_msg( lev, "\tdb %d mfn %d tag %d occ %d pos %d",
818     h.dbn, h.mfn, h.tag, h.occ, h.pos );
819     }
820     if ( e ) {
821     d--;
822     log_msg( lev, "\tleft '%.*s' val %d %s",
823     d->kln, LBT_BASE(b)+d->pos, d->vln,
824     prval( pv, LBT_BASE(b)+d->pos+d->kln, d->vln )
825     );
826     if ( d->vln ) {
827     rdval( &h, LBT_BASE(b)+d->pos+d->kln, d->vln );
828     log_msg( lev, "\tdb %d mfn %d tag %d occ %d pos %d",
829     h.dbn, h.mfn, h.tag, h.occ, h.pos );
830     }
831     }
832     }
833     } /* TRACEKEY */
834     #else
835     #define TRACEKEY( l, f ) (void)(f)
836     #endif
837    
838    
839     static int findkey ( FindKey *f, lbt_get lock )
840     {
841     int i;
842     Block *b = f->bt->root;
843     unsigned char *pkey = 0, pkln = 0, pvln = 0;
844     if ( b->lev >= LBT_LEVELS )
845     return log_msg( ERR_TRASH, "tree is too deep: %u", b->lev );
846     stat.key++;
847     for ( i=LBT_LEVELS; i--; ) { /* paranoia */
848     f->b[i] = 0;
849     f->n[i] = f->e[i] = 0;
850     }
851     for ( i=b->lev;; ) {
852     unsigned short e;
853     unsigned n;
854     Dict *d;
855     #ifndef NDEBUG
856     chkblk( b, f->bt );
857     #endif
858     f->n[i] = (f->b[i] = b)->num;
859     f->e[i] = e = KEYPOS( f->bt, b, f->key );
860     LOG_DBG( LOG_DEBUG, "keypos %x", e );
861     #ifndef NDEBUG
862     if ( pkey && b->ent ) {
863     /* compare parent key against our first key
864     pkey must be equal for inner node and not greater for leave
865     */
866     int cmp; /* this - pkey >= 0 */
867     unsigned char *ent, ln;
868     d = b->dct;
869     ent = LBT_BASE(b)+LBT_POS(d);
870     ln = pkln < d->kln ? pkln : d->kln;
871     cmp = ln ? memcmp( ent, pkey, ln ) : 0;
872     if ( ! cmp )
873     cmp = (int)d->kln - (int)pkln;
874     if ( ! cmp ) {
875     if ( ! d->vln || ! pvln )
876     cmp = (int)d->vln - (int)pvln;
877     else
878     cmp = memcmp( ent+d->kln, pkey+pkln, f->bt->vsz );
879     }
880     if ( cmp && (i || 0 > cmp) ) {
881     Hit h;
882     PrVal pv;
883     TRACEKEY( ERR_TRASH, f );
884     if ( d->vln ) {
885     rdval( &h, ent+d->kln, d->vln );
886     log_msg( ERR_TRASH, "\tfirst val db %d mfn %d tag %d occ %d pos %d",
887     h.dbn, h.mfn, h.tag, h.occ, h.pos );
888     }
889     return log_msg( ERR_TRASH, "\tbad first '%.*s' val %d %s",
890     d->kln, ent, d->vln, prval( pv, ent+d->kln, d->vln ) );
891     }
892     }
893     #endif
894     /*
895     here goes the Lehmann/Yao race detection:
896     If we ran on right boundary, follow right link
897     */
898     while ( !(LBT_MATCH & e) && b->ent == e && b->nxt && b->num ) {
899     Block *s;
900     unsigned se;
901     b->lck = LBT_WR; /* hold b while checking s */
902     s = getblk( f->bt, b->nxt, i, i ? LBT_RD : lock );
903     if ( i || ! lock )
904     ULK( b );
905     else
906     b->lck = lock;
907     se = KEYPOS( f->bt, s, f->key );
908     if ( !se ) { /* lower than first */
909     ULK( s );
910     break;
911     }
912     LOG_DBG( LOG_DEBUG, "LYao %d -> %d lev %d for '%.*s' got 0x%x",
913     b->num, s->num, i, f->key->len, f->key->byt, se );
914     /* trade b for s */
915     ULK( b );
916     f->n[i] = (f->b[i] = b = s)->num;
917     e = se;
918     }
919     f->e[i] = e;
920     if ( ! i )
921     return 0;
922     if ( !(LBT_MATCH & e) ) {
923     if ( e )
924     f->e[i] = --e;
925     else {
926     TRACEKEY( ERR_TRASH, f );
927     return log_msg( ERR_IDIOT,
928     "ran on left bound of block %u ent %u",
929     b->num, b->ent );
930     }
931     }
932     d = b->dct + (LBT_ENTMASK & e);
933     /* direct access ok in inner node */
934     pkey = LBT_BASE(b)+d->pos;
935     pkln = d->kln;
936     pvln = d->vln;
937     n = GETINT( pkey+pkln+pvln );
938     i--;
939     b = getblk( f->bt, n, i, i ? LBT_RD : lock );
940     }
941     return -1; /* go away */
942     } /* findkey */
943    
944    
945     static int mktree ( Idx *bt )
946     {
947     Batch *ba = bt->bat;
948     unsigned char lev = 0; /* the level to be indexed */
949     unsigned left = 1; /* block on child level */
950     /* stats */
951     unsigned size = bt->len * bt->bsz;
952     unsigned sval = ba->vals * bt->vsz;
953     unsigned sove = bt->len * LBT_DOFF + sizeof(Dict)*(ba->keys+ba->span);
954     unsigned iblk = 0;
955     unsigned ikey = 0;
956     unsigned ikeyl = 0;
957     unsigned ipfx = 0;
958     unsigned ispn = 0;
959    
960     log_msg( LOG_INFO,
961     "end batch:\n"
962     "\t#blks %u total len %u free %u (%.1f%%) overhead %u (%.1f%%)\n"
963     "\t#keys %u total len %u (%.1f%%) avg %.1f max %u\n"
964     "\t#vals %u total len %u (%.1f%%) avg %.1f max %u spans %u",
965     bt->len, size, ba->free, ba->free*100./size, sove, sove*100./size,
966     ba->keys, ba->keyl, ba->keyl*100./size, (float)ba->keyl/ba->keys, ba->maxl,
967     ba->vals, sval, sval*100./size, (float)ba->vals/ba->keys, ba->maxv, ba->span
968     );
969    
970     /* build tree */
971     for (;;) { /* levels */
972     unsigned len = 0; /* current is n'th block on level */
973     Block *b = 0;
974     unsigned have = 0; /* room in current block b */
975     unsigned char*base = 0; /* of current block b */
976     Dict *d = 0; /* of current block b */
977     unsigned next = left; /* child */
978    
979     left = bt->len;
980     /* get child c and create child pointer in block b */
981     while ( next ) {
982     unsigned char vln = 0;
983     unsigned char *key;
984     unsigned char kln;
985     unsigned need;
986     Block *c = getblk( bt, next, lev, LBT_EX );
987     if ( ! c )
988     return log_msg( ERR_NOMEM, "no child block" );
989     next = c->nxt;
990     if ( ! c->ent ) /* OOPS empty node */
991     continue;
992     key = LBT_BASE(c)+LBT_POS(c->dct);
993     kln = c->dct[0].kln;
994     if ( ! b ) /* the leftmost child on this level */
995     kln = 0; /* always 0 key on left */
996     else if ( lev )
997     vln = LBT_VLN(c->dct);
998     else if ( ba->key.len == kln /* compare key to last of prev. */
999     && !memcmp( ba->key.byt, key, kln ) /* which was saved in ba->key */
1000     ) { /* key spans blocks */
1001     LOG_DBG( LOG_DEBUG, "found span key '%.*s' child %u",
1002     kln, key, c->num );
1003     vln = bt->vsz; /* qualify with value */
1004     } else {
1005     #if 1
1006     unsigned char x = 0;
1007     /* find prefix key */
1008     if ( kln > ba->key.len )
1009     kln = ba->key.len;
1010     while ( x < kln && key[x] == ba->key.byt[x] )
1011     x++;
1012     if ( x < kln ) /* they differ at [x] */
1013     kln = x+1;
1014     else /* equal on the shorter length */
1015     kln++;
1016     if ( kln > c->dct[0].kln )
1017     return log_msg( ERR_TRASH,
1018     "bad key '%.*s' on level %u matching previous of length %u",
1019     c->dct[0].kln, key, lev, ba->key.len );
1020     LOG_DBG( LOG_DEBUG, "prefix on key '%.*s' child %u save %u",
1021     kln, key, c->num, c->dct[0].kln - kln );
1022     ipfx += c->dct[0].kln - kln;
1023     #endif
1024     }
1025     if ( vln )
1026     ispn++;
1027     ikeyl += kln;
1028     need = sizeof(Dict) + kln + vln + 4;
1029     if ( need > have ) {
1030     if ( b ) {
1031     log_msg( LOG_VERBOSE,
1032     "level %u block %u is full %u ent stack %u",
1033     lev, b->num, b->ent, b->stk );
1034     b->nxt = bt->len; /* link */
1035     b->lck = 0;
1036     if ( WRULK( b, bt ) ) goto nowrite;
1037     len++;
1038     }
1039     c->lck = LBT_EX; /* paranoia */
1040     b = NEWBLK( bt, lev+1, LBT_EX );
1041     c->lck = 0;
1042     if ( ! b )
1043     return log_msg( ERR_NOMEM, "no free block" );
1044     if ( b->ent || b->stk != bt->bsz )
1045     return log_msg( ERR_TRASH,
1046     "new block %u is not empty: ent %u stk %u",
1047     b->num, b->ent, b->stk );
1048     base = LBT_BASE( b );
1049     b->lck = LBT_EX; /* lock */
1050     have = bt->bsz - LBT_DOFF; /* - ba->let; TODO: should we ??? */
1051     d = b->dct;
1052     len++;
1053     iblk++;
1054     }
1055     SETINT( base+(b->stk -= 4), c->num );
1056     d->kln = kln;
1057     d->vln = vln;
1058     if ( kln += vln ) /* key + consecutive qualifying value */
1059     memcpy( base+(b->stk -= kln), key, kln );
1060     d->pos = b->stk;
1061     d++;
1062     b->ent++;
1063     ikey++;
1064     have -= need;
1065     /* save the last dance for me */
1066     memcpy( ba->key.byt, LBT_BASE(c)+LBT_POS(&c->dct[c->ent-1]),
1067     ba->key.len = c->dct[c->ent-1].kln );
1068     }
1069     if ( !len )
1070     return log_msg( ERR_TRASH, "level %u was empty", lev );
1071     /* close level */
1072     log_msg( LOG_INFO, "closing level %u with %u blocks %u..%u",
1073     lev, len, left, b->num );
1074     b->lck = 0;
1075     if ( 1 == len ) {
1076     memcpy( bt->root, b, LBT_BOFF + bt->bsz );
1077     b = bt->root;
1078     b->num = 0;
1079     }
1080     if ( WRULK( b, bt ) ) goto nowrite;
1081     lev++;
1082     if ( 2 > len )
1083     break;
1084     }
1085     log_msg( LOG_INFO,
1086     "\tused %u inner blocks %u keys %u bytes avg %.1f\n"
1087     "\t%u #spans (%.1f%%) %u pfx (- %.1f%%)",
1088     iblk, ikey, ikeyl, (float)ikeyl/ikey,
1089     ispn, ispn*100.*bt->vsz/ikeyl, ipfx, ipfx*100./(ipfx+ikeyl) );
1090    
1091     mFree( bt->bat );
1092     bt->bat = 0;
1093     return 0;
1094     nowrite:
1095     return log_msg( LOG_SYSERR, "\twhen writing batch block" );
1096     } /* mktree */
1097    
1098    
1099    
1100     /* ************************************************************
1101     package data
1102     */
1103    
1104    
1105    
1106     /* ************************************************************
1107     package functions
1108     */
1109    
1110     int lbt_batch ( Idx *bt, unsigned char pctfree )
1111     {
1112     bt->bat = (Batch*)mAlloc( sizeof(Batch) );
1113     if ( ! bt->bat )
1114     return log_msg( ERR_NOMEM, "\twhen getting batch" );
1115     memset( bt->bat, 0, sizeof(Batch) );
1116     if ( pctfree > 50 )
1117     pctfree = 50;
1118     bt->bat->let = pctfree * bt->bsz / 100;
1119     lio_trunc( &bt->fd, 0 );
1120     iniblk( bt->root, bt, 0, 0 );
1121     /* TODO: kill hash */
1122     bt->len = 1;
1123     bt->bat->cur = NEWBLK( bt, 0, LBT_EX );
1124     return 0;
1125     } /* lbt_batch */
1126    
1127    
1128     int lbt_batchval ( Idx *bt, Key *key )
1129     {
1130     int cmp = 0;
1131     Batch *ba = bt->bat;
1132     if ( key != &ba->key ) {
1133     int l = key->len < ba->key.len ? key->len : ba->key.len; /* min */
1134     if ( ! (cmp = memcmp(key->byt, ba->key.byt, l)) )
1135     cmp = (int)key->len - (int)ba->key.len;
1136     if ( 0 > cmp )
1137     return log_msg( LOG_ERROR, "batch key '%.*s' < last '%.*s'",
1138     key->len, key->byt, ba->key.len, ba->key.byt );
1139     }
1140     /* flush ? */
1141     while ( ! key->val.len /* end */
1142     || LBT_ENTMAX == ba->got /* full */
1143     || cmp /* next key */
1144     ) {
1145     Block *b = ba->cur;
1146     unsigned have = b->stk - LBT_DOFF - b->ent*sizeof(Dict) - ba->let;
1147     unsigned need = sizeof(Dict) + ba->key.len;
1148     unsigned want = need + ba->got * bt->vsz;
1149     unsigned take = ba->got;
1150     if ( have < want ) { /* you can't always get ... */
1151     if ( have <= need )
1152     take = 0;
1153     else {
1154     take = (have - need) / bt->vsz;
1155     if ( 256 > take * bt->vsz )
1156     take = 0;
1157     }
1158     }
1159     #if 0 /* prefix key turbo ??? -- doesn't seem to work all too well */
1160     if ( have < (bt->bsz >> 1) ) { /* ready to waste half a block ? */
1161     Dict *d = &b->dct[ b->ent-1 ];
1162     unsigned char *last = LBT_BASE(b)+LBT_POS(d);
1163     if ( ba->key.byt[0] != last[0]
1164     || ba->key.byt[1] != last[1]
1165     || ba->key.byt[2] != last[2]
1166     )
1167     take = 0;
1168     }
1169     #endif
1170     LOG_DBG( LOG_DEBUG, "batch '%.*s' got %u need %u want %u have %u take %u",
1171     ba->key.len, ba->key.byt, ba->got, need, want, have, take );
1172     /* we have enough room in b for take entries */
1173     if ( take ) {
1174     unsigned char *base = LBT_BASE( b );
1175     unsigned char *s = base + b->stk;
1176     int i = take;
1177     Dict *d = &b->dct[ b->ent++ ];
1178     while ( i-- )
1179     memcpy( s -= bt->vsz, ba->val[i].byt, bt->vsz );
1180     memcpy( s -= ba->key.len, ba->key.byt, ba->key.len );
1181     b->stk = (unsigned short) (s - base);
1182     LBT_PV( d, b->stk, take );
1183     d->kln = ba->key.len;
1184     if ( ba->got > take )
1185     memcpy( ba->val, ba->val+take, sizeof(ba->val[0])*(ba->got - take) );
1186     ba->got -= take;
1187     }
1188     if ( ! ba->got )
1189     break;
1190     if ( take ) /* key spans blocks */
1191     ba->span++;
1192     ba->free += b->stk - LBT_DOFF - b->ent*sizeof(Dict); /* wasted */
1193     b->nxt = bt->len; /* link */
1194     LOG_DBG( LOG_DEBUG, "batch: %u entries left, getting block %u",
1195     ba->got, bt->len );
1196     if ( WRULK( b, bt ) )
1197     return log_msg( LOG_SYSERR, "\twhen writing batch block" );
1198     /* so couldn't flush all, thus we need another block */
1199     b = ba->cur = NEWBLK( bt, 0, LBT_EX );
1200     /* will continue on end or keychange */
1201     if ( b->ent || b->stk < bt->bsz )
1202     return log_msg( ERR_TRASH, "new block %u is not empty: ent %u stk %u",
1203     b->num, b->ent, b->stk );
1204     LOG_DBG( LOG_VERBOSE, "new block %u has %u ent %u stk, will keep %u",
1205     b->num, b->ent, b->stk, ba->let );
1206     }
1207     if ( cmp || ! key->val.len ) /* done with key: stats */
1208     if ( ba->maxv < ba->tot )
1209     ba->maxv = ba->tot;
1210     if ( key->val.len ) {
1211     if ( cmp ) { /* set new key */
1212     if ( ba->got )
1213     return log_msg( ERR_IDIOT, "batch not empty on new key" );
1214     ba->tot = 0;
1215     memcpy( ba->key.byt, key->byt, ba->key.len = key->len );
1216     ba->keys++;
1217     ba->keyl += key->len;
1218     if ( ba->maxl < key->len )
1219     ba->maxl = key->len;
1220     }
1221     /* add new val */
1222     /* LOG_HEX( key->val.byt, bt->vsz ); */
1223     memcpy( ba->val[ ba->got++ ].byt, key->val.byt, bt->vsz );
1224     ba->vals++;
1225     ba->tot++;
1226     } else {
1227     WRULK( ba->cur, bt );
1228     mktree( bt );
1229     }
1230    
1231     return 0;
1232     } /* lbt_batchval */
1233    
1234    
1235     /* TODO: this function is an awful mess and needs complete rewrite, KR
1236     */
1237     int lbt_add ( Idx *bt, Key *key )
1238     {
1239     int ret = 0;
1240     unsigned short e, i = 0;
1241     Block *b, *s = 0; /* mainblock, sibling */
1242     Dict *d;
1243     unsigned char *base, *sbase, *src, *v, *stk;
1244     FindKey f;
1245     int hadkey, split = 0, insibling = 0;
1246     unsigned need;
1247     PrVal pv;
1248    
1249     f.bt = bt;
1250     f.key = key;
1251     if ( bt->key && key->len > bt->key )
1252     key->len = bt->key;
1253     if ( findkey( &f, LBT_WR ) ) {
1254     if ( f.b[0] )
1255     ULK( f.b[0] );
1256     b = 0;
1257     LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
1258     }
1259     e = (LBT_ENTMASK & f.e[0]);
1260     b = f.b[0];
1261     base = LBT_BASE(b);
1262     d = b->dct + e;
1263     if ( ! (hadkey = LBT_MATCH & f.e[0]) ) /* new key */
1264     need = key->len + sizeof(Dict) + bt->vsz;
1265     else {
1266     unsigned vln = LBT_VLN(d);
1267     v = base+LBT_POS(d)+d->kln;
1268     if ( 1 == vln ) { /* check stopword -- value entirely zeroed */
1269     int l = bt->vsz;
1270     unsigned char *C = v;
1271     while ( l-- && !*C++ ) /* huh huh, he said "C++" */
1272     ;
1273     if ( -1 == l ) {
1274     LOG_DBG( LOG_VERBOSE, "key '%.*s' is a stopword", key->len, key->byt );
1275     goto done;
1276     }
1277     }
1278     i = valpos( v, vln, bt->vsz, key->val.byt );
1279     if ( LBT_MATCH & i ) {
1280     LOG_DBG( LOG_DEBUG,
1281     "adding key '%.*s'/%s: already there pos %u",
1282     key->len, key->byt, PRVAL( pv, key->val ), i & ~LBT_MATCH );
1283     goto done; /* value already there */
1284     }
1285     need = bt->vsz;
1286     }
1287     LOG_DBG( LOG_DEBUG,
1288     "adding key '%.*s'/%s in block %d had %d ent %d stk %d need %d",
1289     key->len, key->byt, PRVAL( pv, key->val ),
1290     b->num, hadkey, b->ent, b->stk, need );
1291    
1292     if ( need > b->stk - LBT_DOFF - b->ent*sizeof(Dict) ) {
1293     /* now that's bad: not enough space
1294     create new block s, move last entries there
1295     starting from the end, entries (including the new one) are collected,
1296     until their combined size reaches more than half a block.
1297     If the combined size then is more than 2/3 a block,
1298     and the last entry alone takes more than 1/3 a block, it is split.
1299     TODO: check existing sibling for free space
1300     */
1301     unsigned half = (bt->bsz - LBT_DOFF) >> 1;
1302     unsigned tot = 0;
1303     unsigned mv = b->ent;
1304    
1305     LOG_DBG( LOG_DEBUG, "splitting block %d need %d have %d (stk %d ent %d)",
1306     b->num, need, b->stk - LBT_DOFF - b->ent*sizeof(Dict), b->stk, b->ent );
1307     if ( need > (bt->bsz - LBT_DOFF) / 3 )
1308     LOG_OTO( done, (ERR_IDIOT, "OOPS! excessive need for %d bytes", need) );
1309     /* figure out split position */
1310     if ( !hadkey && b->ent == e ) {
1311     tot = need;
1312     insibling = !0;
1313     }
1314     d = b->dct+mv;
1315     while ( d--, mv-- ) {
1316     unsigned sz = sizeof(Dict)+d->kln+LBT_VLN(d)*bt->vsz;
1317     if ( half > (tot += sz) ) { /* sz fits */
1318     if ( e == mv && !hadkey ) {
1319     /*
1320     check size of new entry.
1321     In case of hadkey, we don't bother for the bt->vsz bytes.
1322     */
1323     if ( half < (tot + need) ) {
1324     if ( (half - tot) > need/2 ) { /* more balanced if we move */
1325     tot += need;
1326     insibling = !0;
1327     }
1328     break;
1329     }
1330     tot += need;
1331     insibling = !0;
1332     }
1333     if ( mv )
1334     continue; /* else croak on ! mv below */
1335     }
1336     /* exceeded half -- maybe we move entry mv anyway */
1337     if ( tot > (bt->bsz - LBT_DOFF)*2/3 ) {
1338     if ( tot-sz < (bt->bsz - LBT_DOFF)/3 )
1339     split = !0;
1340     else { /* undo -- don't move this entry */
1341     tot -= sz;
1342     d++; mv++;
1343     }
1344     } else if ( ! mv )
1345     LOG_OTO( done, (ERR_IDIOT, "OOPS! didn't find end tot %d bytes", tot) );
1346     break;
1347     } /* while mv-- */
1348     if ( mv < e || (mv == e && hadkey && !split) )
1349     insibling = !0;
1350     /* now mv is the first entry to move */
1351     s = NEWBLK( bt, 0, LBT_WR );
1352     LOG_DBG( LOG_DEBUG,
1353     "split: move %d of %d to new blk %d e %d tot %d insibling %d split %d",
1354     mv, b->ent, s->num, e, tot, insibling, split );
1355     s->nxt = b->nxt;
1356     b->nxt = s->num;
1357     sbase = LBT_BASE(s);
1358     if ( split ) {
1359     /* now total includes full size of entry mv.
1360     if we included a new entry, it might even exceed bsz.
1361     calculate sizes for both blocks w/o mv's values.
1362     */
1363     unsigned esz = sizeof(Dict)+d->kln; /* entry's base cost */
1364     unsigned vln = LBT_VLN(d);
1365     unsigned keep, diff, mvval, kpval, mvsiz, pos = LBT_POS(d);
1366     /*
1367     from currently used space, substract tot (including all
1368     values and possibly new key "need"), but add entry base cost.
1369     that should be the size remaining if we where moving
1370     all of the values but keep an entry and key.
1371     maybe slightly biased if need is bt->vsz (the hadkey case).
1372     */
1373     keep = bt->bsz - b->stk + b->ent*sizeof(Dict) + need - tot + esz;
1374     tot -= vln * bt->vsz; /* undo entries */
1375     diff = (keep > tot ? keep - tot : tot - keep) / bt->vsz;
1376     if ( diff > vln )
1377     LOG_OTO( done, (ERR_IDIOT,
1378     "OOPS! got diff %d vln %d entry %d tot %d keep %d",
1379     diff, vln, mv, tot, keep) );
1380     mvval = (vln - diff)/2;
1381     if ( keep > tot )
1382     mvval += diff;
1383     kpval = vln - mvval;
1384     if ( ! mvval || ! kpval ) {
1385     LOG_DBG( LOG_WARN, "bad mv/kpval %d/%d keep %d tot %d vln %d diff %d",
1386     mvval, kpval, keep, tot, vln, diff );
1387     if ( ! mvval )
1388     kpval = vln - (mvval = 1);
1389     else
1390     mvval = vln - (kpval = 1);
1391     }
1392     tot += mvsiz = mvval*bt->vsz;
1393     keep += kpval*bt->vsz;
1394     src = base+pos;
1395     LOG_DBG( LOG_DEBUG,
1396     "split: entry %d '%.*s' mv %d of %d values, i %d keep %d tot %d",
1397     mv, d->kln, src, mvval, vln, i, keep, tot );
1398     if ( keep > (bt->bsz-LBT_DOFF) || tot > (bt->bsz-LBT_DOFF) )
1399     LOG_OTO( done, (ERR_IDIOT,
1400     "OOPS! got diff %d vln %d mvval %d entry %d tot %d keep %d",
1401     diff, vln, mvval, mv, tot, keep) );
1402     /* SIGH -- do it */
1403     memcpy( sbase + (s->stk -= mvsiz),
1404     src+d->kln+kpval*bt->vsz, mvsiz );
1405     if ( e == mv && hadkey && i > kpval ) {
1406     insibling = !0;
1407     s->stk -= bt->vsz;
1408     i -= kpval;
1409     memmove( sbase+s->stk, sbase+s->stk+bt->vsz, i*bt->vsz );
1410     memcpy( sbase+s->stk+i*bt->vsz, key->val.byt, bt->vsz );
1411     mvval++;
1412     }
1413     memcpy( sbase + (s->stk -= d->kln), src, d->kln );
1414     LBT_PVK( s->dct, s->stk, mvval, d->kln );
1415     s->ent = 1;
1416     memmove( src + mvsiz, src, d->kln+kpval*bt->vsz );
1417     pos += mvsiz;
1418     LBT_PV( d, pos, kpval );
1419     d++; mv++;
1420     }
1421     /* now entries mv..b->ent are moved, mv > 0 */
1422     b->stk = LBT_POS(b->dct+mv-1); /* pre-adjust stack */
1423     if ( mv < e ) { /* move entries mv..e-1 */
1424     /* used stack space is from pos of entry e-1 to end of entry mv
1425     (might differ from pos of mv-1 due to split)
1426     */
1427     unsigned from = LBT_POS(b->dct+e-1), nent = e-mv,
1428     sz = LBT_POS(b->dct+mv)+b->dct[mv].kln
1429     +LBT_VLN(b->dct+mv)*bt->vsz - from;
1430     int adj = (s->stk -= sz) - from; /* entry position adjustment */
1431     memcpy( sbase + s->stk, base+from, sz );
1432     memcpy( s->dct+s->ent, b->dct+mv, nent*sizeof(Dict) );
1433     d = s->dct + s->ent;
1434     s->ent += nent;
1435     for ( ; nent--; d++ )
1436     LBT_POSADD( d, adj );
1437     }
1438     if ( insibling && mv <= e ) { /* now mv or insert e, unless done on split */
1439     d = b->dct + e;
1440     if ( hadkey ) { /* mv existing entry e >= mv, add new val */
1441     /* i is valpos */
1442     unsigned vln = LBT_VLN(d);
1443     src = base+LBT_POS(d)+d->kln;
1444     if ( i < vln ) {
1445     unsigned sz = (vln - i)*bt->vsz;
1446     memcpy( sbase + (s->stk -= sz), src + i*bt->vsz, sz );
1447     }
1448     memcpy( sbase + (s->stk -= bt->vsz), key->val.byt, bt->vsz );
1449     if ( i ) {
1450     unsigned sz = i*bt->vsz;
1451     memcpy( sbase + (s->stk -= sz), src, sz );
1452     }
1453     memcpy( sbase + (s->stk -= d->kln), base+LBT_POS(d), d->kln );
1454     LBT_PVK( s->dct + s->ent, s->stk, vln+1, d->kln );
1455     e++; /* e is moved */
1456     } else {
1457     memcpy( sbase + (s->stk -= bt->vsz), key->val.byt, bt->vsz );
1458     memcpy( sbase + (s->stk -= key->len), key->byt, key->len );
1459     LBT_PVK( s->dct + s->ent, s->stk, 1, key->len );
1460     }
1461     s->ent++;
1462     } /* mv or insert e */
1463     if ( e < b->ent && mv < b->ent ) { /* move entries max(e,mv) to end */
1464     unsigned fst = mv < e ? e : mv;
1465     /* used stack space is from pos of entry b->ent-1 (the original b->stk)
1466     to end of entry fst (might differ from pos of fst-1 due to split)
1467     */
1468     unsigned from = LBT_POS(b->dct+b->ent-1), nent = b->ent-fst,
1469     sz = LBT_POS(b->dct+fst)+b->dct[fst].kln
1470     +LBT_VLN(b->dct+fst)*bt->vsz - from;
1471     int adj = (s->stk -= sz) - from; /* entry position adjustment */
1472     LOG_DBG( LOG_DEBUG, "split: entries %d..%d sz %d from %d to %d",
1473     fst, b->ent-1, sz, from, s->stk );
1474     memcpy( sbase + s->stk, base+from, sz );
1475     memcpy( s->dct+s->ent, b->dct+fst, nent*sizeof(Dict) );
1476     d = s->dct + s->ent;
1477     s->ent += nent;
1478     for ( ; nent--; d++ ) {
1479     #if !defined(NDEBUG)
1480     unsigned p = LBT_POS(d);
1481     #endif
1482     LBT_POSADD( d, adj );
1483     #if !defined(NDEBUG)
1484     LOG_DBG( LOG_DEBUG, "new ent %d p/v/k %d/%d/%d o %d",
1485     d - s->dct, LBT_POS(d), LBT_VLN(d), d->kln, p );
1486     #endif
1487     }
1488     }
1489     b->ent = mv; /* kill entries */
1490     LOG_DBG( LOG_DEBUG,
1491     "split: blks %d/%d with %d/%d entries, s starts '%.*s'",
1492     b->num, s->num, b->ent, s->ent, s->dct[0].kln, sbase+LBT_POS(s->dct) );
1493     } /* new sibling */
1494    
1495     if ( ! insibling ) {
1496     /* now we do have enough space */
1497     d = b->dct + e;
1498     stk = base + b->stk; /* bottom of data stack */
1499     if ( hadkey ) { /* insert value at i */
1500     unsigned char *val = base + LBT_POS(d)+d->kln+i*bt->vsz;
1501     need = bt->vsz;
1502     #if 0
1503     LOG_DBG( LOG_DEBUG,
1504     "in entry %d '%.*s' val %d: memmove( %p %p %d) val %p need %d",
1505     e, d->kln, base+LBT_POS(d), i,
1506     stk-need, stk, (val-stk)+need, val, need );
1507     #endif
1508     memmove( stk - need, stk, (val - stk)+need );
1509     memcpy( val - need, key->val.byt, bt->vsz );
1510     LBT_VLNADD( d, 1 );
1511     } else { /* insert key, value and dict entry */
1512     /* take dict and data space from entry e */
1513     unsigned char *nu;
1514     need = key->len + bt->vsz;
1515     if ( e == b->ent )
1516     nu = base + b->stk - need;
1517     else {
1518     /* at end of next entry, i.e. end of block or beg of previous */
1519     nu = base + LBT_POS(d) + d->kln + LBT_VLN(d)*bt->vsz;
1520     memmove( stk - need, stk, nu - stk );
1521     nu -= need;
1522     memmove( d+1, d, (b->ent - e)*sizeof(Dict) );
1523     }
1524     memcpy( nu, key->byt, key->len );
1525     memcpy( nu + key->len, key->val.byt, bt->vsz );
1526     LBT_PV( d, nu-base, 1 );
1527     d->kln = key->len;
1528     d++;
1529     e++;
1530     b->ent++;
1531     }
1532     /*
1533     need is set to the needed (i.e. additionally used) data space,
1534     e is the first dict entry to redirect need bytes lower
1535     */
1536     b->stk -= need;
1537     for ( ; e < b->ent; e++, d++ )
1538     LBT_POSADD( d, -need );
1539     } /* ! insibling */
1540     while ( s ) { /* created sibling */
1541     unsigned num = s->num;
1542     unsigned char *e0 = LBT_BASE(s) + LBT_POS(s->dct);
1543     unsigned l = s->lev + 1;
1544     Block *ins;
1545     Key k;
1546    
1547     memcpy( k.byt, e0, k.len = s->dct[0].kln );
1548     k.val.len = 0;
1549     if ( s->lev ? s->dct[0].vln : split )
1550     memcpy( k.val.byt, e0+k.len, k.val.len = bt->vsz );
1551     LOG_DBG( LOG_DEBUG, "split: had %d/%d on lev %d key '%.*s' vln %d %s",
1552     b->num, num, s->lev, k.len, k.byt, k.val.len, PRVAL( pv, k.val ) );
1553    
1554     /* get parent */
1555     if ( f.b[l]->num == f.n[l] && ! LBT_BUSY(f.b[l]) )
1556     f.b[l]->lck = LBT_WR;
1557     else
1558     f.b[l] = getblk( bt, f.n[l], l, LBT_WR );
1559     LOG_DBG( LOG_DEBUG,
1560     "split: got parent %d lev %d", f.b[l]->num, f.b[l]->lev );
1561     /* MT TODO: again L/Y link hunt */
1562     /* ok, locked the parent -- release s and b */
1563     if ( (ret = WRULK( s, bt )) || (ret = WRULK( b, bt )) )
1564     break;
1565     s = 0;
1566     ins = b = f.b[l];
1567     base = LBT_BASE(b);
1568     e = KEYPOS( bt, b, &k );
1569     LOG_DBG( LOG_DEBUG, "split: got parent pos %d", e );
1570     if ( LBT_MATCH & e ) /* !??? */
1571     LOG_OTO( done, (ERR_TRASH, "OOPS! found key in parent" ) );
1572     if ( ! e ) {
1573     TRACEKEY( ERR_TRASH, &f );
1574     LOG_OTO( done, (ERR_TRASH, "OOPS! parent insert at 0 lev %d", l ) );
1575     }
1576     need = sizeof(Dict) + k.len + 4 + k.val.len;
1577     if ( need > b->stk - LBT_DOFF - b->ent*sizeof(Dict) ) {
1578     unsigned half = ((bt->bsz - b->stk)+b->ent*sizeof(Dict)+need) / 2;
1579     unsigned mv = b->ent, tot = 0, nent, sz;
1580     int adj;
1581     if ( e == b->ent ) {
1582     tot = need;
1583     ins = 0;
1584     }
1585     for ( d = b->dct+mv; d--, mv--; ) {
1586     sz = sizeof(Dict) + d->kln + 4 + d->vln;
1587     if ( half > (tot += sz) ) {
1588     if ( e == mv ) {
1589     if ( half > tot+need || tot+need-half < half-tot ) {
1590     tot += need;
1591     ins = 0; /* new link goes to sibling */
1592     } else
1593     break;
1594     if ( half <= tot )
1595     break;
1596     }
1597     if ( mv )
1598     continue;
1599     }
1600     if ( tot - half > sz/2 ) { /* don't move this */
1601     tot -= sz;
1602     d++; mv++;
1603     } else if ( ! mv ) /* !??? */
1604     mv = 1;
1605     break;
1606     }
1607     /* split here a little bit simpler */
1608     s = NEWBLK( bt, l, LBT_WR );
1609     s->nxt = b->nxt;
1610     b->nxt = s->num;
1611     /* straight move entries mv..b->ent-1 to sibling */
1612     sbase = LBT_BASE(s);
1613     d = b->dct + mv;
1614     sz = b->dct[mv-1].pos - b->stk;
1615     nent = b->ent - mv;
1616     LOG_DBG( LOG_DEBUG,
1617     "split: parent lev %d split %d to new %d mv %d"
1618     " '%.*s'/%s nent %d sz %d",
1619     l, b->num, s->num, mv, d->kln, base+d->pos,
1620     prval( pv, base+d->pos+d->kln, d->vln ), nent, sz );
1621     memcpy( sbase + (s->stk -= sz), base + b->stk, sz );
1622     adj = s->stk - b->stk;
1623     b->stk += sz;
1624     memcpy( s->dct, d, nent*sizeof(Dict) );
1625     b->ent = mv;
1626     s->ent = nent;
1627     for ( d = s->dct; nent--; d++ )
1628     d->pos += adj;
1629     if ( ! ins ) {
1630     ins = s;
1631     e -= mv;
1632     }
1633     if ( (ret = chkblk( s, bt )) )
1634     break;
1635     }
1636     /* now insert link in block ins at pos e */ {
1637     unsigned isz = k.len + 4 + k.val.len, nent = ins->ent - e;
1638     unsigned char *ibase = LBT_BASE(ins);
1639     unsigned end = e ? ins->dct[e-1].pos : bt->bsz;
1640     LOG_DBG( LOG_DEBUG,
1641     "split: ins parent %d lev %d pos %d key '%.*s' -> %d vln %d %s",
1642     ins->num, ins->lev, e, k.len, k.byt, num, k.val.len,
1643     PRVAL( pv, k.val ) );
1644     memmove( ibase + ins->stk - isz, ibase + ins->stk, end - ins->stk );
1645     ins->stk -= isz;
1646     memcpy( ibase + (end -= 4), &num, 4 );
1647     if ( k.val.len )
1648     memcpy( ibase + (end -= k.val.len), k.val.byt, k.val.len );
1649     memcpy( ibase + (end -= k.len), k.byt, k.len );
1650     for ( d = ins->dct+ins->ent; nent--; d-- ) {
1651     *d = d[-1];
1652     d->pos -= isz;
1653     }
1654     d->pos = end;
1655     d->vln = k.val.len;
1656     d->kln = k.len;
1657     ins->ent++;
1658     if ( (ret = chkblk( ins, bt )) )
1659     break;
1660     }
1661     if ( ! s ) /* write b below or in next turn */
1662     break;
1663     if ( ! b->num ) { /* root split */
1664     Block *bb = NEWBLK( bt, l, LBT_WR );
1665     LOG_DBG( LOG_DEBUG, "root split!" );
1666     /* cp all but the num */
1667     memcpy( LBT_BASE(bb)+4, LBT_BASE(b)+4, bt->bsz-4 );
1668     b->nxt = s->nxt; /* restore root's special nxt */
1669     s->nxt = 0;
1670     b->stk = bt->bsz; /* empty the root */
1671     /* add two childs bb and s */
1672     SETINT( base + (b->stk -= 4), bb->num );
1673     LBT_PVK( b->dct, b->stk, 0, 0 );
1674     SETINT( base + (b->stk -= 4), s->num );
1675     e0 = LBT_BASE(s) + LBT_POS(s->dct);
1676     if ( s->dct[0].vln )
1677     memcpy( base + (b->stk -= bt->vsz), e0+s->dct[0].kln, bt->vsz );
1678     memcpy( base + (b->stk -= s->dct[0].kln), e0, s->dct[0].kln );
1679     LBT_PVK( b->dct+1, b->stk, s->dct[0].vln, s->dct[0].kln );
1680     b->ent = 2;
1681     b->lev++; /* increase tree depth */
1682     if ( (ret = WRULK( s, bt )) || (ret = WRULK( bb, bt )) )
1683     break;
1684     s = 0;
1685     break;
1686     }
1687     }
1688     if ( b && !ret )
1689     ret = WRULK( b, bt );
1690     done:
1691     if ( s )
1692     ULK( s );
1693     if ( b )
1694     ULK( b );
1695     if ( !ret && b->lev && findkey( &f, LBT_RD ) )
1696     ret = log_msg( ERR_TRASH, "could not find key after add" );
1697     return ret;
1698     } /* lbt_add */
1699    
1700    
1701     int lbt_del ( Idx *bt, Key *key )
1702     {
1703     int ret = 0;
1704     unsigned short e, n, i, mv;
1705     Block *b;
1706     Dict *d;
1707     unsigned char *base, *v;
1708     FindKey f;
1709     PrVal pv;
1710    
1711     f.bt = bt;
1712     f.key = key;
1713     LOG_DBG( LOG_DEBUG, "deleting key '%.*s'/%s",
1714     key->len, key->byt, PRVAL( pv, key->val ) );
1715     if ( findkey( &f, LBT_WR ) )
1716     LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
1717     if ( !(LBT_MATCH & f.e[0]) ) /* key not found - ok */
1718     goto done;
1719     LOG_DBG( LOG_DEBUG, "key found" );
1720     e = (LBT_ENTMASK & f.e[0]);
1721     b = f.b[0];
1722     d = b->dct + e;
1723     base = LBT_BASE(b);
1724     v = base+LBT_POS(d)+d->kln; /* start of values */
1725     n = LBT_VLN(d);
1726     if ( ! n )
1727     goto done;
1728     i = valpos( v, n, bt->vsz, key->val.byt );
1729     LOG_DBG( LOG_DEBUG, "val %sfound pos %u of %u",
1730     (LBT_MATCH & i) ? "" : "not ", i & ~LBT_MATCH, n );
1731     if ( !(LBT_MATCH & i) ) /* val not found - ok */
1732     goto done;
1733     i &= ~LBT_MATCH;
1734     LOG_DBG( LOG_DEBUG,
1735     "del blk %d e %d i/n %d/%d", b->num, e, i, n );
1736     if ( 1 < n ) {
1737     v += i*bt->vsz; /* end of stack area to move: start of this value */
1738     mv = bt->vsz; /* amount to move: one value size */
1739     LBT_VLNADD( d, -1 );
1740     } else { /* sole value -- rm whole entry */
1741     v = base+LBT_POS(d); /* start of key */
1742     mv = d->kln+bt->vsz;
1743     for ( i=b->ent - e - 1; i--; d++ ) /* move the i entries after d down */
1744     *d = d[1];
1745     d = b->dct + e; /* reset */
1746     b->ent--;
1747     }
1748     memmove( base+b->stk+mv, base+b->stk, v-(base+b->stk) );
1749     b->stk += mv;
1750     for ( ; e++ < b->ent; d++ )
1751     LBT_POSADD( d, mv );
1752     WRULK( b, bt );
1753     done:
1754     if ( f.b[0] )
1755     ULK( f.b[0] );
1756     return ret;
1757     } /* lbt_del */
1758    
1759    
1760    
1761     int lbt_loop ( Idx *bt, DXLoop *l )
1762     {
1763     int ret = 0;
1764     unsigned i = 0;
1765     int mode = IDXMODE & l->flg;
1766     Block *b = 0;
1767     Key *cmp = &l->key;
1768     if ( ! cmp->len ) /* start at 1st leaf */
1769     b = getblk( bt, 1, 0, LBT_RD );
1770     else { /* locate key */
1771     FindKey f;
1772     f.bt = bt;
1773     f.key = cmp;
1774     if ( findkey( &f, LBT_RD ) )
1775     LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
1776     b = f.b[0];
1777     i = (unsigned)(LBT_ENTMASK & f.e[0]);
1778     if ( !(LBT_MATCH & f.e[0]) && IDXEQ == mode )
1779     goto done;
1780     }
1781     if ( IDXUPTO <= mode )
1782     cmp = &l->to;
1783     for ( ;; i=0 ) { /* blocks */
1784     unsigned char *base = LBT_BASE( b );
1785     Dict *d;
1786     if ( ! b || b->lev )
1787     return -ERR_TRASH;
1788     b->lck = LBT_EX; /* lock */
1789     d = b->dct + i;
1790     for ( ; i<b->ent; i++, d++ ) {
1791     unsigned short pos = LBT_POS( d );
1792     unsigned short vln = LBT_VLN( d );
1793     unsigned short kln = d->kln;
1794     unsigned short j;
1795     unsigned char *v = base+pos+kln;
1796     Key key;
1797     Hit hit;
1798     int diff = memcmp( cmp->byt, base+pos,
1799     kln <= cmp->len ? kln : cmp->len );
1800     switch ( mode ) {
1801     case IDXPF:
1802     if ( diff || kln < cmp->len )
1803     goto moan;
1804     break;
1805     case IDXEQ:
1806     if ( diff || kln != cmp->len )
1807     goto done;
1808     break;
1809     case IDXUPTO:
1810     if ( 0 >= diff )
1811     goto done;
1812     break;
1813     case IDXINCL:
1814     if ( 0 > diff || (!diff && kln > cmp->len) )
1815     goto done;
1816     break;
1817     default: /* unused */
1818     moan:
1819     log_msg( LOG_INFO,
1820     "loop ends on key '%.*s'(%d) %d %d",
1821     kln, base+pos, kln, diff, cmp->len );
1822     goto done;
1823     }
1824     memcpy( key.byt, base+pos, key.len = kln );
1825     if ( l->cb )
1826     for ( j=0; j<vln; j++, v += bt->vsz ) {
1827     rdval( &hit, v, bt->vsz );
1828     ret = l->cb( l->me, &key, &hit );
1829     if ( ret )
1830     goto done;
1831     }
1832     else { /* internal check */
1833     FindKey f;
1834     f.bt = bt;
1835     f.key = &key;
1836     memcpy( key.val.byt, v, key.val.len = bt->vsz );
1837     if ( findkey( &f, LBT_RD ) )
1838     LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
1839     if ( !(LBT_MATCH & f.e[0])
1840     || b != f.b[0]
1841     || i != (unsigned)(LBT_ENTMASK & f.e[0])
1842     ) {
1843     log_msg( ERR_TRASH,
1844     "OOPS! want %u(%p).%u got %u(%p).%u (match %x)",
1845     b->num, b, i, f.b[0]->num, f.b[0], LBT_ENTMASK & f.e[0], f.e[0] );
1846     rdval( &hit, v, bt->vsz );
1847     log_msg( ERR_TRASH,
1848     "\twhen looking for '%.*s'/%u.%u.%u.%u",
1849     key.len, key.byt, hit.mfn, hit.tag, hit.occ, hit.pos );
1850     if ( f.b[0]->ent <= (LBT_ENTMASK & f.e[0]) )
1851     log_msg( ERR_TRASH, "\tI found nothing" );
1852     else {
1853     Dict *dd = f.b[0]->dct + (LBT_ENTMASK & f.e[0]);
1854     unsigned char *bb = LBT_BASE(f.b[0])+LBT_POS(dd);
1855     rdval( &hit, bb+dd->kln, bt->vsz );
1856     log_msg( ERR_TRASH,
1857     "\tI found '%.*s'/%u.%u.%u.%u",
1858     dd->kln, bb, hit.mfn, hit.tag, hit.occ, hit.pos );
1859     }
1860     } else
1861     log_msg( LOG_TRACE, "key ok" );
1862     }
1863     }
1864     ULK( b );
1865     if ( ! b->nxt || !(b = getblk(bt, b->nxt, 0, LBT_RD)) )
1866     break;
1867     }
1868     done:
1869     if ( b )
1870     ULK( b );
1871     if ( l->cb )
1872     l->cb( l->me, 0, 0 );
1873     else
1874     log_msg( LOG_INFO,
1875     "checked %i keys in %u blks of %dK depth %u cmp/get/miss %u/%u/%u",
1876     stat.key, bt->len, bt->bsz>>10, bt->root->lev, stat.cmp, stat.get, stat.miss );
1877     return ret;
1878     } /* lbt_loop */
1879    
1880    
1881     /* mimik the plain old ldb_search */
1882     int lbt_search ( Idx *bt, Key *key, LdbPost *post, Rec *rec )
1883     {
1884     int ret = 0;
1885     unsigned i = 0;
1886     int cont = rec && rec->len; /* continuing previous terms search */
1887     char *stk = rec ? ((char*)rec + rec->bytes) : 0; /* term stack */
1888     int prefix;
1889     Block *b = 0;
1890     Key start;
1891     LdbP * const p = post->p; /* really just an alias */
1892    
1893     /* check for prefix match */
1894     if ( post ) {
1895     if ( 8 != bt->vsz )
1896     return log_msg( ERR_INVAL, "bad legacy search on vsz %d", bt->vsz );
1897     prefix = LDB_PFX & post->mode;
1898     } else {
1899     if ( ! rec )
1900     return log_msg( ERR_INVAL, "lbt_search needs rec or post" );
1901     if ( (prefix = key->len && '$' == key->byt[key->len - 1]) )
1902     key->len--;
1903     }
1904    
1905     if ( ! cont )
1906     start = *key;
1907     else {
1908     memset( &start, 0, sizeof(start) );
1909     start.len = 255 < rec->field[rec->len-1].len
1910     ? 255 : rec->field[rec->len-1].len;
1911     memcpy( start.byt, rec->field[rec->len-1].val, start.len );
1912     rec->len = 0;
1913     }
1914    
1915     if ( ! start.len ) /* start at 1st leaf */
1916     b = getblk( bt, 1, 0, LBT_RD );
1917     else { /* locate key */
1918     FindKey f;
1919     f.bt = bt;
1920     f.key = &start;
1921     if ( findkey( &f, LBT_RD ) )
1922     LOG_OTO( ouch, ( ERR_TRASH, "could not find key" ) );
1923     b = f.b[0];
1924     i = (unsigned)(LBT_ENTMASK & f.e[0]);
1925     if ( !(LBT_MATCH & f.e[0]) && ! prefix )
1926     goto done;
1927     }
1928    
1929     for ( ;; i=0 ) { /* blocks */
1930     unsigned char *base = LBT_BASE( b );
1931     Dict *d;
1932     if ( ! b || b->lev )
1933     return -ERR_TRASH;
1934     b->lck = LBT_EX; /* lock */
1935     d = b->dct + i;
1936     for ( ; i<b->ent; i++, d++ ) { /* entries */
1937     unsigned char *ent = base + LBT_POS( d ); /* start of key */
1938     unsigned short kln = d->kln;
1939     if ( memcmp( key->byt, ent, kln <= key->len ? kln : key->len )
1940     || (prefix ? kln < key->len : kln != key->len )
1941     )
1942     goto done;
1943     if ( rec ) { /* record term */
1944     Field *f = rec->field + rec->len; /* field to assign */
1945     /** append term to record, unless it's the same as previous */
1946     if ( rec->len ) {
1947     if ( kln == f[-1].len && !memcmp( f[-1].val, ent, kln ) )
1948     continue;
1949     } else if ( cont ) {
1950     if ( kln == start.len && !memcmp( start.byt, ent, kln ) )
1951     continue;
1952     }
1953     stk -= kln;
1954     if ( stk < (char*)(f+1) ) /* stack reached field dict */
1955     goto done;
1956     memcpy( stk, ent, kln );
1957     f->tag = 0;
1958     f->val = stk;
1959     f->len = kln;
1960     rec->len++;
1961     continue;
1962     }
1963     { /* more locals */
1964     LdbP merge[1024];
1965     unsigned short vln, k, m=0;
1966     unsigned char *c;
1967     int f = post->fil-1; /* highest pos to consider in given postings */
1968     if ( !(vln = LBT_VLN( d )) )
1969     continue;
1970     ent += kln; /* set to start of values */
1971     c = ent + 8*(vln-1); /* last value */
1972     /* collect postings */
1973     for ( k=vln; k--; c-=8 ) {
1974     /* loop backwards (for the fun of it) postings in segment */
1975     int prow, ptag, ppos;
1976     LdbP e; /* the entry */
1977     LdbP samerow; /* highest possible entry w/ same row as e */
1978     #if defined( LDB_BIG_ENDIAN )
1979     /* the 8 bytes of a posting are BIG ENDIAN ! */
1980     memcpy(e.bytes,c,8);
1981     #else
1982     e.bytes[0] = c[7]; e.bytes[1] = c[6];
1983     e.bytes[2] = c[5]; e.bytes[3] = c[4];
1984     e.bytes[4] = c[3]; e.bytes[5] = c[2];
1985     e.bytes[6] = c[1]; e.bytes[7] = c[0];
1986     #endif
1987     prow = LDBP_ROW( &e );
1988     ptag = LDBP_TAG( &e );
1989     ppos = LDBP_POS( &e );
1990     LOG_DBG( LOG_VERBOSE, "post %d.%hd pos %06x", prow, ptag, ppos );
1991     if ( !prow )
1992     continue;
1993     if ( (post->cut && prow >= post->cut)
1994     || (post->tag && post->tag != ptag)
1995     )
1996     continue;
1997     if ( prow < post->skp ) /* quickly bail out on skip condition */
1998     break;
1999     LDBP_SETROWTOP( &samerow, &e ); /* for mfn comparison */
2000     /* sweep down to postings for the same row as e ... */
2001     while ( f >= 0 && LDBP_GT( p+f, &samerow ) )
2002     f--;
2003     if ( LDB_AND & post->mode ) {
2004     int l;
2005     /* loop postings for same row, mark all (that are near enough) */
2006     LDBP_SETROWBOT( &samerow, &e ); /* for mfn comparison */
2007     /* NOTE: postings for row are GT than bottom even if marked */
2008     for ( l = f; l>=0 && LDBP_GT( p+l, &samerow ); l-- ) {
2009     if ( post->near ) {
2010     int dist;
2011     if ( ptag != LDBP_TAG( p+l ) ) continue;
2012     if ( LDB_NEAR_G != post->near ) {
2013     dist = LDBP_POS( p+l ) - LDBP_POS( &e );
2014     if ( dist < 0 ) dist = -dist;
2015     if ( 0 < post->near
2016     ? post->near < dist
2017     : -post->near != dist /* exact $$$$ */
2018     ) continue;
2019     }
2020     }
2021     LDBP_SETMARK( p+l );
2022     }
2023     } else { /* OR mode */
2024     int add;
2025     if ( ! post->near ) /* add if row not found: ignore details */
2026     add = 0 > f || prow > LDBP_ROW( p+f );
2027     else { /* add if no exact match */
2028     int l;
2029     /* NOTE: we don't use mark bit in OR mode, do we ? */
2030     for ( l = f; l>=0 && LDBP_GT( p+l, &e ); l-- )
2031     ;
2032     add = 0 > l || LDBP_GT( &e, p+l );
2033     }
2034     if ( add )
2035     merge[ m++ ] = e;
2036     }
2037     } /* for postings in segment */
2038     if ( m ) { /* merge in the merge buffer */
2039     LdbP *mm = merge;
2040     for ( k = post->fil += m; m && k--; ) {
2041     LdbP src;
2042     if ( k < m || LDBP_GT( mm, &p[k-m] ) ) {
2043     src = *mm++;
2044     m--;
2045     LOG_DBG( LOG_DEBUG, "merging %d at %d", LDBP_ROW(&src), k );
2046     } else
2047     src = p[k-m];
2048     if ( k < post->len )
2049     p[k] = src;
2050     else { /* set cut */
2051     int row = LDBP_ROW( &src );
2052     if ( row < post->cut || !post->cut )
2053     post->cut = row;
2054     }
2055     }
2056     if ( post->fil > post->len )
2057     post->fil = post->len;
2058     if ( post->cut ) /* postings for cut row are unreliable */
2059     while ( post->fil && post->cut <= LDBP_ROW(p+post->fil-1) )
2060     post->fil--;
2061     } /* if ( m ) */
2062     } /* more locals */
2063     } /* for entries in block */
2064     ULK( b );
2065     if ( ! b->nxt || !(b = getblk(bt, b->nxt, 0, LBT_RD)) )
2066     break;
2067     }
2068     done:
2069     if ( rec )
2070     ret = rec->len;
2071     ouch:
2072     if ( b )
2073     ULK( b );
2074     return ret;
2075     } /* lbt_search */
2076    
2077    
2078     int lbt_init ( Idx *bt )
2079     {
2080     int ret = 0, blkbits, i;
2081     Block *l;
2082     int size = lio_size( bt->fd );
2083    
2084     bt->root = 0;
2085     bt->hash = 0;
2086     bt->mem = 0;
2087     for ( i=LBT_LRULEV; i--; )
2088     bt->lru[i] = bt->mru[i] = 0;
2089    
2090     if ( 0x3ff & size ) /* not on 1K boundary */
2091     return log_msg( LOG_ERROR, "bt has bad size %dK + %d",
2092     size >> 10, (int)size & 0x3ff );
2093     if ( size ) { /* read root header */
2094     Block b;
2095     unsigned want = sizeof(b) - LBT_BOFF;
2096     int got = lio_pread( &bt->fd, LBT_BASE(&b), want, 0 );
2097     if ( want != (unsigned)got )
2098     return log_msg( LOG_SYSERR, "could not get root head" );
2099     if ( b.num )
2100     return log_msg( ERR_TRASH, "root has num %d", b.num );
2101     /*if ( LBT_ISIS != b.nxt ) log_msg( LOG_WARN, "magic is 0x%08x", b.nxt );*/
2102     if ( bt->typ && bt->typ != b.typ )
2103     log_msg( LOG_WARN, "typ want 0x02%x got 0x02%x", bt->typ, b.typ );
2104     bt->typ = b.typ;
2105     if ( bt->key && bt->key != b.key )
2106     log_msg( LOG_WARN, "key want %d got %d", bt->key, b.key );
2107     bt->key = b.key;
2108     if ( bt->col && bt->col != b.col )
2109     log_msg( LOG_WARN, "col want %d got %d", bt->col, b.col );
2110     bt->col = b.col;
2111     bt->dpt = b.lev;
2112     /*
2113     b.ent
2114     b.stk
2115     */
2116     } else { /* new index */
2117     bt->dpt = 1;
2118     if ( ! bt->key )
2119     bt->key = 30;
2120     }
2121    
2122     bt->vsz = 8+(0xf & bt->typ);
2123     blkbits = 10 + (3 & (bt->typ >> 4));
2124     bt->bsz = 1 << blkbits;
2125     if ( size % bt->bsz )
2126     return log_msg( ERR_TRASH, "size 0x%08x % blksize", size, bt->bsz );
2127     bt->len = size >> blkbits;
2128     if ( 2 > bt->len && !(LBT_WRITE & bt->flg) )
2129     return log_msg( LOG_SYSERR, "attempt to read empty index" );
2130    
2131     bt->hlen = 16 + bt->len / 32; /* initial cache size */
2132     if ( bt->hlen < 1000 ) bt->hlen = 1000;
2133     if ( !(bt->root = addchunk( bt, bt->hlen )) )
2134     return log_msg( LOG_SYSERR, "\twhen getting bt cache" );
2135     if ( bt->hlen < 256 ) /* the hash */
2136     bt->hlen = 256;
2137     if ( !(bt->hash = (Block**)mAlloc( bt->hlen * sizeof(Block*) )) )
2138     return log_msg( LOG_SYSERR, "\twhen getting bt hash" );
2139     memset( bt->hash, 0, bt->hlen * sizeof(Block*) );
2140    
2141     if ( size && readblk( bt->root, bt, 0 ) )
2142     return log_msg( ERR_TRASH, "could not read root" );
2143     if ( ! bt->len ) {
2144     /* init: link root to leaf */
2145     SETINT( LBT_BASE(bt->root)+(bt->root->stk -= 4), 1 );
2146     LBT_PVK( bt->root->dct, bt->root->stk, 0, 0 );
2147     bt->root->ent = 1;
2148     bt->len = 1;
2149     }
2150     if ( !(l = getblk( bt, 1, 0, LBT_EX )) )
2151     return log_msg( ERR_TRASH, "could not read 1st leaf" );
2152     if ( ! size ) {
2153     log_msg( LOG_INFO, "creating index bsz %u ksz %u vsz %u",
2154     bt->bsz, bt->key, bt->vsz );
2155     bt->root->lev = 1;
2156     if ( WRULK( bt->root, bt ) || WRULK( l, bt ) )
2157     return log_msg( LOG_SYSERR, "\twhen writing base" );
2158     }
2159    
2160     return ret;
2161     } /* lbt_init */
2162    
2163    
2164     void lbt_close ( Idx *bt )
2165     {
2166     Chunk *c, *n = bt->mem;
2167     while ( (c = n) ) {
2168     n = c->nxt;
2169     mFree( c );
2170     }
2171     mFree( bt->hash );
2172     lio_close( &bt->fd, LIO_INOUT );
2173     memset( bt, 0, sizeof(*bt) );
2174     } /* lbt_close */
2175    
2176    
2177     /* ************************************************************
2178     public functions
2179     */
2180    
2181    
2182     void cXMkVal ( Idx *bt, Val *val, Hit *hit )
2183     {
2184     if ( hit ) {
2185     val->len = bt->vsz;
2186     mkval( val, hit );
2187     } else
2188     val->len = 0;
2189     }
2190    
2191     int cXAdd ( Idx *bt, Key *key, Hit *hit )
2192     {
2193     if ( bt->bat ) {
2194     if ( ! key )
2195     key = &bt->bat->key;
2196     cXMkVal( bt, &key->val, hit );
2197     return lbt_batchval( bt, key );
2198     } else if ( !key )
2199     return 0;
2200     else if ( hit ) {
2201     cXMkVal( bt, &key->val, hit );
2202     if ( hit->dbn == 0xffff )
2203     return lbt_del( bt, key );
2204     }
2205     return lbt_add( bt, key );
2206     } /* cXAdd */

  ViewVC Help
Powered by ViewVC 1.1.26