--- cvs-head/lib/WAIT/Table.pm 2000/11/12 01:26:10 31 +++ trunk/lib/WAIT/Table.pm 2004/07/13 19:06:46 111 @@ -4,9 +4,9 @@ # Author : Ulrich Pfeifer # Created On : Thu Aug 8 13:05:10 1996 # Last Modified By: Ulrich Pfeifer -# Last Modified On: Fri May 19 14:51:14 2000 +# Last Modified On: Wed Jan 23 14:15:15 2002 # Language : CPerl -# Update Count : 133 +# Update Count : 152 # Status : Unknown, Use with caution! # # Copyright (c) 1996-1997, Ulrich Pfeifer @@ -25,6 +25,7 @@ =cut package WAIT::Table; +our $VERSION = "2.000"; use WAIT::Table::Handle (); require WAIT::Parse::Base; @@ -32,11 +33,8 @@ use strict; use Carp; # use autouse Carp => qw( croak($) ); -use DB_File; +use BerkeleyDB; use Fcntl; -use LockFile::Simple (); - -my $USE_RECNO = 0; =head2 Creating a Table. @@ -133,8 +131,9 @@ my $self = {}; # Check for mandatory attrs early - $self->{name} = $parm{name} or croak "No name specified"; - $self->{attr} = $parm{attr} or croak "No attributes specified"; + for my $x (qw(name attr env maindbfile tablename)) { + $self->{$x} = $parm{$x} or croak "No $x specified"; + } # Do that before we eventually add '_weight' to attributes. $self->{keyset} = $parm{keyset} || [[@{$parm{attr}}]]; @@ -158,17 +157,8 @@ unshift @{$parm{attr}}, '_weight' unless $attr{'_weight'}; } - $self->{file} = $parm{file} or croak "No file specified"; - if (-d $self->{file}){ - warn "Warning: Directory '$self->{file}' already exists\n"; - } elsif (!mkdir($self->{file}, 0775)) { - croak "Could not 'mkdir $self->{file}': $!\n"; - } - - my $lockmgr = LockFile::Simple->make(-autoclean => 1); - # aquire a write lock - $self->{write_lock} = $lockmgr->lock($self->{file} . '/write') - or die "Can't lock '$self->{file}/write'"; + $self->{path} = $parm{path} or croak "No path specified"; + bless $self, $type; $self->{djk} = $parm{djk} if defined $parm{djk}; $self->{layout} = $parm{layout} || new WAIT::Parse::Base; @@ -177,7 +167,9 @@ $self->{deleted} = {}; # no deleted records yet $self->{indexes} = {}; - bless $self, $type; + # Checking for readers is not necessary, but let's go with the + # generic method. + # Call create_index() and create_index() for compatibility for (@{$self->{keyset}||[]}) { #carp "Specification of indexes at table create time is deprecated"; @@ -187,25 +179,36 @@ # carp "Specification of inverted indexes at table create time is deprecated"; my $att = shift @{$parm{invindex}}; my @spec = @{shift @{$parm{invindex}}}; - my @opt; + my @opt = (); if (ref($spec[0])) { - carp "Secondary pipelines are deprecated\n"; + warn "Secondary pipelines are deprecated"; @opt = %{shift @spec}; } - $self->create_inverted_index(attribute => $att, pipeline => \@spec, @opt); + $self->create_inverted_index(attribute => $att, + pipeline => \@spec, + @opt); } $self; # end of backwarn compatibility stuff } +for my $accessor (qw(maindbfile tablename)) { + no strict 'refs'; + *{$accessor} = sub { + my($self) = @_; + return $self->{$accessor} if $self->{$accessor}; + require Carp; + Carp::confess("accessor $accessor not there"); + } +} + =head2 Creating an index $tb->create_index('docid'); -=item C - +C must be called with a list of attributes. This must be a subset of the attributes specified when the table was created. Currently this method must be called before the first tuple is inserted in the @@ -222,8 +225,16 @@ require WAIT::Index; my $name = join '-', @_; + #### warn "WARNING: Suspect use of \$_ in method create_index. name[$name]_[$_]"; $self->{indexes}->{$name} = - new WAIT::Index file => $self->{file}.'/'.$name, attr => $_; + WAIT::Index->new( + file => $self->path.'/'.$name, + subname => $name, + env => $self->{env}, + maindbfile => $self->maindbfile, + tablename => $self->tablename, + attr => $_, + ); } =head2 Creating an inverted index @@ -284,18 +295,22 @@ } my $name = join '_', ($parm{attribute}, @{$parm{pipeline}}); - my $idx = new WAIT::InvertedIndex(file => $self->{file}.'/'.$name, - filter => [@{$parm{pipeline}}], # clone - name => $name, - attr => $parm{attribute}, - %opt, # backward compatibility stuff - ); + my $idx = WAIT::InvertedIndex->new(file => $self->path.'/'.$name, + subname=> $name, + env => $self->{env}, + maindbfile => $self->maindbfile, + tablename => $self->tablename, + filter => [@{$parm{pipeline}}], # clone + name => $name, + attr => $parm{attribute}, + %opt, # backward compatibility stuff + ); # We will have to use $parm{predicate} here push @{$self->{inverted}->{$parm{attribute}}}, $idx; } sub dir { - $_[0]->{file}; + $_[0]->path; } =head2 C<$tb-Elayout> @@ -323,16 +338,18 @@ sub drop { my $self = shift; + if ((caller)[0] eq 'WAIT::Database') { # database knows about this $self->close; # just make sure - my $file = $self->{file}; + + my $path = $self->path; for (values %{$self->{indexes}}) { $_->drop; } - unlink "$file/records"; - # $self->unlock; - ! (!-e $file or rmdir $file); + unlink "$path/records"; + rmdir "$path/read" or warn "Could not rmdir '$path/read'"; + } else { croak ref($self)."::drop called directly"; } @@ -346,9 +363,17 @@ require $module; } +sub path { + my($self) = @_; + return $self->{path} if $self->{path}; + require Data::Dumper; print STDERR "Line " . __LINE__ . ", File: " . __FILE__ . "\n" . Data::Dumper->new([$self],[qw(self)])->Indent(1)->Useqq(1)->Dump; # XXX + require Carp; + Carp::confess("NO path attr"); +} + sub open { my $self = shift; - my $file = $self->{file} . '/records'; + my $path = $self->path . '/records'; mrequire ref($self); # that's tricky eh? if (defined $self->{'layout'}) { @@ -359,82 +384,53 @@ } if (exists $self->{indexes}) { require WAIT::Index; - for (values %{$self->{indexes}}) { - $_->{mode} = $self->{mode}; + for my $Ind (values %{$self->{indexes}}) { + for my $x (qw(mode env maindbfile)) { + $Ind->{$x} = $self->{$x}; + } } } if (exists $self->{inverted}) { my ($att, $idx); for $att (keys %{$self->{inverted}}) { for $idx (@{$self->{inverted}->{$att}}) { - $idx->{mode} = $self->{mode}; + for my $x (qw(mode env maindbfile)) { + $idx->{$x} = $self->{$x}; + } } } require WAIT::InvertedIndex; } - unless (defined $self->{dbh}) { - if ($USE_RECNO) { - $self->{dbh} = tie(@{$self->{db}}, 'DB_File', $file, - $self->{mode}, 0664, $DB_RECNO); - } else { - $self->{dbh} = - tie(%{$self->{db}}, 'DB_File', $file, - $self->{mode}, 0664, $DB_BTREE); - } - } - # Locking - # - # We allow multiple readers to coexists. But write access excludes - # all read access vice versa. In practice read access on tables - # open for writing will mostly work ;-) - - my $lockmgr = LockFile::Simple->make(-autoclean => 1); - - # aquire a write lock. We might hold one acquired in create() already - $self->{write_lock} ||= $lockmgr->lock($self->{file} . '/write') - or die "Can't lock '$self->{file}/write'"; - - my $lockdir = $self->{file} . '/read'; - unless (-d $lockdir) { - mkdir $lockdir, 0755 or die "Could not mkdir $lockdir: $!"; - } + # CONFUSION: WAIT knows two *modes*: read-only or read-write. + # BerkeleyDB means file permissions when talking about Mode. + # BerkeleyDB has the "Flags" attribute to specify + # read/write/lock/etc subsystems. + my $flags; if ($self->{mode} & O_RDWR) { - # this is a hack. We do not check for reopening ... - return $self if $self->{write_lock}; - - # If we actually want to write we must check if there are any readers - local *DIR; - opendir DIR, $lockdir or - die "Could not opendir '$lockdir': $!"; - for my $lockfile (grep { -f "$lockdir/$_" } readdir DIR) { - # check if the locks are still valid. - # Since we are protected by a write lock, we could use a pline file. - # But we want to use the stale testing from LockFile::Simple. - if (my $lck = $lockmgr->trylock("$lockdir/$lockfile")) { - warn "Removing stale lockfile '$lockdir/$lockfile'"; - $lck->release; - } else { - $self->{write_lock}->release; - die "Cannot write table '$file' while it's in use"; - } - } - closedir DIR; + $flags = DB_CREATE; # | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_CDB; + warn "Flags on table $path set to 'writing'"; } else { - # this is a hack. We do not check for reopening ... - return $self if $self->{read_lock}; - - # We are a reader. So we release the write lock - my $id = time; - while (-f "$lockdir/$id.lock") { # here assume ".lock" format! - $id++; - } - $self->{read_lock} = $lockmgr->lock("$lockdir/$id"); - $self->{write_lock}->release; - delete $self->{write_lock}; + $flags = DB_RDONLY; + # warn "Flags on table $path set to 'readonly'"; + } + unless (defined $self->{dbh}) { + my $subname = $self->tablename . "/records"; + $self->{dbh} = + tie(%{$self->{db}}, 'BerkeleyDB::Btree', + $self->{env} ? (Env => $self->{env}) : (), + # Filename => $file, + Filename => $self->maindbfile, + Subname => $subname, + Mode => 0664, + Flags => $flags, + $WAIT::Database::Cachesize?(Cachesize => $WAIT::Database::Cachesize):(), + $WAIT::Database::Pagesize?(Pagesize => $WAIT::Database::Pagesize):(), + ) + or die "Cannot tie: $BerkeleyDB::Error; + DEBUG: Filename[$self->{maindbfile}]subname[$subname]Mode[0664]Flags[$flags]"; } - $self; } @@ -512,11 +508,7 @@ unless ($gotkey) { $key = $self->{nextk}++; } - if ($USE_RECNO) { - $self->{db}->[$key] = $tuple; - } else { - $self->{db}->{$key} = $tuple; - } + $self->{db}->{$key} = $tuple; for (values %{$self->{indexes}}) { unless ($_->insert($key, %parm)) { # duplicate key, undo changes @@ -567,11 +559,7 @@ return () if exists $self->{deleted}->{$key}; defined $self->{db} or $self->open; - if ($USE_RECNO) { - $self->unpack($self->{db}->[$key]); - } else { - $self->unpack($self->{db}->{$key}); - } + $self->unpack($self->{db}->{$key}); } sub delete_by_key { @@ -631,11 +619,10 @@ sub set { my ($self, $iattr, $value) = @_; - - unless ($self->{write_lock}){ - warn "Cannot set iattr[$iattr] without write lock. Nothing done"; - return; - } + # in the rare case that they haven't written a single record yet, we + # make sure, the inverted inherits our $self->{mode}: + defined $self->{db} or $self->open; + for my $att (keys %{$self->{inverted}}) { if ($] > 5.003) { # avoid bug in perl up to 5.003_05 my $idx; @@ -653,6 +640,8 @@ sub close { my $self = shift; + require Carp; Carp::cluck("------->Closing A Table<-------"); + if (exists $self->{'access'}) { eval {$self->{'access'}->close}; # dont bother if not opened } @@ -679,43 +668,23 @@ } if ($self->{dbh}) { delete $self->{dbh}; - - if ($USE_RECNO) { - untie @{$self->{db}}; - } else { - untie %{$self->{db}}; - } - delete $self->{db}; + } + untie %{$self->{db}}; + for my $att (qw(env db path maindbfile)) { + delete $self->{$att}; + warn "----->Deleted att $att<-----"; } - $self->unlock; - 1; } -sub unlock { +sub DESTROY { my $self = shift; - # Either we have a read or a write lock (or we close the table already) - # unless ($self->{read_lock} || $self->{write_lock}) { - # warn "WAIT::Table::unlock: Table aparently hold's no lock" - # } - if ($self->{write_lock}) { - $self->{write_lock}->release(); - delete $self->{write_lock}; - } - if ($self->{read_lock}) { - $self->{read_lock}->release(); - delete $self->{read_lock}; - } + delete $self->{env}; -} + # require Data::Dumper; print STDERR "Line " . __LINE__ . ", File: " . __FILE__ . "\n" . Data::Dumper->new([$self],[qw(self)])->Indent(1)->Useqq(1)->Dump; # XXX -sub DESTROY { - my $self = shift; - - warn "Table handle destroyed without closing it first" - if $self->{write_lock} || $self->{read_lock}; } sub open_scan { @@ -772,12 +741,13 @@ bless \%result, 'WAIT::Query::Raw'; } -sub search { +sub search_ref { my $self = shift; my ($query, $attr, $cont, $raw); if (ref $_[0]) { $query = shift; - + # require Data::Dumper; print STDERR "Line " . __LINE__ . ", File: " . __FILE__ . "\n" . Data::Dumper->new([$query],[qw()])->Indent(1)->Useqq(1)->Dump; # XXX + $attr = $query->{attr}; $cont = $query->{cont}; $raw = $query->{raw}; @@ -817,9 +787,9 @@ } if (defined $cont and $cont ne '') { for (@{$self->{inverted}->{$attr}}) { - my %r = $_->search($query, $cont); + my $r = $_->search_ref($query, $cont); my ($key, $val); - while (($key, $val) = each %r) { + while (($key, $val) = each %$r) { if (exists $result{$key}) { $result{$key} += $val; } else { @@ -833,7 +803,17 @@ for (keys %result) { delete $result{$_} if $self->{deleted}->{$_} } - %result; + \%result; +} + +sub parse_query { + my($self, $attr, $query) = @_; + return unless defined $query && length $query; + my %qt; + for (@{$self->{inverted}->{$attr}}) { + grep $qt{$_}++, $_->parse($query); + } + [keys %qt]; } sub hilight_positions {