/* * integrate Hyper Estraier into PostgreSQL * * Dobrica Pavlinusic 2005-05-19 * * TODO: * - all * * NOTES: * - clear structures with memset to support hash indexes (who whould like * to create hash index on table returned from function?) * - number of returned rows is set by PostgreSQL evaluator, see: * http://archives.postgresql.org/pgsql-hackers/2005-02/msg00546.php * * Based on: * - C example from PostgreSQL documentation (BSD licence) * - coreexample002.c and nodeexample002.c from Hyper Estraier (GPL) * - _textin/_textout from pgcurl.c (LGPL) * * This code is licenced under GPL */ #include "postgres.h" #include "fmgr.h" #include "funcapi.h" #include "utils/builtins.h" #include "utils/array.h" #include "utils/lsyscache.h" #include "miscadmin.h" #include "commands/trigger.h" #include "executor/spi.h" #include #include #include #define _textin(str) DirectFunctionCall1(textin, CStringGetDatum(str)) #define _textout(str) DatumGetPointer(DirectFunctionCall1(textout, PointerGetDatum(str))) #define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp))) #define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp))) /* SortMem got renamed in PostgreSQL 8.0 */ #ifndef SortMem #define SortMem 16 * 1024 #endif #ifdef PG_MODULE_MAGIC PG_MODULE_MAGIC; #endif #define ATTR_DELIMITER "{{!}}" #define HINTS_PREFIX "HINTS." /* prototype */ char *attr2text(ESTDOC *doc, char *attr); char *node_attr2text(ESTRESDOC *rdoc, char *attr); void cond_add_attr(ESTCOND *cond, char *attr); /* work in progress */ PG_FUNCTION_INFO_V1(pgest_attr); Datum pgest_attr(PG_FUNCTION_ARGS) { ArrayType *attr_arr = PG_GETARG_ARRAYTYPE_P(6); Oid attr_element_type = ARR_ELEMTYPE(attr_arr); int attr_ndims = ARR_NDIM(attr_arr); int *attr_dim_counts = ARR_DIMS(attr_arr); int *attr_dim_lower_bounds = ARR_LBOUND(attr_arr); int ncols = 0; int nrows = 0; int indx[MAXDIM]; int16 attr_len; bool attr_byval; char attr_align; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; AttInMetadata *attinmeta; TupleDesc tupdesc; Tuplestorestate *tupstore = NULL; HeapTuple tuple; MemoryContext per_query_ctx; MemoryContext oldcontext; Datum dvalue; char **values; int rsinfo_ncols; int i, j; /* estvars */ ESTDB *db; ESTCOND *cond; ESTDOC *doc; int ecode, *est_result, resnum; int limit = 0; int offset = 0; char *index_path; char *query; char *attr; char *order; /* only allow 1D input array */ if (attr_ndims == 1) { ncols = attr_dim_counts[0]; } else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid input array"), errdetail("Input array must have 1 dimension"))); /* check to see if caller supports us returning a tuplestore */ if (!rsinfo || !(rsinfo->allowedModes & SFRM_Materialize)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("materialize mode required, but it is not " \ "allowed in this context"))); /* get info about element type needed to construct the array */ get_typlenbyvalalign(attr_element_type, &attr_len, &attr_byval, &attr_align); /* get the requested return tuple description */ tupdesc = rsinfo->expectedDesc; rsinfo_ncols = tupdesc->natts; /* * The requested tuple description better match up with the array * we were given. */ if (rsinfo_ncols != ncols) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid input array"), errdetail("Number of elements in array must match number of query specified columns."))); /* OK, use it */ attinmeta = TupleDescGetAttInMetadata(tupdesc); /* Now go to work */ rsinfo->returnMode = SFRM_Materialize; per_query_ctx = fcinfo->flinfo->fn_mcxt; oldcontext = MemoryContextSwitchTo(per_query_ctx); /* initialize our tuplestore */ tupstore = tuplestore_begin_heap(true, false, SortMem); /* take rest of arguments from function */ /* index path */ if (PG_ARGISNULL(0)) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("index path can't be null"), errdetail("Index path must be valid full path to HyperEstraier index"))); } index_path = _textout(PG_GETARG_TEXT_P(0)); /* query string */ if (PG_ARGISNULL(1)) { query = ""; } else { query = _textout(PG_GETARG_TEXT_P(1)); } /* atribute filter */ if (PG_ARGISNULL(2)) { attr = ""; } else { attr = _textout(PG_GETARG_TEXT_P(2)); } /* sort order */ if (PG_ARGISNULL(3)) { order = ""; } else { order = _textout(PG_GETARG_TEXT_P(3)); } /* limit */ if (PG_ARGISNULL(4)) { limit = 0; } else { limit = PG_GETARG_INT32(4); } /* offset */ if (PG_ARGISNULL(5)) { offset = 0; } else { offset = PG_GETARG_INT32(5); } /* open the database */ elog(DEBUG1, "pgest_attr: est_db_open(%s)", index_path); if(!(db = est_db_open(index_path, ESTDBREADER, &ecode))){ ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("est_db_open: can't open %s: %d", index_path, ecode), errdetail(est_err_msg(ecode)))); } elog(DEBUG1, "pgest_attr: query[%s] attr[%s] limit %d offset %d", query, (PG_ARGISNULL(2) ? "NULL" : attr), limit, offset); /* create a search condition object */ if (!(cond = est_cond_new())) { ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED), errmsg("pgest_attr: est_cond_new failed"))); } /* set the search phrase to the search condition object */ if (! PG_ARGISNULL(1) && strlen(query) > 0) est_cond_set_phrase(cond, query); /* minimum valid attribute length is 10: @a STREQ a */ if (! PG_ARGISNULL(2) && strlen(attr) >= 10) { elog(DEBUG1,"attributes: %s", attr); cond_add_attr(cond, attr); } /* set the search phrase to the search condition object */ if (! PG_ARGISNULL(3) && strlen(order) > 0) { elog(DEBUG1,"est_cond_set_order(%s)", order); est_cond_set_order(cond, order); } if (limit) { elog(DEBUG1,"est_cond_set_max(%d)", limit + offset); est_cond_set_max(cond, limit + offset); } /* get the result of search */ est_result = est_db_search(db, cond, &resnum, NULL); /* check if results exists */ if ( 0 == resnum ) { elog(INFO, "pgest_attr: no results for: %s", query ); } /* total number of tuples to be returned */ if (limit && limit < resnum) { nrows = limit; } else { nrows = resnum - offset; } elog(DEBUG1, "pgest_attr: found %d hits for %s", resnum, query); values = (char **) palloc(ncols * sizeof(char *)); for (i = 0; i < nrows; i++) { /* get result from estraier */ if (! ( doc = est_db_get_doc(db, est_result[i + offset], 0)) ) { elog(INFO, "pgest_attr: can't find result %d", i + offset); } else { elog(DEBUG1, "URI: %s\n Title: %s\n", est_doc_attr(doc, "@uri"), est_doc_attr(doc, "@title") ); } /* iterate over results */ for (j = 0; j < ncols; j++) { bool isnull; /* array value of this position */ indx[0] = j + attr_dim_lower_bounds[0]; dvalue = array_ref(attr_arr, attr_ndims, indx, -1, attr_len, attr_byval, attr_align, &isnull); if (!isnull && doc) values[j] = DatumGetCString( attr2text(doc, (char *)DirectFunctionCall1(textout, dvalue) )); else values[j] = NULL; } /* construct the tuple */ tuple = BuildTupleFromCStrings(attinmeta, values); /* now store it */ tuplestore_puttuple(tupstore, tuple); /* delete estraier document object */ if (doc) est_doc_delete(doc); } tuplestore_donestoring(tupstore); rsinfo->setResult = tupstore; /* * SFRM_Materialize mode expects us to return a NULL Datum. The actual * tuples are in our tuplestore and passed back through * rsinfo->setResult. rsinfo->setDesc is set to the tuple description * that we actually used to build our tuples with, so the caller can * verify we did what it was expecting. */ rsinfo->setDesc = tupdesc; MemoryContextSwitchTo(oldcontext); est_cond_delete(cond); if(!est_db_close(db, &ecode)){ ereport(ERROR, (errcode(ERRCODE_IO_ERROR), errmsg("est_db_close: %d", ecode), errdetail(est_err_msg(ecode)))); } return (Datum) 0; } /* make text var from attr */ char *attr2text(ESTDOC *doc, char *attr) { char *val; const char *attrval; int len; int attrlen; if (! doc) return (Datum) NULL; elog(DEBUG1, "doc: %p, attr: %s", doc, attr); if ( (attrval = est_doc_attr(doc, attr)) && (attrlen = strlen(attrval)) ) { val = (char *) palloc(attrlen * sizeof(char)); } else { return (Datum) NULL; } len = strlen(attrval); elog(DEBUG1, "attr2text(%s) = '%s' %d bytes", attr, attrval, len); len++; len *= sizeof(char); elog(DEBUG2, "palloc(%d)", len); val = palloc(len); memset(val, 0, len); strncpy(val, attrval, len); elog(DEBUG2, "val=%s", val); return val; } /* * variation on theme: use node API which doesn't open index on * every query which is much faster for large indexes * */ /* select * from pgest( */ #define _arg_node_uri 0 #define _arg_login 1 #define _arg_passwd 2 #define _arg_depth 3 #define _arg_query 4 #define _arg_attr 5 #define _arg_order 6 #define _arg_limit 7 #define _arg_offset 8 #define _arg_attr_array 9 /* as (foo text, ... ); */ PG_FUNCTION_INFO_V1(pgest_node); Datum pgest_node(PG_FUNCTION_ARGS) { ArrayType *attr_arr = PG_GETARG_ARRAYTYPE_P(_arg_attr_array); Oid attr_element_type = ARR_ELEMTYPE(attr_arr); int attr_ndims = ARR_NDIM(attr_arr); int *attr_dim_counts = ARR_DIMS(attr_arr); int *attr_dim_lower_bounds = ARR_LBOUND(attr_arr); int ncols = 0; int nrows = 0; int indx[MAXDIM]; int16 attr_len; bool attr_byval; char attr_align; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; AttInMetadata *attinmeta; TupleDesc tupdesc; Tuplestorestate *tupstore = NULL; HeapTuple tuple; MemoryContext per_query_ctx; MemoryContext oldcontext; Datum dvalue; char **values; int rsinfo_ncols; int i, j; /* estvars */ ESTNODE *node; ESTCOND *cond; ESTNODERES *nres; ESTRESDOC *rdoc; CBMAP *hints; int resnum = 0; int limit = 0; int offset = 0; int depth = 0; char *node_url; char *user, *passwd; char *query; char *attr; char *order; /* only allow 1D input array */ if (attr_ndims == 1) { ncols = attr_dim_counts[0]; } else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid input array"), errdetail("Input array must have 1 dimension"))); /* check to see if caller supports us returning a tuplestore */ if (!rsinfo || !(rsinfo->allowedModes & SFRM_Materialize)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("materialize mode required, but it is not " \ "allowed in this context"))); /* get info about element type needed to construct the array */ get_typlenbyvalalign(attr_element_type, &attr_len, &attr_byval, &attr_align); /* get the requested return tuple description */ tupdesc = rsinfo->expectedDesc; rsinfo_ncols = tupdesc->natts; /* * The requested tuple description better match up with the array * we were given. */ if (rsinfo_ncols != ncols) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid input array"), errdetail("Number of elements in array must match number of query specified columns."))); /* OK, use it */ attinmeta = TupleDescGetAttInMetadata(tupdesc); /* Now go to work */ rsinfo->returnMode = SFRM_Materialize; per_query_ctx = fcinfo->flinfo->fn_mcxt; oldcontext = MemoryContextSwitchTo(per_query_ctx); /* initialize our tuplestore */ tupstore = tuplestore_begin_heap(true, false, SortMem); /* take rest of arguments from function */ /* node URL */ if (PG_ARGISNULL(_arg_node_uri)) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("node URL can't be null"), errdetail("Node URL must be valid URL to HyperEstraier node"))); } node_url = _textout(PG_GETARG_TEXT_P(_arg_node_uri)); /* login and password */ if (PG_ARGISNULL(_arg_login) || PG_ARGISNULL(_arg_passwd)) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("username and password can't be NULL"), errdetail("You must specify valid username and password to HyperEstraier node"))); } user = _textout(PG_GETARG_TEXT_P(_arg_login)); passwd = _textout(PG_GETARG_TEXT_P(_arg_passwd)); /* depth of search */ if (PG_ARGISNULL(_arg_depth)) { depth = 0; } else { depth = PG_GETARG_INT32(_arg_depth); } /* query string */ if (PG_ARGISNULL(_arg_query)) { query = ""; } else { query = _textout(PG_GETARG_TEXT_P(_arg_query)); } /* atribute filter */ if (PG_ARGISNULL(_arg_attr)) { attr = ""; } else { attr = _textout(PG_GETARG_TEXT_P(_arg_attr)); } /* sort order */ if (PG_ARGISNULL(_arg_order)) { order = ""; } else { order = _textout(PG_GETARG_TEXT_P(_arg_order)); } /* limit */ if (PG_ARGISNULL(_arg_limit)) { limit = 0; } else { limit = PG_GETARG_INT32(_arg_limit); } /* offset */ if (PG_ARGISNULL(_arg_offset)) { offset = 0; } else { offset = PG_GETARG_INT32(_arg_offset); } /* initialize the network environment */ if(!est_init_net_env()){ ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED), errmsg("pgest_node: can't create network enviroment"))); } /* create the node connection object */ elog(DEBUG1, "pgest_node: est_node_new(%s) as %s", node_url, user); node = est_node_new(node_url); est_node_set_auth(node, user, passwd); elog(DEBUG1, "pgest_node: node: %s (d:%d) query[%s] attr[%s] limit %d offset %d", node_url, depth, query, (PG_ARGISNULL(_arg_attr) ? "NULL" : attr), limit, offset); /* create a search condition object */ if (!(cond = est_cond_new())) { ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED), errmsg("pgest_node: est_cond_new failed"))); } /* set the search phrase to the search condition object */ if (! PG_ARGISNULL(_arg_query) && strlen(query) > 0) est_cond_set_phrase(cond, query); /* minimum valid attribute length is 10: @a STREQ a */ if (! PG_ARGISNULL(_arg_attr) && strlen(attr) >= 10) { elog(DEBUG1,"attributes: %s", attr); cond_add_attr(cond, attr); } /* set the search phrase to the search condition object */ if (! PG_ARGISNULL(_arg_order) && strlen(order) > 0) { elog(DEBUG1,"est_cond_set_order(%s)", order); est_cond_set_order(cond, order); } if (limit) { elog(DEBUG1,"est_cond_set_max(%d)", limit + offset); est_cond_set_max(cond, limit + offset); } if (offset) { elog(DEBUG1,"est_cond_set_skip(%d)", offset); est_cond_set_skip(cond, offset); } /* get the result of search */ nres = est_node_search(node, cond, depth); if (! nres) { int status = est_node_status(node); est_cond_delete(cond); est_node_delete(node); est_free_net_env(); ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED), errmsg("pgest_node: search failed, node status %d", status))); } /* get number of results */ resnum = est_noderes_doc_num(nres); /* check if results exists */ if ( 0 == resnum ) { elog(INFO, "pgest_node: no results for: %s", query ); } /* total number of tuples to be returned */ if (limit && limit < resnum) { nrows = limit; } else { nrows = resnum; } /* get hints */ hints = est_noderes_hints(nres); elog(DEBUG1, "pgest_node: found %d hits for %s", resnum, query); values = (char **) palloc(ncols * sizeof(char *)); for (i = 0; i < nrows; i++) { /* get result from estraier */ if (! ( rdoc = est_noderes_get_doc(nres, i) )) { elog(INFO, "pgest_node: can't find result %d", i + offset); } else { elog(DEBUG1, "URI: %s\n Title: %s\n", est_resdoc_attr(rdoc, "@uri"), est_resdoc_attr(rdoc, "@title") ); } /* iterate over results */ for (j = 0; j < ncols; j++) { bool isnull; char *attr; /* current attribute name */ char *hint; /* position of current hint in attribute */ char *hint_val; /* array value of this position */ indx[0] = j + attr_dim_lower_bounds[0]; dvalue = array_ref(attr_arr, attr_ndims, indx, -1, attr_len, attr_byval, attr_align, &isnull); attr = (char *)DirectFunctionCall1(textout, dvalue); if (!isnull && (hint = strstr(attr, HINTS_PREFIX)) != NULL) { /* skip HINTS. prefix */ hint += strlen(HINTS_PREFIX); hint_val = (char *)cbmapget(hints, hint, -1, NULL); elog(DEBUG2, "hint %s = %s", hint, hint_val); if (hint_val != NULL) { values[j] = DatumGetCString( hint_val ); } else { elog(INFO, "can't get hint in results: %s", hint); values[j] = NULL; } } else if (!isnull && rdoc) values[j] = DatumGetCString( node_attr2text(rdoc, attr) ); else values[j] = NULL; } /* construct the tuple */ tuple = BuildTupleFromCStrings(attinmeta, values); /* now store it */ tuplestore_puttuple(tupstore, tuple); } tuplestore_donestoring(tupstore); rsinfo->setResult = tupstore; /* * SFRM_Materialize mode expects us to return a NULL Datum. The actual * tuples are in our tuplestore and passed back through * rsinfo->setResult. rsinfo->setDesc is set to the tuple description * that we actually used to build our tuples with, so the caller can * verify we did what it was expecting. */ rsinfo->setDesc = tupdesc; MemoryContextSwitchTo(oldcontext); /* delete the node result object */ est_noderes_delete(nres); /* destroy the search condition object */ est_cond_delete(cond); /* destroy the node object */ est_node_delete(node); /* free the networking environment */ est_free_net_env(); return (Datum) 0; } /* make text var from node attr */ char *node_attr2text(ESTRESDOC *rdoc, char *attr) { char *val; const char *attrval; int len; int attrlen; if (! rdoc) return (Datum) NULL; elog(DEBUG1, "doc: %p, attr: %s", rdoc, attr); if ( (attrval = est_resdoc_attr(rdoc, attr)) && (attrlen = strlen(attrval)) ) { val = (char *) palloc(attrlen * sizeof(char)); } else { return (Datum) NULL; } len = strlen(attrval); elog(DEBUG1, "node_attr2text(%s) = '%s' %d bytes", attr, attrval, len); len++; len *= sizeof(char); elog(DEBUG2, "palloc(%d)", len); val = palloc(len); memset(val, 0, len); strncpy(val, attrval, len); elog(DEBUG2, "val=%s", val); return val; } /* parse attributes and add them to confition */ void cond_add_attr(ESTCOND *cond, char *attr) { char *next; char *curr_attr; while ( strlen(attr) > 0 ) { printf("len [%s] = %zd\n", attr, strlen(attr)); if ((next = strstr(attr, ATTR_DELIMITER)) != NULL) { curr_attr = palloc( next - attr + 1 ); memcpy(curr_attr, attr, next-attr); curr_attr[next-attr] = '\0'; next += strlen(ATTR_DELIMITER); } else { next = ""; curr_attr = attr; } elog(DEBUG1, "est_cond_add_attr(%s)", curr_attr); est_cond_add_attr(cond, curr_attr); attr = next; } } /* trigger to keep data in Hyper Estraier index up-to-date */ /* CREATE FUNCTION pgest_trigger() RETURNS TRIGGER AS ... */ /* * UPDATE, INSERT and DELETE triggers are like this: CREATE TRIGGER pgest_trigger_update AFTER UPDATE ON table_name FOR EACH ROW EXECUTE PROCEDURE pgest_trigger('http://localhost:1978/node/trivia','admin','admin', 'name_of_pk', 'column', 'another_column', 'and_so_on' ) */ PG_FUNCTION_INFO_V1(pgest_trigger); Datum pgest_trigger(PG_FUNCTION_ARGS) { TriggerData *data; TupleDesc tupdesc; HeapTuple ret; char **args; char *keycol = NULL; char *key = NULL; char *col_data = NULL; int knumber; int i; int create_doc = 0; int edit_doc = 0; ESTNODE *node; ESTDOC *doc; if (! CALLED_AS_TRIGGER(fcinfo)) { elog(ERROR, "pgest_trigger() must be called as a trigger"); } data = (TriggerData *) fcinfo->context; if (data->tg_trigger->tgnargs < 5) elog(ERROR, "pgest_trigger() requires at least 5 parameters ('http://localhost:1978/node/trivia', 'user', 'passwd', 'pk_column', 'column', ... )"); args = data->tg_trigger->tgargs; keycol = args[3]; tupdesc = data->tg_relation->rd_att; knumber = SPI_fnumber(tupdesc, keycol); key = SPI_getvalue(data->tg_trigtuple, tupdesc, knumber); /* initialize the network environment */ if( ! est_init_net_env() ) elog(ERROR, "pgest_trigger: network is unavailable\n"); /* create and configure the node connection object */ node = est_node_new( args[0] ); est_node_set_auth(node, args[1], args[2]); if (TRIGGER_FIRED_BY_INSERT(data->tg_event)) { /* There is no old data */ ret = data->tg_trigtuple; create_doc++; } else if (TRIGGER_FIRED_BY_UPDATE(data->tg_event)) { ret = data->tg_newtuple; edit_doc++; } else if (TRIGGER_FIRED_BY_DELETE(data->tg_event)) { /* There is no new data */ ret = data->tg_trigtuple; if (! est_node_out_doc_by_uri(node, key) ) elog(ERROR, "est_node_doc_by_uri(%s): %d\n", key, est_node_status(node)); } else { elog(ERROR, "pgest_trigger() not called from INSERT/UPDATE/DELETE"); } if ( create_doc || edit_doc ) { if ( create_doc ) { /* create a document object */ doc = est_doc_new(); est_doc_add_attr(doc, "@uri", key); elog(DEBUG1, "est_doc_new @uri=%s", key); } else { /* edit existing document */ doc = est_node_get_doc_by_uri(node, key); if (doc == NULL) elog(ERROR, "est_node_get_doc_by_uri(%s): %d\n", key, est_node_status(node)); elog(DEBUG1, "est_node_get_doc_by_uri(%s)", key); } for( i = 4; i < data->tg_trigger->tgnargs; i++ ) { col_data = SPI_getvalue(ret, tupdesc, SPI_fnumber(tupdesc, args[i])); if (data) { elog(DEBUG1, " + %s = %s", args[i], col_data); est_doc_add_attr(doc, args[i], col_data); est_doc_add_text(doc, col_data); } } if ( edit_doc ) { /* update existing document */ if( ! est_node_edit_doc(node, doc) ) elog(ERROR, "est_node_edit_doc: %d\n", est_node_status(node)); } else { /* register the document object to the node */ if( ! est_node_put_doc(node, doc) ) elog(ERROR, "est_node_put_doc: %d\n", est_node_status(node)); } /* destroy the document object */ est_doc_delete(doc); } /* destroy the node object */ est_node_delete(node); /* free the networking environment */ est_free_net_env(); return PointerGetDatum(ret); }