/[webpac]/openisis/current/lbt.c
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /openisis/current/lbt.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 237 - (show annotations)
Mon Mar 8 17:43:12 2004 UTC (20 years ago) by dpavlin
File MIME type: text/plain
File size: 64211 byte(s)
initial import of openisis 0.9.0 vendor drop

1 /*
2 openisis - an open implementation of the CDS/ISIS database
3 Version 0.8.x (patchlevel see file Version)
4 Copyright (C) 2001-2003 by Erik Grziwotz, erik@openisis.org
5
6 This library is free software; you can redistribute it and/or
7 modify it under the terms of the GNU Lesser General Public
8 License as published by the Free Software Foundation; either
9 version 2.1 of the License, or (at your option) any later version.
10
11 This library is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public
17 License along with this library; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
20 see README for more information
21 EOH */
22
23 /*
24 $Id: lbt.c,v 1.35 2003/05/27 11:03:30 kripke Exp $
25 the btree.
26 */
27
28 #include <stdlib.h>
29 #include <string.h>
30
31 /* special
32 #if defined( __GNUC__ ) && defined ( alloca )
33 #include <alloca.h>
34 #endif
35 */
36 #if defined( sparc ) || defined( __ppc__ )
37 # define LDB_BIG_ENDIAN
38 #endif
39
40 #include "lio.h"
41 #include "luti.h"
42 #include "lbt.h"
43
44
45 /*
46 probably all relevant cases can be handled by the more
47 efficient approach of storing recoded key a la strxfrm
48 */
49 #define LBT_USECMP 0
50
51
52 /* ************************************************************
53 private types
54 */
55
56 enum {
57 /* little endian magic text */
58 LBT_OPEN = 'O' | 'P'<<8 | 'E'<<16 | 'N'<<24,
59 LBT_ISIS = 'I' | 'S'<<8 | 'I'<<16 | 'S'<<24
60 };
61
62
63 enum {
64 LBT_LEVELS = 16, /* max depth */
65 LBT_POSBITS = 13, /* bits for position within block */
66 LBT_POSMASK = 0x1fff,
67 LBT_POSMAX = LBT_POSMASK /* obviously :) */
68 };
69
70 typedef struct Dict { /* dictionary of block entries */
71 unsigned short pos; /* 13 bits position of entry, 2 bits val for leaves */
72 unsigned char vln; /* length of entry's value */
73 /* for inner nodes, this is the length of qualifying
74 second key segment (either 0 or bt->vsz <= 23).
75 for leaves, this, together with higher bits of pos,
76 is the number of values of bt->vsz each.
77 */
78 unsigned char kln; /* length of entry's key */
79 /*
80 the following access macros are needed for leaves only
81 */
82 #define LBT_POS( d ) (LBT_POSMASK & (d)->pos)
83 #define LBT_VLN( d ) ((d)->vln | (~LBT_POSMASK & (d)->pos)>>(LBT_POSBITS-8))
84 #define LBT_POSADD( d, a ) do { \
85 (d)->pos = (~LBT_POSMASK & (d)->pos) | (LBT_POSMASK & ((d)->pos + (a))); \
86 } while (0)
87 #define LBT_VLNADD( d, a ) do { \
88 unsigned _vln = LBT_VLN( d ) + (a); \
89 (d)->pos = (LBT_POSMASK & (d)->pos) \
90 | (~LBT_POSMASK & ((_vln)<<(LBT_POSBITS-8))); \
91 (d)->vln = (unsigned char)(_vln); \
92 } while (0)
93 #define LBT_PV( d, p, v ) do { \
94 (d)->pos = (LBT_POSMASK & (p)) | (~LBT_POSMASK & ((v)<<(LBT_POSBITS-8))); \
95 (d)->vln = (unsigned char)(v); \
96 } while (0)
97 #define LBT_PVK( d, p, v, k ) do { \
98 LBT_PV( d, p, v ); \
99 (d)->kln = (unsigned char)(k); \
100 } while (0)
101 } Dict;
102
103
104 /* guard space around block */
105 #ifndef NDEBUG
106 # define LBT_GUARD 64
107 /* template for guard space -- blank or pattern */
108 static unsigned char guard[LBT_GUARD];
109 #else
110 # define LBT_GUARD 0
111 #endif
112
113
114 /*
115 TODO: check the possible performance gain in findkey
116 from separating disk block and administrative data
117 and aligning disk blocks on PAGE boundaries in memory
118 */
119 typedef struct Block {
120 #if LBT_GUARD
121 unsigned char grd[LBT_GUARD];
122 #endif
123 unsigned char ird; /* is reading */
124 unsigned char wrd; /* #threads waiting for read */
125 unsigned char lck; /* is write locked */
126 unsigned char wlk; /* #threads waiting for write lock */
127 #define LBT_BUSY( b ) (*(int*)&(b)->ird) /* any of 1st 4 bytes != 0 */
128 struct Block *nhs; /* next in hash chain */
129 struct Block *nru; /* next (more) recently used */
130 /* TODO: possibly double-link lru list, so we can unlink on every access ? */
131 /* the disk block -- from here on bsz bytes are on disk */
132 #define LBT_BOFF ((char *)&((Block*)0)->num - (char*)0)
133 #define LBT_BASE( b ) ((unsigned char *)&(b)->num)
134 unsigned num; /* number of this block, 0 is root */
135 unsigned nxt; /* right sibling, "OPEN" or "ISIS" for root */
136 /* typ,key,col are redundant in non-root blocks, maybe abused ... */
137 unsigned char typ; /* type: bsz, vsz, flags */
138 unsigned char key; /* max key length */
139 unsigned char col; /* collation */
140 unsigned char lev; /* level over bottom, 0 for leaves */
141 unsigned short ent; /* 10 bits # entries */
142 #define LBT_ENT( b ) (b)->ent /* currently other bits arent abused */
143 unsigned short stk; /* 13 bits offset of keyspace stack */
144 /* offset of dict RELATIVE to base */
145 #define LBT_DOFF ((char *)&((Block*)0)->dct - (char*)&((Block*)0)->num)
146 Dict dct[1];
147 } Block;
148
149 /*
150 LAYOUT:
151 dct is an array of dictionary entries in increasing key order
152 stk is growing downwards from block end towards dict
153 position of entry i is b->stk <= LBT_POS(b->dct+i) < bt->bsz
154 decreasing on i (i.e. stack space is in reverse ordering)
155
156 at LBT_POS(b->dct+i), there are b->dct[i].kln bytes key
157 for a leaf entry, the key is followed by
158 LBT_VLN(b->dct+i) (between 0..1023 incl.) values
159 of bt->vsz (between 8..23 incl) each
160 for an inner entry, the key is followed by
161 4 bytes block number and optionally bt->vsz qualifying value
162
163 SIZES:
164 block size >= 1024
165 block base overhead LBT_DOFF is 16 bytes
166 avail size >= 1008
167 Dict entry is 4 bytes
168 max. used stack space per entry is:
169 inner node: kln+4+bt->vsz <= 255+4+23 = 282
170 new leaf entry (one value): kln+bt->vsz <= 278
171 max total space for a new entry incl. dict is 286 < avail size/3
172 */
173
174 typedef struct Chunk {
175 struct Chunk *nxt;
176 } Chunk;
177
178
179 enum {
180 /* number of levels for lru list */
181 LBT_LRULEV = sizeof(((Idx*)0)->lru)/sizeof(((Idx*)0)->lru[0]),
182 LBT_ENTBITS = 10, /* bits for # enries */
183 LBT_ENTMASK = 0x03ff,
184 LBT_ENTMAX = LBT_ENTMASK,
185 LBT_MATCH = 0x8000, /* entry matched */
186 LBT_ENTSIZE = 8 /* minimum cost of entry: 4 byte dict, 4 byte val */
187 };
188
189
190 typedef struct Batch {
191 Block *cur;
192 unsigned got; /* current for this key */
193 unsigned tot; /* total for this key */
194 unsigned let;
195 /* stats */
196 unsigned keys;
197 unsigned keyl;
198 unsigned maxl;
199 unsigned vals;
200 unsigned maxv;
201 unsigned span;
202 unsigned free;
203 /* buffer */
204 Key key;
205 Val val[LBT_ENTMAX];
206 } Batch;
207
208
209 typedef enum {
210 LBT_RD, /* read -- no lock */
211 LBT_WR, /* write */
212 LBT_EX /* exclusive */
213 } lbt_get;
214
215
216 typedef struct { /* structure filled by findkey */
217 Idx *bt; /* the tree */
218 Key *key; /* the key searched */
219 Block *b[LBT_LEVELS]; /* path to leaf, b[0] is leaf */
220 unsigned n[LBT_LEVELS]; /* block numbers of path */
221 unsigned short e[LBT_LEVELS]; /* entries in path */
222 } FindKey;
223
224
225 typedef struct {
226 unsigned get;
227 unsigned miss;
228 unsigned cmp;
229 unsigned key;
230 } Stat;
231
232 /* buffer to print a value to.
233 lowest 11 bytes are printed as 4 shorts + one int < 4*5+10 = 30 digits+5sep
234 higher 12 bytes are printed hex = 24 bytes.
235 */
236 typedef char PrVal[64];
237
238
239 /* ************************************************************
240 private data
241 */
242
243 /*
244 stat access is not guarded by memory barrier.
245 may be wrong when running on multiple processors
246 */
247 static Stat stat;
248
249
250 /* ************************************************************
251 private functions
252 */
253
254 #if defined( sparc )
255 static unsigned GETINT ( void *m ) /* bus error safe version of *(int*) */
256 {
257 unsigned l;
258 memcpy( &l, m, 4 );
259 return l;
260 }
261 static void SETINT ( void *m, unsigned i )
262 {
263 memcpy( m, &i, 4 );
264 }
265 #else
266 #define GETINT( m ) (*(unsigned*)(m))
267 #define SETINT( m, i ) (*(unsigned*)(m) = (i))
268 #endif
269
270 static void mkval ( Val *v, Hit *ifp )
271 {
272 unsigned char *c = v->byt;
273 switch ( v->len ) {
274 default: memset( c, 0, v->len-11 ); c += v->len-11;
275 case 11: *c++ = (unsigned char) (ifp->dbn >> 8);
276 case 10: *c++ = (unsigned char) ifp->dbn;
277 case 9: *c++ = (unsigned char) (ifp->mfn >> 24);
278 case 8: *c++ = (unsigned char) (ifp->mfn >> 16);
279 case 7: *c++ = (unsigned char) (ifp->mfn >> 8);
280 case 6: *c++ = (unsigned char) ifp->mfn;
281 case 5: *c++ = (unsigned char) (ifp->tag >> 8);
282 case 4: *c++ = (unsigned char) ifp->tag;
283 case 3: *c++ = (unsigned char) ifp->occ;
284 case 2: *c++ = (unsigned char) (ifp->pos >> 8);
285 case 1: *c++ = (unsigned char) ifp->pos;
286 case 0: ;
287 }
288 } /* mkval */
289
290
291 static Hit *rdval ( Hit *ifp, unsigned char *c, unsigned vsz )
292 {
293 ifp->mfn = ifp->tag = ifp->occ = ifp->pos = ifp->dbn = 0;
294 switch ( vsz ) {
295 default: c += vsz-11;
296 case 11: ifp->dbn = (unsigned short)*c++ << 8;
297 case 10: ifp->dbn |= *c++;
298 case 9: ifp->mfn = (unsigned int)*c++ << 24;
299 case 8: ifp->mfn |= (unsigned int)*c++ << 16;
300 case 7: ifp->mfn |= (unsigned int)*c++ << 8;
301 case 6: ifp->mfn |= *c++;
302 case 5: ifp->tag = (unsigned short)*c++ << 8;
303 case 4: ifp->tag |= *c++;
304 case 3: ifp->occ = *c++;
305 case 2: ifp->pos = (unsigned short)*c++ << 8;
306 case 1: ifp->pos |= *c++;
307 case 0: ;
308 }
309 return ifp;
310 } /* rdval */
311
312
313 static char *prval ( PrVal buf, unsigned char *c, unsigned vsz )
314 {
315 char *d = buf;
316 Hit h;
317 if ( ! vsz ) {
318 buf[0] = '#';
319 buf[1] = 0;
320 return buf;
321 }
322 if ( 23 < vsz )
323 vsz = 23;
324 if ( 11 < vsz ) {
325 while ( 11 < vsz-- ) {
326 *d++ = luti_hex[ *c >> 4 ];
327 *d++ = luti_hex[ *c++ & 0xf ];
328 }
329 *d++ = '.';
330 }
331 rdval( &h, c, vsz );
332 d += u2a( d, h.dbn ); *d++ = '.';
333 d += u2a( d, h.mfn ); *d++ = '.';
334 d += u2a( d, h.tag ); *d++ = '.';
335 d += u2a( d, h.occ ); *d++ = '.';
336 d += u2a( d, h.pos ); *d++ = '.';
337 *d = 0;
338 return buf;
339 } /* prval */
340
341 /* for PRVAL f->key->val */
342 #define PRVAL( buf, v ) prval( buf, v.byt, v.len )
343
344
345 /* ************************************************************
346 block I/O & cache mgmt
347 */
348
349 static void iniblk ( Block *b, Idx *bt, unsigned num, unsigned char lev )
350 {
351 memset( b, 0, LBT_BOFF + bt->bsz + LBT_GUARD );
352 b->num = num;
353 b->typ = bt->typ;
354 b->key = bt->key;
355 b->col = bt->col;
356 b->lev = lev;
357 b->stk = bt->bsz;
358 } /* iniblk */
359
360
361 static int chkblk ( Block *b, Idx *bt )
362 {
363 #if 0
364 (void)b; (void)bt;
365 #else
366 int i;
367 unsigned short eod = LBT_DOFF + b->ent*sizeof(Dict), siz, pos, vln, key;
368 unsigned end = bt->bsz, tot = 0;
369 unsigned char *base = LBT_BASE(b);
370 #if LBT_GUARD
371 if ( memcmp( guard, b->grd, LBT_GUARD ) )
372 return log_msg( ERR_TRASH, "OOPS! initial guard frobbed on blk %d", b->num );
373 if ( memcmp( guard, LBT_BASE(b)+bt->bsz, LBT_GUARD ) )
374 return log_msg( ERR_TRASH, "OOPS! initial guard frobbed on blk %d", b->num );
375 #endif
376 if ( b->num > bt->len || (b->num && b->nxt > bt->len) )
377 return log_msg( ERR_TRASH,
378 "bad num %d / nxt %d on lev %d", b->num, b->nxt, b->lev );
379 if ( b->typ != bt->typ || b->key != bt->key || b->col != bt->col )
380 return log_msg( ERR_TRASH,
381 "bad sig 0x%02x,%d,%d in block %d want 0x%02x,%d,%d",
382 b->typ, b->key, b->col, b->num, bt->typ, bt->key, bt->col );
383 if ( b->ent > bt->bsz / LBT_ENTSIZE || b->stk > bt->bsz || b->stk < eod )
384 return log_msg( ERR_TRASH,
385 "bad ent/stk %d,%d in block %d eod %d sz %d",
386 b->ent, b->stk, b->num, eod, bt->bsz );
387 for ( i=0; i<b->ent; i++ ) {
388 Dict *d = &b->dct[i];
389 pos = LBT_POS( d );
390 vln = LBT_VLN( d );
391 key = d->kln;
392 siz = key + (b->lev ? vln+4u : vln * bt->vsz);
393 if ( pos < b->stk || end != (unsigned)pos + siz )
394 return log_msg( ERR_TRASH,
395 "bad pos/vln/key %d/%d/%d in entry %d of block %d"
396 " stack %d sz %d end %d expected %d",
397 pos, vln, key, i, b->num, b->stk, siz, pos+siz, end );
398 if ( b->lev ) {
399 unsigned lnk;
400 if ( vln && vln != +bt->vsz )
401 return log_msg( ERR_TRASH,
402 "bad vln %d in entry %d of block %d vsz %d",
403 vln, i, b->num, bt->vsz );
404 lnk = GETINT( base + pos + key + vln );
405 if ( lnk >= bt->len )
406 return log_msg( ERR_TRASH,
407 "bad link %u in entry %d of block %d",
408 lnk, i, b->num );
409 }
410 #if 1 && ! defined( NDEBUG )
411 if ( i ) {
412 unsigned short op = LBT_POS( d-1 ), okl = d[-1].kln;
413 int mc = memcmp( base+pos, base+op, key<okl ? key : okl ), j;
414 if ( 0 < mc || (!mc && 0 < (mc = (int)key - (int)okl)) )
415 goto ok;
416 j = 0;
417 if ( !mc && b->lev && vln ) { /* compare values */
418 unsigned ovln = d[-1].vln;
419 if ( ! ovln || 0 < (j = memcmp( base+pos+key, base+op+okl, vln )) )
420 goto ok;
421 }
422 return log_msg( ERR_TRASH,
423 "bad key order entry %d of block %d lev %d"
424 " mc %d j %d key %d okl %d vln %d",
425 i, b->num, b->lev, mc, j, key, okl, vln
426 );
427 }
428 ok:
429 #endif
430 tot += siz;
431 end = pos;
432 }
433 if ( tot > bt->bsz - b->stk )
434 return log_msg( ERR_TRASH,
435 "bad total entry size %d in block %d stack %d",
436 tot, b->num, b->stk );
437 #ifndef NDEBUG
438 if ( LOG_DO( LOG_TRACE ) ) {
439 log_msg( LOG_TRACE,
440 "block %d lev %d sig 0x%02x,%d,%d ent %d eod %d stk %d",
441 b->num, b->lev, b->typ, b->key, b->col, b->ent, eod, b->stk
442 );
443 for ( i=0; i<2; i++ ) {
444 int j = i ? b->ent-1 : 0;
445 Dict *d = &b->dct[j];
446 if ( i >= b->ent ) /* if b->ent is 0 or 1, log no or one key */
447 break;
448 pos = LBT_POS( d );
449 vln = LBT_VLN( d );
450 key = d->kln;
451 end = pos + b->lev ? vln+4u : vln * bt->vsz;
452 /* start of value */
453 if ( b->lev )
454 log_msg( LOG_TRACE, "key %d '%.*s'/%d -> %d",
455 j, key, base+pos, vln, GETINT(base+pos+key+vln) );
456 else {
457 Hit h;
458 rdval( &h, base+pos+key, bt->vsz );
459 log_msg( LOG_TRACE, "key %d '%.*s'[%d] = %d[%d,%d,%d]",
460 j, key, base+pos, vln, h.mfn, h.tag, h.occ, h.pos );
461 }
462 }
463 }
464 #endif
465 #endif
466 return 0;
467 } /* chkblk */
468
469
470 static Block *addchunk ( Idx *bt, unsigned cnt )
471 {
472 unsigned ibsz = LBT_BOFF + bt->bsz + LBT_GUARD; /* internal block size */
473 /* increase cnt so it's > 0 and add one extra to return */
474 char *mem = (char *)mAlloc( sizeof(Chunk) + (1 + ++cnt)*ibsz );
475 Chunk *chk = (Chunk *)mem;
476 if ( ! mem )
477 return 0;
478 /* chain the chunk */
479 chk->nxt = bt->mem;
480 bt->mem = chk;
481 mem += sizeof(Chunk);
482 bt->clen += cnt;
483 /* add cnt blocks to level 0 lru list */
484 if ( ! bt->mru[0] ) /* level 0 cache is empty */
485 bt->mru[0] = (Block *)mem;
486 for ( ; cnt--; mem += ibsz ) {
487 Block *b = (Block *)mem;
488 iniblk( b, bt, 0 , 0 ); /* initialise empty block */
489 b->nru = bt->lru[0];
490 bt->lru[0] = b;
491 }
492 iniblk( (Block *)mem, bt, 0 , 0 );
493 return (Block *)mem;
494 } /* addchunk */
495
496
497 static int readblk ( Block *b, Idx *bt, unsigned num )
498 {
499 int got = lio_pread( &bt->fd, LBT_BASE(b), bt->bsz, num*bt->bsz );
500 log_msg( LOG_VERBOSE, "reading block %u got %u", num, got );
501 if ( bt->bsz != (unsigned)got )
502 return log_msg( LOG_SYSERR, "\twhen reading bt block %d", num );
503 if ( b->num != num )
504 return log_msg( ERR_TRASH, "bad num %d in block %d", b->num, num );
505 return chkblk( b, bt );
506 } /* readblk */
507
508
509 static void unlock ( Block *b )
510 {
511 b->lck = 0;
512 if ( b->wlk )
513 (void)LIO_WAKE( b->num );
514 }
515
516 #define ULK( b ) do { \
517 if ( (b)->lck ) { (b)->lck = 0; if ( b->wlk ) unlock( b ); } \
518 } while (0)
519
520
521 static int writeblk ( Block *b, Idx *bt, int ulk )
522 {
523 int got;
524 if ( chkblk( b, bt ) ) /* don't write no shize */
525 return log_msg( ERR_TRASH, "\tbefore writing bt block %d", b->num );
526 got = lio_pwrite( &bt->fd, LBT_BASE(b), bt->bsz, b->num*bt->bsz );
527 log_msg( LOG_VERBOSE, "writing block %u got %u", b->num, got );
528 if ( ulk )
529 ULK( b );
530 if ( bt->bsz != (unsigned)got )
531 return log_msg( LOG_SYSERR, "\twhen writing bt block %d", b->num );
532 return 0;
533 } /* writeblk */
534
535 #define WRULK( b, bt ) writeblk( b, bt, 1 )
536
537
538 #define NEWBLK( bt, lev, lock ) getblk( bt, bt->len, lev, lock )
539 /**
540 get block #num.
541 1) locate block #num in cache.
542 2) if it is found:
543 if it is READING, or we want to lock and it is locked,
544 incr # waiters, wait for it.
545 3) if it is not found:
546 locate a free cache entry, extending the cache if necessary,
547 set the num, set the READING flag, go read it.
548 after reading, if there are waiters, wake them up.
549 since we are the first to get a hold on the block,
550 we may write lock it, if we want.
551 4) return the block.
552 */
553 static Block *getblk ( Idx *bt, unsigned num,
554 unsigned char lev, lbt_get lock )
555 {
556 Block *b;
557 stat.get++; /* dirty */
558 if ( !(stat.get & 0x3fff) ) { /* every 16K */
559 static unsigned lmiss;
560 log_msg( LOG_INFO,
561 "searched %i keys in %u blks cache %d %d%%"
562 " depth %u cmp/get/miss %u/%u/%u %d%% (%d %d%%)",
563 stat.key, bt->len, bt->clen, bt->clen*100 / bt->len,
564 bt->root->lev, stat.cmp, stat.get,
565 stat.miss, stat.miss*100 / stat.get,
566 (stat.miss - lmiss), (stat.miss - lmiss)*100 / 0x4000
567 );
568 lmiss = stat.miss;
569 }
570 startover: /* only used on absurd congestion */
571 b = 0;
572 LOG_DBG( LOG_DEBUG, "get bt block %u %u %d", num, lev, lock );
573 if ( ! num )
574 b = bt->root;
575 else if ( num < bt->len ) {
576 b = bt->hash[ num % bt->hlen ];
577 while ( b && num != b->num )
578 b = b->nhs;
579 }
580 if ( b )
581 while ( b->ird || (lock && b->lck) ) {
582 int wrd = b->ird;
583 int wlk = lock && b->lck;
584 LOG_DBG( LOG_VERBOSE, "lck on block %u %u r%d l%d",
585 num, lev, b->ird, b->lck );
586 if ( LBT_EX <= lock ) {
587 log_msg( LOG_ERROR, "concurrent mod on EX lock" );
588 return 0;
589 }
590 if ( ! lio_lock ) {
591 log_msg( LOG_FATAL, "bad MT environment: concurrent mod, no lock" );
592 return 0;
593 }
594 if ( wrd && 0xff != b->wrd )
595 b->wrd++;
596 if ( wlk && 0xff != b->wlk )
597 b->wlk++;
598 (void)LIO_WAIT( num ); /* zzzzz .... sleep tight */
599 /* some hours later */
600 if ( num != b->num ) {
601 log_msg( LOG_WARN, "heavy congestion: block %d has gone", num );
602 goto startover;
603 }
604 if ( wrd && b->wrd )
605 b->wrd--;
606 if ( wlk && b->wlk )
607 b->wlk--;
608 }
609 else { /* not found or new block */
610 /* find free block */
611 int i = 0, llev, err = 0;
612 stat.miss++; /* dirty */
613 LOG_DBG( LOG_DEBUG, "bt block %u %u not found", num, lev );
614 for ( ; i < (int)LBT_LRULEV; i++ ) {
615 /* check lru list for level i */
616 Block **bp = &bt->lru[i];
617 while ( (b = *bp) && LBT_BUSY(b) )
618 bp = &b->nru;
619 if ( ! b ) /* try next level */
620 continue;
621 *bp = b->nru; /* dequeue */
622 if ( bt->mru[i] == b ) /* was the tail */
623 bt->mru[i] = 0;
624 break;
625 }
626 if ( ! b ) {
627 log_msg( LOG_WARN, "no free blocks in cache" );
628 if ( !(b = addchunk( bt, 64 )) )
629 return 0;
630 /* TODO: should we also extend the hash ? */
631 } else if ( b->num ) { /* got block -- should be hashed somewhere */
632 Block **bp = &bt->hash[ b->num % bt->hlen ];
633 while ( *bp && *bp != b )
634 bp = &(*bp)->nhs;
635 if ( *bp == b )
636 *bp = b->nhs; /* dehash */
637 /* else ... hmmm should have found it */
638 }
639 assert( ! LBT_BUSY( b ) );
640 iniblk( b, bt, num, lev );
641 /* hash it, so waiters can find it */
642 b->nhs = bt->hash[ b->num % bt->hlen ];
643 bt->hash[ b->num % bt->hlen ] = b;
644 if ( num == bt->len ) { /* extend */
645 b->lev = lev;
646 bt->len++;
647 /* TODO: should be configurable */
648 if ( bt->len > bt->clen << 5 && bt->clen < 0x4000 )
649 addchunk( bt, bt->clen / 4 );
650 } else {
651 b->ird = 1;
652 (void)LIO_RELE();
653 err = readblk( b, bt, num );
654 (void)LIO_LOCK();
655 b->ird = 0;
656 if ( ! err && lev != b->lev )
657 err = log_msg( ERR_TRASH, "bad lev %d in block %d want %d",
658 b->lev, b->num, lev );
659 if ( err ) {
660 Block **bp = &bt->hash[ num % bt->hlen ];
661 while ( *bp && *bp != b )
662 bp = &(*bp)->nhs;
663 if ( *bp == b )
664 *bp = b->nhs; /* dehash */
665 b->lev = 0; /* add to level 0 lru list */
666 b->num = 0; /* tell others that it isn't the expected block */
667 }
668 }
669 /* add to it's levels lru list */
670 llev = LBT_LRULEV > b->lev ? b->lev : LBT_LRULEV-1;
671 if ( bt->mru[ llev ] )
672 bt->mru[ llev ]->nru = b;
673 else
674 bt->lru[ llev ] = b;
675 (bt->mru[ llev ] = b)->nru = 0;
676 if ( b->wrd )
677 (void)LIO_WAKE( num );
678 if ( err ) {
679 b->wrd = b->wlk = 0; /* mark block unused */
680 return 0;
681 }
682 } /* got free block */
683 if ( LBT_WR == lock ) {
684 if ( b->lck ) {
685 log_msg( LOG_FATAL, "bt consi: unexpected lock on %u", num );
686 return 0;
687 }
688 b->lck = 1;
689 }
690 return b;
691 } /* getblk */
692
693
694 /** binsearch position for key.
695 position is either the index of the matching entry (0..b->ent-1)
696 or the index (0..b->ent) where key should be inserted.
697 @return position for key (0..b->ent), |ifyed with LBT_MATCH, if found
698 */
699 static unsigned short keypos (
700 #if LBT_USECMP
701 Idx *bt,
702 #define KEYPOS( bt, b, key ) keypos( bt, b, key )
703 #else
704 #define KEYPOS( bt, b, key ) keypos( b, key )
705 #endif
706 Block *b, Key *key )
707 {
708 unsigned char *base = LBT_BASE( b );
709 unsigned char *kbt = key->byt;
710 unsigned char kln = key->len;
711 unsigned short l = 0, r = b->ent;
712 Dict *d = b->dct;
713 /* binsearch invariant:
714 key is < all entries i >= r (we do know relation at r)
715 key is > all entries i < l (don't know relation at l)
716 */
717 while ( l < r ) { /* invariant: l <= keypos < r */
718 unsigned short i = (l + r) >> 1; /* l <= i < r */
719 unsigned short pos = LBT_POS(&d[i]);
720 unsigned char cln = d[i].kln;
721 int cmp =
722 #if LBT_USECMP
723 bt->cmp ? bt->cmp( kbt, base+pos, kln<cln ? kln : cln ) :
724 #endif
725 memcmp( kbt, base+pos, kln<cln ? kln : cln );
726 stat.cmp++; /* dirty */
727 if ( ! cmp && !(cmp = (int)kln - cln) ) {
728 /* match on main key -- what about the value ? */
729 if ( ! b->lev /* ignore on leaves -- no duplicate keys here */
730 || (!d[i].vln && !key->val.len) /* no value */
731 )
732 return LBT_MATCH | i;
733 /* so we're on an inner node */
734 if ( ! d[i].vln ) /* common case */
735 cmp = 1;
736 else if ( ! key->val.len )
737 cmp = -1;
738 else {
739 unsigned char vln = LBT_VLN(&d[i]);
740 int c = key->val.len < d[i].vln ? key->val.len : vln;
741 if ( !(cmp = memcmp( key->val.byt, base+pos+cln, c ))
742 && !(cmp = (int) key->val.len - vln)
743 )
744 return LBT_MATCH | i;
745 }
746 }
747 if ( cmp > 0 )
748 l = i+1;
749 else
750 r = i;
751 }
752 return l; /* == r */
753 } /* keypos */
754
755
756 /** find position for (leaf) val.
757 search v in array b of n values of size sz
758 @return position for value (0..n), |ifyed with LBT_MATCH, if found
759 */
760 static unsigned short valpos ( unsigned char *b,
761 unsigned short n, unsigned char sz, unsigned char *v )
762 {
763 unsigned short l = 0, r = n;
764 /* binsearch invariant:
765 v is < all values i >= r (we do know relation at r)
766 v is > all values i < l (don't know relation at l)
767 */
768 while ( l < r ) { /* invariant: l <= valpos < r */
769 unsigned short i = (l + r) >> 1; /* l <= i < r */
770 int cmp = memcmp( v, b+i*sz, sz );
771 if ( ! cmp )
772 return LBT_MATCH | i;
773 if ( cmp > 0 )
774 l = i+1;
775 else
776 r = i;
777 }
778 return l; /* == r */
779 } /* valpos */
780
781
782 #ifndef NDEBUG
783 static void TRACEKEY ( unsigned lev, FindKey *f )
784 {
785 int i;
786 unsigned n = 0;
787 Hit h;
788 PrVal pv;
789
790 if ( LOG_DONT( lev ) )
791 return;
792 log_msg( lev, "key '%.*s' vln %d val %s",
793 f->key->len, f->key->byt, f->key->val.len,
794 PRVAL( pv, f->key->val ) );
795 if ( f->key->val.len ) {
796 rdval( &h, f->key->val.byt, f->key->val.len );
797 log_msg( lev, "\tdb %d mfn %d tag %d occ %d pos %d",
798 h.dbn, h.mfn, h.tag, h.occ, h.pos );
799 }
800 for ( i=LBT_LEVELS; i--; ) {
801 Block *b = f->b[i];
802 unsigned on = n, e;
803 Dict *d;
804 if ( ! b )
805 continue;
806 d = b->dct + (e = (LBT_ENTMASK & f->e[i]));
807 if ( i )
808 n = GETINT( LBT_BASE(b)+d->pos+d->kln+d->vln );
809 log_msg( lev, "lev %d n %d e 0x%x = %d %c '%.*s' val %d %s -> %d%s",
810 i, f->n[i], f->e[i], e, (LBT_MATCH & f->e[i]) ? 'M' : '-',
811 d->kln, LBT_BASE(b)+d->pos, d->vln,
812 prval( pv, LBT_BASE(b)+d->pos+d->kln, d->vln ),
813 n, on == b->num ? "" : " LY!"
814 );
815 if ( d->vln ) {
816 rdval( &h, LBT_BASE(b)+d->pos+d->kln, d->vln );
817 log_msg( lev, "\tdb %d mfn %d tag %d occ %d pos %d",
818 h.dbn, h.mfn, h.tag, h.occ, h.pos );
819 }
820 if ( e ) {
821 d--;
822 log_msg( lev, "\tleft '%.*s' val %d %s",
823 d->kln, LBT_BASE(b)+d->pos, d->vln,
824 prval( pv, LBT_BASE(b)+d->pos+d->kln, d->vln )
825 );
826 if ( d->vln ) {
827 rdval( &h, LBT_BASE(b)+d->pos+d->kln, d->vln );
828 log_msg( lev, "\tdb %d mfn %d tag %d occ %d pos %d",
829 h.dbn, h.mfn, h.tag, h.occ, h.pos );
830 }
831 }
832 }
833 } /* TRACEKEY */
834 #else
835 #define TRACEKEY( l, f ) (void)(f)
836 #endif
837
838
839 static int findkey ( FindKey *f, lbt_get lock )
840 {
841 int i;
842 Block *b = f->bt->root;
843 unsigned char *pkey = 0, pkln = 0, pvln = 0;
844 if ( b->lev >= LBT_LEVELS )
845 return log_msg( ERR_TRASH, "tree is too deep: %u", b->lev );
846 stat.key++;
847 for ( i=LBT_LEVELS; i--; ) { /* paranoia */
848 f->b[i] = 0;
849 f->n[i] = f->e[i] = 0;
850 }
851 for ( i=b->lev;; ) {
852 unsigned short e;
853 unsigned n;
854 Dict *d;
855 #ifndef NDEBUG
856 chkblk( b, f->bt );
857 #endif
858 f->n[i] = (f->b[i] = b)->num;
859 f->e[i] = e = KEYPOS( f->bt, b, f->key );
860 LOG_DBG( LOG_DEBUG, "keypos %x", e );
861 #ifndef NDEBUG
862 if ( pkey && b->ent ) {
863 /* compare parent key against our first key
864 pkey must be equal for inner node and not greater for leave
865 */
866 int cmp; /* this - pkey >= 0 */
867 unsigned char *ent, ln;
868 d = b->dct;
869 ent = LBT_BASE(b)+LBT_POS(d);
870 ln = pkln < d->kln ? pkln : d->kln;
871 cmp = ln ? memcmp( ent, pkey, ln ) : 0;
872 if ( ! cmp )
873 cmp = (int)d->kln - (int)pkln;
874 if ( ! cmp ) {
875 if ( ! d->vln || ! pvln )
876 cmp = (int)d->vln - (int)pvln;
877 else
878 cmp = memcmp( ent+d->kln, pkey+pkln, f->bt->vsz );
879 }
880 if ( cmp && (i || 0 > cmp) ) {
881 Hit h;
882 PrVal pv;
883 TRACEKEY( ERR_TRASH, f );
884 if ( d->vln ) {
885 rdval( &h, ent+d->kln, d->vln );
886 log_msg( ERR_TRASH, "\tfirst val db %d mfn %d tag %d occ %d pos %d",
887 h.dbn, h.mfn, h.tag, h.occ, h.pos );
888 }
889 return log_msg( ERR_TRASH, "\tbad first '%.*s' val %d %s",
890 d->kln, ent, d->vln, prval( pv, ent+d->kln, d->vln ) );
891 }
892 }
893 #endif
894 /*
895 here goes the Lehmann/Yao race detection:
896 If we ran on right boundary, follow right link
897 */
898 while ( !(LBT_MATCH & e) && b->ent == e && b->nxt && b->num ) {
899 Block *s;
900 unsigned se;
901 b->lck = LBT_WR; /* hold b while checking s */
902 s = getblk( f->bt, b->nxt, i, i ? LBT_RD : lock );
903 if ( i || ! lock )
904 ULK( b );
905 else
906 b->lck = lock;
907 se = KEYPOS( f->bt, s, f->key );
908 if ( !se ) { /* lower than first */
909 ULK( s );
910 break;
911 }
912 LOG_DBG( LOG_DEBUG, "LYao %d -> %d lev %d for '%.*s' got 0x%x",
913 b->num, s->num, i, f->key->len, f->key->byt, se );
914 /* trade b for s */
915 ULK( b );
916 f->n[i] = (f->b[i] = b = s)->num;
917 e = se;
918 }
919 f->e[i] = e;
920 if ( ! i )
921 return 0;
922 if ( !(LBT_MATCH & e) ) {
923 if ( e )
924 f->e[i] = --e;
925 else {
926 TRACEKEY( ERR_TRASH, f );
927 return log_msg( ERR_IDIOT,
928 "ran on left bound of block %u ent %u",
929 b->num, b->ent );
930 }
931 }
932 d = b->dct + (LBT_ENTMASK & e);
933 /* direct access ok in inner node */
934 pkey = LBT_BASE(b)+d->pos;
935 pkln = d->kln;
936 pvln = d->vln;
937 n = GETINT( pkey+pkln+pvln );
938 i--;
939 b = getblk( f->bt, n, i, i ? LBT_RD : lock );
940 }
941 return -1; /* go away */
942 } /* findkey */
943
944
945 static int mktree ( Idx *bt )
946 {
947 Batch *ba = bt->bat;
948 unsigned char lev = 0; /* the level to be indexed */
949 unsigned left = 1; /* block on child level */
950 /* stats */
951 unsigned size = bt->len * bt->bsz;
952 unsigned sval = ba->vals * bt->vsz;
953 unsigned sove = bt->len * LBT_DOFF + sizeof(Dict)*(ba->keys+ba->span);
954 unsigned iblk = 0;
955 unsigned ikey = 0;
956 unsigned ikeyl = 0;
957 unsigned ipfx = 0;
958 unsigned ispn = 0;
959
960 log_msg( LOG_INFO,
961 "end batch:\n"
962 "\t#blks %u total len %u free %u (%.1f%%) overhead %u (%.1f%%)\n"
963 "\t#keys %u total len %u (%.1f%%) avg %.1f max %u\n"
964 "\t#vals %u total len %u (%.1f%%) avg %.1f max %u spans %u",
965 bt->len, size, ba->free, ba->free*100./size, sove, sove*100./size,
966 ba->keys, ba->keyl, ba->keyl*100./size, (float)ba->keyl/ba->keys, ba->maxl,
967 ba->vals, sval, sval*100./size, (float)ba->vals/ba->keys, ba->maxv, ba->span
968 );
969
970 /* build tree */
971 for (;;) { /* levels */
972 unsigned len = 0; /* current is n'th block on level */
973 Block *b = 0;
974 unsigned have = 0; /* room in current block b */
975 unsigned char*base = 0; /* of current block b */
976 Dict *d = 0; /* of current block b */
977 unsigned next = left; /* child */
978
979 left = bt->len;
980 /* get child c and create child pointer in block b */
981 while ( next ) {
982 unsigned char vln = 0;
983 unsigned char *key;
984 unsigned char kln;
985 unsigned need;
986 Block *c = getblk( bt, next, lev, LBT_EX );
987 if ( ! c )
988 return log_msg( ERR_NOMEM, "no child block" );
989 next = c->nxt;
990 if ( ! c->ent ) /* OOPS empty node */
991 continue;
992 key = LBT_BASE(c)+LBT_POS(c->dct);
993 kln = c->dct[0].kln;
994 if ( ! b ) /* the leftmost child on this level */
995 kln = 0; /* always 0 key on left */
996 else if ( lev )
997 vln = LBT_VLN(c->dct);
998 else if ( ba->key.len == kln /* compare key to last of prev. */
999 && !memcmp( ba->key.byt, key, kln ) /* which was saved in ba->key */
1000 ) { /* key spans blocks */
1001 LOG_DBG( LOG_DEBUG, "found span key '%.*s' child %u",
1002 kln, key, c->num );
1003 vln = bt->vsz; /* qualify with value */
1004 } else {
1005 #if 1
1006 unsigned char x = 0;
1007 /* find prefix key */
1008 if ( kln > ba->key.len )
1009 kln = ba->key.len;
1010 while ( x < kln && key[x] == ba->key.byt[x] )
1011 x++;
1012 if ( x < kln ) /* they differ at [x] */
1013 kln = x+1;
1014 else /* equal on the shorter length */
1015 kln++;
1016 if ( kln > c->dct[0].kln )
1017 return log_msg( ERR_TRASH,
1018 "bad key '%.*s' on level %u matching previous of length %u",
1019 c->dct[0].kln, key, lev, ba->key.len );
1020 LOG_DBG( LOG_DEBUG, "prefix on key '%.*s' child %u save %u",
1021 kln, key, c->num, c->dct[0].kln - kln );
1022 ipfx += c->dct[0].kln - kln;
1023 #endif
1024 }
1025 if ( vln )
1026 ispn++;
1027 ikeyl += kln;
1028 need = sizeof(Dict) + kln + vln + 4;
1029 if ( need > have ) {
1030 if ( b ) {
1031 log_msg( LOG_VERBOSE,
1032 "level %u block %u is full %u ent stack %u",
1033 lev, b->num, b->ent, b->stk );
1034 b->nxt = bt->len; /* link */
1035 b->lck = 0;
1036 if ( WRULK( b, bt ) ) goto nowrite;
1037 len++;
1038 }
1039 c->lck = LBT_EX; /* paranoia */
1040 b = NEWBLK( bt, lev+1, LBT_EX );
1041 c->lck = 0;
1042 if ( ! b )
1043 return log_msg( ERR_NOMEM, "no free block" );
1044 if ( b->ent || b->stk != bt->bsz )
1045 return log_msg( ERR_TRASH,
1046 "new block %u is not empty: ent %u stk %u",
1047 b->num, b->ent, b->stk );
1048 base = LBT_BASE( b );
1049 b->lck = LBT_EX; /* lock */
1050 have = bt->bsz - LBT_DOFF; /* - ba->let; TODO: should we ??? */
1051 d = b->dct;
1052 len++;
1053 iblk++;
1054 }
1055 SETINT( base+(b->stk -= 4), c->num );
1056 d->kln = kln;
1057 d->vln = vln;
1058 if ( kln += vln ) /* key + consecutive qualifying value */
1059 memcpy( base+(b->stk -= kln), key, kln );
1060 d->pos = b->stk;
1061 d++;
1062 b->ent++;
1063 ikey++;
1064 have -= need;
1065 /* save the last dance for me */
1066 memcpy( ba->key.byt, LBT_BASE(c)+LBT_POS(&c->dct[c->ent-1]),
1067 ba->key.len = c->dct[c->ent-1].kln );
1068 }
1069 if ( !len )
1070 return log_msg( ERR_TRASH, "level %u was empty", lev );
1071 /* close level */
1072 log_msg( LOG_INFO, "closing level %u with %u blocks %u..%u",
1073 lev, len, left, b->num );
1074 b->lck = 0;
1075 if ( 1 == len ) {
1076 memcpy( bt->root, b, LBT_BOFF + bt->bsz );
1077 b = bt->root;
1078 b->num = 0;
1079 }
1080 if ( WRULK( b, bt ) ) goto nowrite;
1081 lev++;
1082 if ( 2 > len )
1083 break;
1084 }
1085 log_msg( LOG_INFO,
1086 "\tused %u inner blocks %u keys %u bytes avg %.1f\n"
1087 "\t%u #spans (%.1f%%) %u pfx (- %.1f%%)",
1088 iblk, ikey, ikeyl, (float)ikeyl/ikey,
1089 ispn, ispn*100.*bt->vsz/ikeyl, ipfx, ipfx*100./(ipfx+ikeyl) );
1090
1091 mFree( bt->bat );
1092 bt->bat = 0;
1093 return 0;
1094 nowrite:
1095 return log_msg( LOG_SYSERR, "\twhen writing batch block" );
1096 } /* mktree */
1097
1098
1099
1100 /* ************************************************************
1101 package data
1102 */
1103
1104
1105
1106 /* ************************************************************
1107 package functions
1108 */
1109
1110 int lbt_batch ( Idx *bt, unsigned char pctfree )
1111 {
1112 bt->bat = (Batch*)mAlloc( sizeof(Batch) );
1113 if ( ! bt->bat )
1114 return log_msg( ERR_NOMEM, "\twhen getting batch" );
1115 memset( bt->bat, 0, sizeof(Batch) );
1116 if ( pctfree > 50 )
1117 pctfree = 50;
1118 bt->bat->let = pctfree * bt->bsz / 100;
1119 lio_trunc( &bt->fd, 0 );
1120 iniblk( bt->root, bt, 0, 0 );
1121 /* TODO: kill hash */
1122 bt->len = 1;
1123 bt->bat->cur = NEWBLK( bt, 0, LBT_EX );
1124 return 0;
1125 } /* lbt_batch */
1126
1127
1128 int lbt_batchval ( Idx *bt, Key *key )
1129 {
1130 int cmp = 0;
1131 Batch *ba = bt->bat;
1132 if ( key != &ba->key ) {
1133 int l = key->len < ba->key.len ? key->len : ba->key.len; /* min */
1134 if ( ! (cmp = memcmp(key->byt, ba->key.byt, l)) )
1135 cmp = (int)key->len - (int)ba->key.len;
1136 if ( 0 > cmp )
1137 return log_msg( LOG_ERROR, "batch key '%.*s' < last '%.*s'",
1138 key->len, key->byt, ba->key.len, ba->key.byt );
1139 }
1140 /* flush ? */
1141 while ( ! key->val.len /* end */
1142 || LBT_ENTMAX == ba->got /* full */
1143 || cmp /* next key */
1144 ) {
1145 Block *b = ba->cur;
1146 unsigned have = b->stk - LBT_DOFF - b->ent*sizeof(Dict) - ba->let;
1147 unsigned need = sizeof(Dict) + ba->key.len;
1148 unsigned want = need + ba->got * bt->vsz;
1149 unsigned take = ba->got;
1150 if ( have < want ) { /* you can't always get ... */
1151 if ( have <= need )
1152 take = 0;
1153 else {
1154 take = (have - need) / bt->vsz;
1155 if ( 256 > take * bt->vsz )
1156 take = 0;
1157 }
1158 }
1159 #if 0 /* prefix key turbo ??? -- doesn't seem to work all too well */
1160 if ( have < (bt->bsz >> 1) ) { /* ready to waste half a block ? */
1161 Dict *d = &b->dct[ b->ent-1 ];
1162 unsigned char *last = LBT_BASE(b)+LBT_POS(d);
1163 if ( ba->key.byt[0] != last[0]
1164 || ba->key.byt[1] != last[1]
1165 || ba->key.byt[2] != last[2]
1166 )
1167 take = 0;
1168 }
1169 #endif
1170 LOG_DBG( LOG_DEBUG, "batch '%.*s' got %u need %u want %u have %u take %u",
1171 ba->key.len, ba->key.byt, ba->got, need, want, have, take );
1172 /* we have enough room in b for take entries */
1173 if ( take ) {
1174 unsigned char *base = LBT_BASE( b );
1175 unsigned char *s = base + b->stk;
1176 int i = take;
1177 Dict *d = &b->dct[ b->ent++ ];
1178 while ( i-- )
1179 memcpy( s -= bt->vsz, ba->val[i].byt, bt->vsz );
1180 memcpy( s -= ba->key.len, ba->key.byt, ba->key.len );
1181 b->stk = (unsigned short) (s - base);
1182 LBT_PV( d, b->stk, take );
1183 d->kln = ba->key.len;
1184 if ( ba->got > take )
1185 memcpy( ba->val, ba->val+take, sizeof(ba->val[0])*(ba->got - take) );
1186 ba->got -= take;
1187 }
1188 if ( ! ba->got )
1189 break;
1190 if ( take ) /* key spans blocks */
1191 ba->span++;
1192 ba->free += b->stk - LBT_DOFF - b->ent*sizeof(Dict); /* wasted */
1193 b->nxt = bt->len; /* link */
1194 LOG_DBG( LOG_DEBUG, "batch: %u entries left, getting block %u",
1195 ba->got, bt->len );
1196 if ( WRULK( b, bt ) )
1197 return log_msg( LOG_SYSERR, "\twhen writing batch block" );
1198 /* so couldn't flush all, thus we need another block */
1199 b = ba->cur = NEWBLK( bt, 0, LBT_EX );
1200 /* will continue on end or keychange */
1201 if ( b->ent || b->stk < bt->bsz )
1202 return log_msg( ERR_TRASH, "new block %u is not empty: ent %u stk %u",
1203 b->num, b->ent, b->stk );
1204 LOG_DBG( LOG_VERBOSE, "new block %u has %u ent %u stk, will keep %u",
1205 b->num, b->ent, b->stk, ba->let );
1206 }
1207 if ( cmp || ! key->val.len ) /* done with key: stats */
1208 if ( ba->maxv < ba->tot )
1209 ba->maxv = ba->tot;
1210 if ( key->val.len ) {
1211 if ( cmp ) { /* set new key */
1212 if ( ba->got )
1213 return log_msg( ERR_IDIOT, "batch not empty on new key" );
1214 ba->tot = 0;
1215 memcpy( ba->key.byt, key->byt, ba->key.len = key->len );
1216 ba->keys++;
1217 ba->keyl += key->len;
1218 if ( ba->maxl < key->len )
1219 ba->maxl = key->len;
1220 }
1221 /* add new val */
1222 /* LOG_HEX( key->val.byt, bt->vsz ); */
1223 memcpy( ba->val[ ba->got++ ].byt, key->val.byt, bt->vsz );
1224 ba->vals++;
1225 ba->tot++;
1226 } else {
1227 WRULK( ba->cur, bt );
1228 mktree( bt );
1229 }
1230
1231 return 0;
1232 } /* lbt_batchval */
1233
1234
1235 /* TODO: this function is an awful mess and needs complete rewrite, KR
1236 */
1237 int lbt_add ( Idx *bt, Key *key )
1238 {
1239 int ret = 0;
1240 unsigned short e, i = 0;
1241 Block *b, *s = 0; /* mainblock, sibling */
1242 Dict *d;
1243 unsigned char *base, *sbase, *src, *v, *stk;
1244 FindKey f;
1245 int hadkey, split = 0, insibling = 0;
1246 unsigned need;
1247 PrVal pv;
1248
1249 f.bt = bt;
1250 f.key = key;
1251 if ( bt->key && key->len > bt->key )
1252 key->len = bt->key;
1253 if ( findkey( &f, LBT_WR ) ) {
1254 if ( f.b[0] )
1255 ULK( f.b[0] );
1256 b = 0;
1257 LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
1258 }
1259 e = (LBT_ENTMASK & f.e[0]);
1260 b = f.b[0];
1261 base = LBT_BASE(b);
1262 d = b->dct + e;
1263 if ( ! (hadkey = LBT_MATCH & f.e[0]) ) /* new key */
1264 need = key->len + sizeof(Dict) + bt->vsz;
1265 else {
1266 unsigned vln = LBT_VLN(d);
1267 v = base+LBT_POS(d)+d->kln;
1268 if ( 1 == vln ) { /* check stopword -- value entirely zeroed */
1269 int l = bt->vsz;
1270 unsigned char *C = v;
1271 while ( l-- && !*C++ ) /* huh huh, he said "C++" */
1272 ;
1273 if ( -1 == l ) {
1274 LOG_DBG( LOG_VERBOSE, "key '%.*s' is a stopword", key->len, key->byt );
1275 goto done;
1276 }
1277 }
1278 i = valpos( v, vln, bt->vsz, key->val.byt );
1279 if ( LBT_MATCH & i ) {
1280 LOG_DBG( LOG_DEBUG,
1281 "adding key '%.*s'/%s: already there pos %u",
1282 key->len, key->byt, PRVAL( pv, key->val ), i & ~LBT_MATCH );
1283 goto done; /* value already there */
1284 }
1285 need = bt->vsz;
1286 }
1287 LOG_DBG( LOG_DEBUG,
1288 "adding key '%.*s'/%s in block %d had %d ent %d stk %d need %d",
1289 key->len, key->byt, PRVAL( pv, key->val ),
1290 b->num, hadkey, b->ent, b->stk, need );
1291
1292 if ( need > b->stk - LBT_DOFF - b->ent*sizeof(Dict) ) {
1293 /* now that's bad: not enough space
1294 create new block s, move last entries there
1295 starting from the end, entries (including the new one) are collected,
1296 until their combined size reaches more than half a block.
1297 If the combined size then is more than 2/3 a block,
1298 and the last entry alone takes more than 1/3 a block, it is split.
1299 TODO: check existing sibling for free space
1300 */
1301 unsigned half = (bt->bsz - LBT_DOFF) >> 1;
1302 unsigned tot = 0;
1303 unsigned mv = b->ent;
1304
1305 LOG_DBG( LOG_DEBUG, "splitting block %d need %d have %d (stk %d ent %d)",
1306 b->num, need, b->stk - LBT_DOFF - b->ent*sizeof(Dict), b->stk, b->ent );
1307 if ( need > (bt->bsz - LBT_DOFF) / 3 )
1308 LOG_OTO( done, (ERR_IDIOT, "OOPS! excessive need for %d bytes", need) );
1309 /* figure out split position */
1310 if ( !hadkey && b->ent == e ) {
1311 tot = need;
1312 insibling = !0;
1313 }
1314 d = b->dct+mv;
1315 while ( d--, mv-- ) {
1316 unsigned sz = sizeof(Dict)+d->kln+LBT_VLN(d)*bt->vsz;
1317 if ( half > (tot += sz) ) { /* sz fits */
1318 if ( e == mv && !hadkey ) {
1319 /*
1320 check size of new entry.
1321 In case of hadkey, we don't bother for the bt->vsz bytes.
1322 */
1323 if ( half < (tot + need) ) {
1324 if ( (half - tot) > need/2 ) { /* more balanced if we move */
1325 tot += need;
1326 insibling = !0;
1327 }
1328 break;
1329 }
1330 tot += need;
1331 insibling = !0;
1332 }
1333 if ( mv )
1334 continue; /* else croak on ! mv below */
1335 }
1336 /* exceeded half -- maybe we move entry mv anyway */
1337 if ( tot > (bt->bsz - LBT_DOFF)*2/3 ) {
1338 if ( tot-sz < (bt->bsz - LBT_DOFF)/3 )
1339 split = !0;
1340 else { /* undo -- don't move this entry */
1341 tot -= sz;
1342 d++; mv++;
1343 }
1344 } else if ( ! mv )
1345 LOG_OTO( done, (ERR_IDIOT, "OOPS! didn't find end tot %d bytes", tot) );
1346 break;
1347 } /* while mv-- */
1348 if ( mv < e || (mv == e && hadkey && !split) )
1349 insibling = !0;
1350 /* now mv is the first entry to move */
1351 s = NEWBLK( bt, 0, LBT_WR );
1352 LOG_DBG( LOG_DEBUG,
1353 "split: move %d of %d to new blk %d e %d tot %d insibling %d split %d",
1354 mv, b->ent, s->num, e, tot, insibling, split );
1355 s->nxt = b->nxt;
1356 b->nxt = s->num;
1357 sbase = LBT_BASE(s);
1358 if ( split ) {
1359 /* now total includes full size of entry mv.
1360 if we included a new entry, it might even exceed bsz.
1361 calculate sizes for both blocks w/o mv's values.
1362 */
1363 unsigned esz = sizeof(Dict)+d->kln; /* entry's base cost */
1364 unsigned vln = LBT_VLN(d);
1365 unsigned keep, diff, mvval, kpval, mvsiz, pos = LBT_POS(d);
1366 /*
1367 from currently used space, substract tot (including all
1368 values and possibly new key "need"), but add entry base cost.
1369 that should be the size remaining if we where moving
1370 all of the values but keep an entry and key.
1371 maybe slightly biased if need is bt->vsz (the hadkey case).
1372 */
1373 keep = bt->bsz - b->stk + b->ent*sizeof(Dict) + need - tot + esz;
1374 tot -= vln * bt->vsz; /* undo entries */
1375 diff = (keep > tot ? keep - tot : tot - keep) / bt->vsz;
1376 if ( diff > vln )
1377 LOG_OTO( done, (ERR_IDIOT,
1378 "OOPS! got diff %d vln %d entry %d tot %d keep %d",
1379 diff, vln, mv, tot, keep) );
1380 mvval = (vln - diff)/2;
1381 if ( keep > tot )
1382 mvval += diff;
1383 kpval = vln - mvval;
1384 if ( ! mvval || ! kpval ) {
1385 LOG_DBG( LOG_WARN, "bad mv/kpval %d/%d keep %d tot %d vln %d diff %d",
1386 mvval, kpval, keep, tot, vln, diff );
1387 if ( ! mvval )
1388 kpval = vln - (mvval = 1);
1389 else
1390 mvval = vln - (kpval = 1);
1391 }
1392 tot += mvsiz = mvval*bt->vsz;
1393 keep += kpval*bt->vsz;
1394 src = base+pos;
1395 LOG_DBG( LOG_DEBUG,
1396 "split: entry %d '%.*s' mv %d of %d values, i %d keep %d tot %d",
1397 mv, d->kln, src, mvval, vln, i, keep, tot );
1398 if ( keep > (bt->bsz-LBT_DOFF) || tot > (bt->bsz-LBT_DOFF) )
1399 LOG_OTO( done, (ERR_IDIOT,
1400 "OOPS! got diff %d vln %d mvval %d entry %d tot %d keep %d",
1401 diff, vln, mvval, mv, tot, keep) );
1402 /* SIGH -- do it */
1403 memcpy( sbase + (s->stk -= mvsiz),
1404 src+d->kln+kpval*bt->vsz, mvsiz );
1405 if ( e == mv && hadkey && i > kpval ) {
1406 insibling = !0;
1407 s->stk -= bt->vsz;
1408 i -= kpval;
1409 memmove( sbase+s->stk, sbase+s->stk+bt->vsz, i*bt->vsz );
1410 memcpy( sbase+s->stk+i*bt->vsz, key->val.byt, bt->vsz );
1411 mvval++;
1412 }
1413 memcpy( sbase + (s->stk -= d->kln), src, d->kln );
1414 LBT_PVK( s->dct, s->stk, mvval, d->kln );
1415 s->ent = 1;
1416 memmove( src + mvsiz, src, d->kln+kpval*bt->vsz );
1417 pos += mvsiz;
1418 LBT_PV( d, pos, kpval );
1419 d++; mv++;
1420 }
1421 /* now entries mv..b->ent are moved, mv > 0 */
1422 b->stk = LBT_POS(b->dct+mv-1); /* pre-adjust stack */
1423 if ( mv < e ) { /* move entries mv..e-1 */
1424 /* used stack space is from pos of entry e-1 to end of entry mv
1425 (might differ from pos of mv-1 due to split)
1426 */
1427 unsigned from = LBT_POS(b->dct+e-1), nent = e-mv,
1428 sz = LBT_POS(b->dct+mv)+b->dct[mv].kln
1429 +LBT_VLN(b->dct+mv)*bt->vsz - from;
1430 int adj = (s->stk -= sz) - from; /* entry position adjustment */
1431 memcpy( sbase + s->stk, base+from, sz );
1432 memcpy( s->dct+s->ent, b->dct+mv, nent*sizeof(Dict) );
1433 d = s->dct + s->ent;
1434 s->ent += nent;
1435 for ( ; nent--; d++ )
1436 LBT_POSADD( d, adj );
1437 }
1438 if ( insibling && mv <= e ) { /* now mv or insert e, unless done on split */
1439 d = b->dct + e;
1440 if ( hadkey ) { /* mv existing entry e >= mv, add new val */
1441 /* i is valpos */
1442 unsigned vln = LBT_VLN(d);
1443 src = base+LBT_POS(d)+d->kln;
1444 if ( i < vln ) {
1445 unsigned sz = (vln - i)*bt->vsz;
1446 memcpy( sbase + (s->stk -= sz), src + i*bt->vsz, sz );
1447 }
1448 memcpy( sbase + (s->stk -= bt->vsz), key->val.byt, bt->vsz );
1449 if ( i ) {
1450 unsigned sz = i*bt->vsz;
1451 memcpy( sbase + (s->stk -= sz), src, sz );
1452 }
1453 memcpy( sbase + (s->stk -= d->kln), base+LBT_POS(d), d->kln );
1454 LBT_PVK( s->dct + s->ent, s->stk, vln+1, d->kln );
1455 e++; /* e is moved */
1456 } else {
1457 memcpy( sbase + (s->stk -= bt->vsz), key->val.byt, bt->vsz );
1458 memcpy( sbase + (s->stk -= key->len), key->byt, key->len );
1459 LBT_PVK( s->dct + s->ent, s->stk, 1, key->len );
1460 }
1461 s->ent++;
1462 } /* mv or insert e */
1463 if ( e < b->ent && mv < b->ent ) { /* move entries max(e,mv) to end */
1464 unsigned fst = mv < e ? e : mv;
1465 /* used stack space is from pos of entry b->ent-1 (the original b->stk)
1466 to end of entry fst (might differ from pos of fst-1 due to split)
1467 */
1468 unsigned from = LBT_POS(b->dct+b->ent-1), nent = b->ent-fst,
1469 sz = LBT_POS(b->dct+fst)+b->dct[fst].kln
1470 +LBT_VLN(b->dct+fst)*bt->vsz - from;
1471 int adj = (s->stk -= sz) - from; /* entry position adjustment */
1472 LOG_DBG( LOG_DEBUG, "split: entries %d..%d sz %d from %d to %d",
1473 fst, b->ent-1, sz, from, s->stk );
1474 memcpy( sbase + s->stk, base+from, sz );
1475 memcpy( s->dct+s->ent, b->dct+fst, nent*sizeof(Dict) );
1476 d = s->dct + s->ent;
1477 s->ent += nent;
1478 for ( ; nent--; d++ ) {
1479 #if !defined(NDEBUG)
1480 unsigned p = LBT_POS(d);
1481 #endif
1482 LBT_POSADD( d, adj );
1483 #if !defined(NDEBUG)
1484 LOG_DBG( LOG_DEBUG, "new ent %d p/v/k %d/%d/%d o %d",
1485 d - s->dct, LBT_POS(d), LBT_VLN(d), d->kln, p );
1486 #endif
1487 }
1488 }
1489 b->ent = mv; /* kill entries */
1490 LOG_DBG( LOG_DEBUG,
1491 "split: blks %d/%d with %d/%d entries, s starts '%.*s'",
1492 b->num, s->num, b->ent, s->ent, s->dct[0].kln, sbase+LBT_POS(s->dct) );
1493 } /* new sibling */
1494
1495 if ( ! insibling ) {
1496 /* now we do have enough space */
1497 d = b->dct + e;
1498 stk = base + b->stk; /* bottom of data stack */
1499 if ( hadkey ) { /* insert value at i */
1500 unsigned char *val = base + LBT_POS(d)+d->kln+i*bt->vsz;
1501 need = bt->vsz;
1502 #if 0
1503 LOG_DBG( LOG_DEBUG,
1504 "in entry %d '%.*s' val %d: memmove( %p %p %d) val %p need %d",
1505 e, d->kln, base+LBT_POS(d), i,
1506 stk-need, stk, (val-stk)+need, val, need );
1507 #endif
1508 memmove( stk - need, stk, (val - stk)+need );
1509 memcpy( val - need, key->val.byt, bt->vsz );
1510 LBT_VLNADD( d, 1 );
1511 } else { /* insert key, value and dict entry */
1512 /* take dict and data space from entry e */
1513 unsigned char *nu;
1514 need = key->len + bt->vsz;
1515 if ( e == b->ent )
1516 nu = base + b->stk - need;
1517 else {
1518 /* at end of next entry, i.e. end of block or beg of previous */
1519 nu = base + LBT_POS(d) + d->kln + LBT_VLN(d)*bt->vsz;
1520 memmove( stk - need, stk, nu - stk );
1521 nu -= need;
1522 memmove( d+1, d, (b->ent - e)*sizeof(Dict) );
1523 }
1524 memcpy( nu, key->byt, key->len );
1525 memcpy( nu + key->len, key->val.byt, bt->vsz );
1526 LBT_PV( d, nu-base, 1 );
1527 d->kln = key->len;
1528 d++;
1529 e++;
1530 b->ent++;
1531 }
1532 /*
1533 need is set to the needed (i.e. additionally used) data space,
1534 e is the first dict entry to redirect need bytes lower
1535 */
1536 b->stk -= need;
1537 for ( ; e < b->ent; e++, d++ )
1538 LBT_POSADD( d, -need );
1539 } /* ! insibling */
1540 while ( s ) { /* created sibling */
1541 unsigned num = s->num;
1542 unsigned char *e0 = LBT_BASE(s) + LBT_POS(s->dct);
1543 unsigned l = s->lev + 1;
1544 Block *ins;
1545 Key k;
1546
1547 memcpy( k.byt, e0, k.len = s->dct[0].kln );
1548 k.val.len = 0;
1549 if ( s->lev ? s->dct[0].vln : split )
1550 memcpy( k.val.byt, e0+k.len, k.val.len = bt->vsz );
1551 LOG_DBG( LOG_DEBUG, "split: had %d/%d on lev %d key '%.*s' vln %d %s",
1552 b->num, num, s->lev, k.len, k.byt, k.val.len, PRVAL( pv, k.val ) );
1553
1554 /* get parent */
1555 if ( f.b[l]->num == f.n[l] && ! LBT_BUSY(f.b[l]) )
1556 f.b[l]->lck = LBT_WR;
1557 else
1558 f.b[l] = getblk( bt, f.n[l], l, LBT_WR );
1559 LOG_DBG( LOG_DEBUG,
1560 "split: got parent %d lev %d", f.b[l]->num, f.b[l]->lev );
1561 /* MT TODO: again L/Y link hunt */
1562 /* ok, locked the parent -- release s and b */
1563 if ( (ret = WRULK( s, bt )) || (ret = WRULK( b, bt )) )
1564 break;
1565 s = 0;
1566 ins = b = f.b[l];
1567 base = LBT_BASE(b);
1568 e = KEYPOS( bt, b, &k );
1569 LOG_DBG( LOG_DEBUG, "split: got parent pos %d", e );
1570 if ( LBT_MATCH & e ) /* !??? */
1571 LOG_OTO( done, (ERR_TRASH, "OOPS! found key in parent" ) );
1572 if ( ! e ) {
1573 TRACEKEY( ERR_TRASH, &f );
1574 LOG_OTO( done, (ERR_TRASH, "OOPS! parent insert at 0 lev %d", l ) );
1575 }
1576 need = sizeof(Dict) + k.len + 4 + k.val.len;
1577 if ( need > b->stk - LBT_DOFF - b->ent*sizeof(Dict) ) {
1578 unsigned half = ((bt->bsz - b->stk)+b->ent*sizeof(Dict)+need) / 2;
1579 unsigned mv = b->ent, tot = 0, nent, sz;
1580 int adj;
1581 if ( e == b->ent ) {
1582 tot = need;
1583 ins = 0;
1584 }
1585 for ( d = b->dct+mv; d--, mv--; ) {
1586 sz = sizeof(Dict) + d->kln + 4 + d->vln;
1587 if ( half > (tot += sz) ) {
1588 if ( e == mv ) {
1589 if ( half > tot+need || tot+need-half < half-tot ) {
1590 tot += need;
1591 ins = 0; /* new link goes to sibling */
1592 } else
1593 break;
1594 if ( half <= tot )
1595 break;
1596 }
1597 if ( mv )
1598 continue;
1599 }
1600 if ( tot - half > sz/2 ) { /* don't move this */
1601 tot -= sz;
1602 d++; mv++;
1603 } else if ( ! mv ) /* !??? */
1604 mv = 1;
1605 break;
1606 }
1607 /* split here a little bit simpler */
1608 s = NEWBLK( bt, l, LBT_WR );
1609 s->nxt = b->nxt;
1610 b->nxt = s->num;
1611 /* straight move entries mv..b->ent-1 to sibling */
1612 sbase = LBT_BASE(s);
1613 d = b->dct + mv;
1614 sz = b->dct[mv-1].pos - b->stk;
1615 nent = b->ent - mv;
1616 LOG_DBG( LOG_DEBUG,
1617 "split: parent lev %d split %d to new %d mv %d"
1618 " '%.*s'/%s nent %d sz %d",
1619 l, b->num, s->num, mv, d->kln, base+d->pos,
1620 prval( pv, base+d->pos+d->kln, d->vln ), nent, sz );
1621 memcpy( sbase + (s->stk -= sz), base + b->stk, sz );
1622 adj = s->stk - b->stk;
1623 b->stk += sz;
1624 memcpy( s->dct, d, nent*sizeof(Dict) );
1625 b->ent = mv;
1626 s->ent = nent;
1627 for ( d = s->dct; nent--; d++ )
1628 d->pos += adj;
1629 if ( ! ins ) {
1630 ins = s;
1631 e -= mv;
1632 }
1633 if ( (ret = chkblk( s, bt )) )
1634 break;
1635 }
1636 /* now insert link in block ins at pos e */ {
1637 unsigned isz = k.len + 4 + k.val.len, nent = ins->ent - e;
1638 unsigned char *ibase = LBT_BASE(ins);
1639 unsigned end = e ? ins->dct[e-1].pos : bt->bsz;
1640 LOG_DBG( LOG_DEBUG,
1641 "split: ins parent %d lev %d pos %d key '%.*s' -> %d vln %d %s",
1642 ins->num, ins->lev, e, k.len, k.byt, num, k.val.len,
1643 PRVAL( pv, k.val ) );
1644 memmove( ibase + ins->stk - isz, ibase + ins->stk, end - ins->stk );
1645 ins->stk -= isz;
1646 memcpy( ibase + (end -= 4), &num, 4 );
1647 if ( k.val.len )
1648 memcpy( ibase + (end -= k.val.len), k.val.byt, k.val.len );
1649 memcpy( ibase + (end -= k.len), k.byt, k.len );
1650 for ( d = ins->dct+ins->ent; nent--; d-- ) {
1651 *d = d[-1];
1652 d->pos -= isz;
1653 }
1654 d->pos = end;
1655 d->vln = k.val.len;
1656 d->kln = k.len;
1657 ins->ent++;
1658 if ( (ret = chkblk( ins, bt )) )
1659 break;
1660 }
1661 if ( ! s ) /* write b below or in next turn */
1662 break;
1663 if ( ! b->num ) { /* root split */
1664 Block *bb = NEWBLK( bt, l, LBT_WR );
1665 LOG_DBG( LOG_DEBUG, "root split!" );
1666 /* cp all but the num */
1667 memcpy( LBT_BASE(bb)+4, LBT_BASE(b)+4, bt->bsz-4 );
1668 b->nxt = s->nxt; /* restore root's special nxt */
1669 s->nxt = 0;
1670 b->stk = bt->bsz; /* empty the root */
1671 /* add two childs bb and s */
1672 SETINT( base + (b->stk -= 4), bb->num );
1673 LBT_PVK( b->dct, b->stk, 0, 0 );
1674 SETINT( base + (b->stk -= 4), s->num );
1675 e0 = LBT_BASE(s) + LBT_POS(s->dct);
1676 if ( s->dct[0].vln )
1677 memcpy( base + (b->stk -= bt->vsz), e0+s->dct[0].kln, bt->vsz );
1678 memcpy( base + (b->stk -= s->dct[0].kln), e0, s->dct[0].kln );
1679 LBT_PVK( b->dct+1, b->stk, s->dct[0].vln, s->dct[0].kln );
1680 b->ent = 2;
1681 b->lev++; /* increase tree depth */
1682 if ( (ret = WRULK( s, bt )) || (ret = WRULK( bb, bt )) )
1683 break;
1684 s = 0;
1685 break;
1686 }
1687 }
1688 if ( b && !ret )
1689 ret = WRULK( b, bt );
1690 done:
1691 if ( s )
1692 ULK( s );
1693 if ( b )
1694 ULK( b );
1695 if ( !ret && b->lev && findkey( &f, LBT_RD ) )
1696 ret = log_msg( ERR_TRASH, "could not find key after add" );
1697 return ret;
1698 } /* lbt_add */
1699
1700
1701 int lbt_del ( Idx *bt, Key *key )
1702 {
1703 int ret = 0;
1704 unsigned short e, n, i, mv;
1705 Block *b;
1706 Dict *d;
1707 unsigned char *base, *v;
1708 FindKey f;
1709 PrVal pv;
1710
1711 f.bt = bt;
1712 f.key = key;
1713 LOG_DBG( LOG_DEBUG, "deleting key '%.*s'/%s",
1714 key->len, key->byt, PRVAL( pv, key->val ) );
1715 if ( findkey( &f, LBT_WR ) )
1716 LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
1717 if ( !(LBT_MATCH & f.e[0]) ) /* key not found - ok */
1718 goto done;
1719 LOG_DBG( LOG_DEBUG, "key found" );
1720 e = (LBT_ENTMASK & f.e[0]);
1721 b = f.b[0];
1722 d = b->dct + e;
1723 base = LBT_BASE(b);
1724 v = base+LBT_POS(d)+d->kln; /* start of values */
1725 n = LBT_VLN(d);
1726 if ( ! n )
1727 goto done;
1728 i = valpos( v, n, bt->vsz, key->val.byt );
1729 LOG_DBG( LOG_DEBUG, "val %sfound pos %u of %u",
1730 (LBT_MATCH & i) ? "" : "not ", i & ~LBT_MATCH, n );
1731 if ( !(LBT_MATCH & i) ) /* val not found - ok */
1732 goto done;
1733 i &= ~LBT_MATCH;
1734 LOG_DBG( LOG_DEBUG,
1735 "del blk %d e %d i/n %d/%d", b->num, e, i, n );
1736 if ( 1 < n ) {
1737 v += i*bt->vsz; /* end of stack area to move: start of this value */
1738 mv = bt->vsz; /* amount to move: one value size */
1739 LBT_VLNADD( d, -1 );
1740 } else { /* sole value -- rm whole entry */
1741 v = base+LBT_POS(d); /* start of key */
1742 mv = d->kln+bt->vsz;
1743 for ( i=b->ent - e - 1; i--; d++ ) /* move the i entries after d down */
1744 *d = d[1];
1745 d = b->dct + e; /* reset */
1746 b->ent--;
1747 }
1748 memmove( base+b->stk+mv, base+b->stk, v-(base+b->stk) );
1749 b->stk += mv;
1750 for ( ; e++ < b->ent; d++ )
1751 LBT_POSADD( d, mv );
1752 WRULK( b, bt );
1753 done:
1754 if ( f.b[0] )
1755 ULK( f.b[0] );
1756 return ret;
1757 } /* lbt_del */
1758
1759
1760
1761 int lbt_loop ( Idx *bt, DXLoop *l )
1762 {
1763 int ret = 0;
1764 unsigned i = 0;
1765 int mode = IDXMODE & l->flg;
1766 Block *b = 0;
1767 Key *cmp = &l->key;
1768 if ( ! cmp->len ) /* start at 1st leaf */
1769 b = getblk( bt, 1, 0, LBT_RD );
1770 else { /* locate key */
1771 FindKey f;
1772 f.bt = bt;
1773 f.key = cmp;
1774 if ( findkey( &f, LBT_RD ) )
1775 LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
1776 b = f.b[0];
1777 i = (unsigned)(LBT_ENTMASK & f.e[0]);
1778 if ( !(LBT_MATCH & f.e[0]) && IDXEQ == mode )
1779 goto done;
1780 }
1781 if ( IDXUPTO <= mode )
1782 cmp = &l->to;
1783 for ( ;; i=0 ) { /* blocks */
1784 unsigned char *base = LBT_BASE( b );
1785 Dict *d;
1786 if ( ! b || b->lev )
1787 return -ERR_TRASH;
1788 b->lck = LBT_EX; /* lock */
1789 d = b->dct + i;
1790 for ( ; i<b->ent; i++, d++ ) {
1791 unsigned short pos = LBT_POS( d );
1792 unsigned short vln = LBT_VLN( d );
1793 unsigned short kln = d->kln;
1794 unsigned short j;
1795 unsigned char *v = base+pos+kln;
1796 Key key;
1797 Hit hit;
1798 int diff = memcmp( cmp->byt, base+pos,
1799 kln <= cmp->len ? kln : cmp->len );
1800 switch ( mode ) {
1801 case IDXPF:
1802 if ( diff || kln < cmp->len )
1803 goto moan;
1804 break;
1805 case IDXEQ:
1806 if ( diff || kln != cmp->len )
1807 goto done;
1808 break;
1809 case IDXUPTO:
1810 if ( 0 >= diff )
1811 goto done;
1812 break;
1813 case IDXINCL:
1814 if ( 0 > diff || (!diff && kln > cmp->len) )
1815 goto done;
1816 break;
1817 default: /* unused */
1818 moan:
1819 log_msg( LOG_INFO,
1820 "loop ends on key '%.*s'(%d) %d %d",
1821 kln, base+pos, kln, diff, cmp->len );
1822 goto done;
1823 }
1824 memcpy( key.byt, base+pos, key.len = kln );
1825 if ( l->cb )
1826 for ( j=0; j<vln; j++, v += bt->vsz ) {
1827 rdval( &hit, v, bt->vsz );
1828 ret = l->cb( l->me, &key, &hit );
1829 if ( ret )
1830 goto done;
1831 }
1832 else { /* internal check */
1833 FindKey f;
1834 f.bt = bt;
1835 f.key = &key;
1836 memcpy( key.val.byt, v, key.val.len = bt->vsz );
1837 if ( findkey( &f, LBT_RD ) )
1838 LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
1839 if ( !(LBT_MATCH & f.e[0])
1840 || b != f.b[0]
1841 || i != (unsigned)(LBT_ENTMASK & f.e[0])
1842 ) {
1843 log_msg( ERR_TRASH,
1844 "OOPS! want %u(%p).%u got %u(%p).%u (match %x)",
1845 b->num, b, i, f.b[0]->num, f.b[0], LBT_ENTMASK & f.e[0], f.e[0] );
1846 rdval( &hit, v, bt->vsz );
1847 log_msg( ERR_TRASH,
1848 "\twhen looking for '%.*s'/%u.%u.%u.%u",
1849 key.len, key.byt, hit.mfn, hit.tag, hit.occ, hit.pos );
1850 if ( f.b[0]->ent <= (LBT_ENTMASK & f.e[0]) )
1851 log_msg( ERR_TRASH, "\tI found nothing" );
1852 else {
1853 Dict *dd = f.b[0]->dct + (LBT_ENTMASK & f.e[0]);
1854 unsigned char *bb = LBT_BASE(f.b[0])+LBT_POS(dd);
1855 rdval( &hit, bb+dd->kln, bt->vsz );
1856 log_msg( ERR_TRASH,
1857 "\tI found '%.*s'/%u.%u.%u.%u",
1858 dd->kln, bb, hit.mfn, hit.tag, hit.occ, hit.pos );
1859 }
1860 } else
1861 log_msg( LOG_TRACE, "key ok" );
1862 }
1863 }
1864 ULK( b );
1865 if ( ! b->nxt || !(b = getblk(bt, b->nxt, 0, LBT_RD)) )
1866 break;
1867 }
1868 done:
1869 if ( b )
1870 ULK( b );
1871 if ( l->cb )
1872 l->cb( l->me, 0, 0 );
1873 else
1874 log_msg( LOG_INFO,
1875 "checked %i keys in %u blks of %dK depth %u cmp/get/miss %u/%u/%u",
1876 stat.key, bt->len, bt->bsz>>10, bt->root->lev, stat.cmp, stat.get, stat.miss );
1877 return ret;
1878 } /* lbt_loop */
1879
1880
1881 /* mimik the plain old ldb_search */
1882 int lbt_search ( Idx *bt, Key *key, LdbPost *post, Rec *rec )
1883 {
1884 int ret = 0;
1885 unsigned i = 0;
1886 int cont = rec && rec->len; /* continuing previous terms search */
1887 char *stk = rec ? ((char*)rec + rec->bytes) : 0; /* term stack */
1888 int prefix;
1889 Block *b = 0;
1890 Key start;
1891 LdbP * const p = post->p; /* really just an alias */
1892
1893 /* check for prefix match */
1894 if ( post ) {
1895 if ( 8 != bt->vsz )
1896 return log_msg( ERR_INVAL, "bad legacy search on vsz %d", bt->vsz );
1897 prefix = LDB_PFX & post->mode;
1898 } else {
1899 if ( ! rec )
1900 return log_msg( ERR_INVAL, "lbt_search needs rec or post" );
1901 if ( (prefix = key->len && '$' == key->byt[key->len - 1]) )
1902 key->len--;
1903 }
1904
1905 if ( ! cont )
1906 start = *key;
1907 else {
1908 memset( &start, 0, sizeof(start) );
1909 start.len = 255 < rec->field[rec->len-1].len
1910 ? 255 : rec->field[rec->len-1].len;
1911 memcpy( start.byt, rec->field[rec->len-1].val, start.len );
1912 rec->len = 0;
1913 }
1914
1915 if ( ! start.len ) /* start at 1st leaf */
1916 b = getblk( bt, 1, 0, LBT_RD );
1917 else { /* locate key */
1918 FindKey f;
1919 f.bt = bt;
1920 f.key = &start;
1921 if ( findkey( &f, LBT_RD ) )
1922 LOG_OTO( ouch, ( ERR_TRASH, "could not find key" ) );
1923 b = f.b[0];
1924 i = (unsigned)(LBT_ENTMASK & f.e[0]);
1925 if ( !(LBT_MATCH & f.e[0]) && ! prefix )
1926 goto done;
1927 }
1928
1929 for ( ;; i=0 ) { /* blocks */
1930 unsigned char *base = LBT_BASE( b );
1931 Dict *d;
1932 if ( ! b || b->lev )
1933 return -ERR_TRASH;
1934 b->lck = LBT_EX; /* lock */
1935 d = b->dct + i;
1936 for ( ; i<b->ent; i++, d++ ) { /* entries */
1937 unsigned char *ent = base + LBT_POS( d ); /* start of key */
1938 unsigned short kln = d->kln;
1939 if ( memcmp( key->byt, ent, kln <= key->len ? kln : key->len )
1940 || (prefix ? kln < key->len : kln != key->len )
1941 )
1942 goto done;
1943 if ( rec ) { /* record term */
1944 Field *f = rec->field + rec->len; /* field to assign */
1945 /** append term to record, unless it's the same as previous */
1946 if ( rec->len ) {
1947 if ( kln == f[-1].len && !memcmp( f[-1].val, ent, kln ) )
1948 continue;
1949 } else if ( cont ) {
1950 if ( kln == start.len && !memcmp( start.byt, ent, kln ) )
1951 continue;
1952 }
1953 stk -= kln;
1954 if ( stk < (char*)(f+1) ) /* stack reached field dict */
1955 goto done;
1956 memcpy( stk, ent, kln );
1957 f->tag = 0;
1958 f->val = stk;
1959 f->len = kln;
1960 rec->len++;
1961 continue;
1962 }
1963 { /* more locals */
1964 LdbP merge[1024];
1965 unsigned short vln, k, m=0;
1966 unsigned char *c;
1967 int f = post->fil-1; /* highest pos to consider in given postings */
1968 if ( !(vln = LBT_VLN( d )) )
1969 continue;
1970 ent += kln; /* set to start of values */
1971 c = ent + 8*(vln-1); /* last value */
1972 /* collect postings */
1973 for ( k=vln; k--; c-=8 ) {
1974 /* loop backwards (for the fun of it) postings in segment */
1975 int prow, ptag, ppos;
1976 LdbP e; /* the entry */
1977 LdbP samerow; /* highest possible entry w/ same row as e */
1978 #if defined( LDB_BIG_ENDIAN )
1979 /* the 8 bytes of a posting are BIG ENDIAN ! */
1980 memcpy(e.bytes,c,8);
1981 #else
1982 e.bytes[0] = c[7]; e.bytes[1] = c[6];
1983 e.bytes[2] = c[5]; e.bytes[3] = c[4];
1984 e.bytes[4] = c[3]; e.bytes[5] = c[2];
1985 e.bytes[6] = c[1]; e.bytes[7] = c[0];
1986 #endif
1987 prow = LDBP_ROW( &e );
1988 ptag = LDBP_TAG( &e );
1989 ppos = LDBP_POS( &e );
1990 LOG_DBG( LOG_VERBOSE, "post %d.%hd pos %06x", prow, ptag, ppos );
1991 if ( !prow )
1992 continue;
1993 if ( (post->cut && prow >= post->cut)
1994 || (post->tag && post->tag != ptag)
1995 )
1996 continue;
1997 if ( prow < post->skp ) /* quickly bail out on skip condition */
1998 break;
1999 LDBP_SETROWTOP( &samerow, &e ); /* for mfn comparison */
2000 /* sweep down to postings for the same row as e ... */
2001 while ( f >= 0 && LDBP_GT( p+f, &samerow ) )
2002 f--;
2003 if ( LDB_AND & post->mode ) {
2004 int l;
2005 /* loop postings for same row, mark all (that are near enough) */
2006 LDBP_SETROWBOT( &samerow, &e ); /* for mfn comparison */
2007 /* NOTE: postings for row are GT than bottom even if marked */
2008 for ( l = f; l>=0 && LDBP_GT( p+l, &samerow ); l-- ) {
2009 if ( post->near ) {
2010 int dist;
2011 if ( ptag != LDBP_TAG( p+l ) ) continue;
2012 if ( LDB_NEAR_G != post->near ) {
2013 dist = LDBP_POS( p+l ) - LDBP_POS( &e );
2014 if ( dist < 0 ) dist = -dist;
2015 if ( 0 < post->near
2016 ? post->near < dist
2017 : -post->near != dist /* exact $$$$ */
2018 ) continue;
2019 }
2020 }
2021 LDBP_SETMARK( p+l );
2022 }
2023 } else { /* OR mode */
2024 int add;
2025 if ( ! post->near ) /* add if row not found: ignore details */
2026 add = 0 > f || prow > LDBP_ROW( p+f );
2027 else { /* add if no exact match */
2028 int l;
2029 /* NOTE: we don't use mark bit in OR mode, do we ? */
2030 for ( l = f; l>=0 && LDBP_GT( p+l, &e ); l-- )
2031 ;
2032 add = 0 > l || LDBP_GT( &e, p+l );
2033 }
2034 if ( add )
2035 merge[ m++ ] = e;
2036 }
2037 } /* for postings in segment */
2038 if ( m ) { /* merge in the merge buffer */
2039 LdbP *mm = merge;
2040 for ( k = post->fil += m; m && k--; ) {
2041 LdbP src;
2042 if ( k < m || LDBP_GT( mm, &p[k-m] ) ) {
2043 src = *mm++;
2044 m--;
2045 LOG_DBG( LOG_DEBUG, "merging %d at %d", LDBP_ROW(&src), k );
2046 } else
2047 src = p[k-m];
2048 if ( k < post->len )
2049 p[k] = src;
2050 else { /* set cut */
2051 int row = LDBP_ROW( &src );
2052 if ( row < post->cut || !post->cut )
2053 post->cut = row;
2054 }
2055 }
2056 if ( post->fil > post->len )
2057 post->fil = post->len;
2058 if ( post->cut ) /* postings for cut row are unreliable */
2059 while ( post->fil && post->cut <= LDBP_ROW(p+post->fil-1) )
2060 post->fil--;
2061 } /* if ( m ) */
2062 } /* more locals */
2063 } /* for entries in block */
2064 ULK( b );
2065 if ( ! b->nxt || !(b = getblk(bt, b->nxt, 0, LBT_RD)) )
2066 break;
2067 }
2068 done:
2069 if ( rec )
2070 ret = rec->len;
2071 ouch:
2072 if ( b )
2073 ULK( b );
2074 return ret;
2075 } /* lbt_search */
2076
2077
2078 int lbt_init ( Idx *bt )
2079 {
2080 int ret = 0, blkbits, i;
2081 Block *l;
2082 int size = lio_size( bt->fd );
2083
2084 bt->root = 0;
2085 bt->hash = 0;
2086 bt->mem = 0;
2087 for ( i=LBT_LRULEV; i--; )
2088 bt->lru[i] = bt->mru[i] = 0;
2089
2090 if ( 0x3ff & size ) /* not on 1K boundary */
2091 return log_msg( LOG_ERROR, "bt has bad size %dK + %d",
2092 size >> 10, (int)size & 0x3ff );
2093 if ( size ) { /* read root header */
2094 Block b;
2095 unsigned want = sizeof(b) - LBT_BOFF;
2096 int got = lio_pread( &bt->fd, LBT_BASE(&b), want, 0 );
2097 if ( want != (unsigned)got )
2098 return log_msg( LOG_SYSERR, "could not get root head" );
2099 if ( b.num )
2100 return log_msg( ERR_TRASH, "root has num %d", b.num );
2101 /*if ( LBT_ISIS != b.nxt ) log_msg( LOG_WARN, "magic is 0x%08x", b.nxt );*/
2102 if ( bt->typ && bt->typ != b.typ )
2103 log_msg( LOG_WARN, "typ want 0x02%x got 0x02%x", bt->typ, b.typ );
2104 bt->typ = b.typ;
2105 if ( bt->key && bt->key != b.key )
2106 log_msg( LOG_WARN, "key want %d got %d", bt->key, b.key );
2107 bt->key = b.key;
2108 if ( bt->col && bt->col != b.col )
2109 log_msg( LOG_WARN, "col want %d got %d", bt->col, b.col );
2110 bt->col = b.col;
2111 bt->dpt = b.lev;
2112 /*
2113 b.ent
2114 b.stk
2115 */
2116 } else { /* new index */
2117 bt->dpt = 1;
2118 if ( ! bt->key )
2119 bt->key = 30;
2120 }
2121
2122 bt->vsz = 8+(0xf & bt->typ);
2123 blkbits = 10 + (3 & (bt->typ >> 4));
2124 bt->bsz = 1 << blkbits;
2125 if ( size % bt->bsz )
2126 return log_msg( ERR_TRASH, "size 0x%08x % blksize", size, bt->bsz );
2127 bt->len = size >> blkbits;
2128 if ( 2 > bt->len && !(LBT_WRITE & bt->flg) )
2129 return log_msg( LOG_SYSERR, "attempt to read empty index" );
2130
2131 bt->hlen = 16 + bt->len / 32; /* initial cache size */
2132 if ( bt->hlen < 1000 ) bt->hlen = 1000;
2133 if ( !(bt->root = addchunk( bt, bt->hlen )) )
2134 return log_msg( LOG_SYSERR, "\twhen getting bt cache" );
2135 if ( bt->hlen < 256 ) /* the hash */
2136 bt->hlen = 256;
2137 if ( !(bt->hash = (Block**)mAlloc( bt->hlen * sizeof(Block*) )) )
2138 return log_msg( LOG_SYSERR, "\twhen getting bt hash" );
2139 memset( bt->hash, 0, bt->hlen * sizeof(Block*) );
2140
2141 if ( size && readblk( bt->root, bt, 0 ) )
2142 return log_msg( ERR_TRASH, "could not read root" );
2143 if ( ! bt->len ) {
2144 /* init: link root to leaf */
2145 SETINT( LBT_BASE(bt->root)+(bt->root->stk -= 4), 1 );
2146 LBT_PVK( bt->root->dct, bt->root->stk, 0, 0 );
2147 bt->root->ent = 1;
2148 bt->len = 1;
2149 }
2150 if ( !(l = getblk( bt, 1, 0, LBT_EX )) )
2151 return log_msg( ERR_TRASH, "could not read 1st leaf" );
2152 if ( ! size ) {
2153 log_msg( LOG_INFO, "creating index bsz %u ksz %u vsz %u",
2154 bt->bsz, bt->key, bt->vsz );
2155 bt->root->lev = 1;
2156 if ( WRULK( bt->root, bt ) || WRULK( l, bt ) )
2157 return log_msg( LOG_SYSERR, "\twhen writing base" );
2158 }
2159
2160 return ret;
2161 } /* lbt_init */
2162
2163
2164 void lbt_close ( Idx *bt )
2165 {
2166 Chunk *c, *n = bt->mem;
2167 while ( (c = n) ) {
2168 n = c->nxt;
2169 mFree( c );
2170 }
2171 mFree( bt->hash );
2172 lio_close( &bt->fd, LIO_INOUT );
2173 memset( bt, 0, sizeof(*bt) );
2174 } /* lbt_close */
2175
2176
2177 /* ************************************************************
2178 public functions
2179 */
2180
2181
2182 void cXMkVal ( Idx *bt, Val *val, Hit *hit )
2183 {
2184 if ( hit ) {
2185 val->len = bt->vsz;
2186 mkval( val, hit );
2187 } else
2188 val->len = 0;
2189 }
2190
2191 int cXAdd ( Idx *bt, Key *key, Hit *hit )
2192 {
2193 if ( bt->bat ) {
2194 if ( ! key )
2195 key = &bt->bat->key;
2196 cXMkVal( bt, &key->val, hit );
2197 return lbt_batchval( bt, key );
2198 } else if ( !key )
2199 return 0;
2200 else if ( hit ) {
2201 cXMkVal( bt, &key->val, hit );
2202 if ( hit->dbn == 0xffff )
2203 return lbt_del( bt, key );
2204 }
2205 return lbt_add( bt, key );
2206 } /* cXAdd */

  ViewVC Help
Powered by ViewVC 1.1.26