/* Revision 85 (by dpavlin, 2007/02/03 17:13:07) add PG_MODULE_MAGIC for PostgreSQL 8.2 */
/*
 * integrate Hyper Estraier into PostgreSQL
 *
 * Dobrica Pavlinusic <dpavlin@rot13.org> 2005-05-19
 *
 * TODO:
 * - all
 *
 * NOTES:
 * - clear structures with memset to support hash indexes (who would want
 *   to create a hash index on a table returned from a function?)
 * - number of returned rows is set by PostgreSQL evaluator, see:
 *   http://archives.postgresql.org/pgsql-hackers/2005-02/msg00546.php
 *
 * Based on:
 * - C example from PostgreSQL documentation (BSD licence)
 * - coreexample002.c and nodeexample002.c from Hyper Estraier (GPL)
 * - _textin/_textout from pgcurl.c (LGPL)
 *
 * This code is licenced under GPL
 */

#include "postgres.h"
#include "fmgr.h"
#include "funcapi.h"
#include "utils/builtins.h"
#include "utils/array.h"
#include "utils/lsyscache.h"
#include "miscadmin.h"
#include "commands/trigger.h"
#include "executor/spi.h"

#include <stdlib.h>

#include <estraier.h>
#include <cabin.h>
#include <estnode.h>

/*
 * Helpers for moving between PostgreSQL text values and C strings.
 * All of them go through the textin/textout functions, so every call
 * allocates a fresh result with palloc.
 */

/* C string -> text, result left as a Datum */
#define _textin(str) DirectFunctionCall1(textin, CStringGetDatum(str))
/* text pointer -> C string, result returned as a plain pointer */
#define _textout(str) DatumGetPointer(DirectFunctionCall1(textout, PointerGetDatum(str)))
/* text pointer -> C string (same conversion as _textout, typed as a C string) */
#define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp)))
/* C string -> text pointer */
#define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp)))

/*
 * SortMem got renamed in PostgreSQL 8.0, so provide a fallback value that is
 * handed to tuplestore_begin_heap() as its memory limit.
 *
 * The expansion is parenthesized so that the macro behaves like a single
 * value inside any surrounding expression (e.g. "x / SortMem").
 */
#ifndef SortMem
 #define SortMem (16 * 1024)
#endif

#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif

/* separator between attribute expressions in the attribute filter argument
 * (cond_add_attr splits its input on this string) */
#define ATTR_DELIMITER "{{!}}"
/* column names containing this marker are looked up in the node search
 * hints by pgest_node instead of in the document attributes */
#define HINTS_PREFIX "HINTS."

/* prototype */

/* copy the value of attribute `attr' of a document into palloc'ed memory */
char *attr2text(ESTDOC *doc, char *attr);
/* same as attr2text, but for a document from a node search result */
char *node_attr2text(ESTRESDOC *rdoc, char *attr);
/* split `attr' on ATTR_DELIMITER and add every expression to `cond' */
void cond_add_attr(ESTCOND *cond, char *attr);


/* work in progress */
PG_FUNCTION_INFO_V1(pgest_attr);
Datum pgest_attr(PG_FUNCTION_ARGS)
{
	ArrayType	*attr_arr = PG_GETARG_ARRAYTYPE_P(6);
	Oid		attr_element_type = ARR_ELEMTYPE(attr_arr);
	int		attr_ndims = ARR_NDIM(attr_arr);
	int		*attr_dim_counts = ARR_DIMS(attr_arr);
	int		*attr_dim_lower_bounds = ARR_LBOUND(attr_arr);
	int		ncols = 0;
	int		nrows = 0;
	int		indx[MAXDIM];
	int16		attr_len;
	bool		attr_byval;
	char		attr_align;
	ReturnSetInfo	*rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
	AttInMetadata	*attinmeta;
	TupleDesc	tupdesc;
	Tuplestorestate *tupstore = NULL;
	HeapTuple	tuple;
	MemoryContext	per_query_ctx;
	MemoryContext	oldcontext;
	Datum		dvalue;
	char		**values;
	int		rsinfo_ncols;
	int		i, j;
	/* estvars */
	ESTDB *db;
	ESTCOND *cond;
	ESTDOC *doc;
	int ecode, *est_result, resnum;
	int limit = 0;
	int offset = 0;

	char		*index_path;
	char		*query;
	char		*attr;
	char		*order;


	/* only allow 1D input array */
	if (attr_ndims == 1)
	{
		ncols = attr_dim_counts[0];
	}
	else
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid input array"),
				 errdetail("Input array must have 1 dimension")));
		
	/* check to see if caller supports us returning a tuplestore */
	if (!rsinfo || !(rsinfo->allowedModes & SFRM_Materialize))
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("materialize mode required, but it is not " \
						"allowed in this context")));

	/* get info about element type needed to construct the array */
	get_typlenbyvalalign(attr_element_type, &attr_len, &attr_byval, &attr_align);

	/* get the requested return tuple description */
	tupdesc = rsinfo->expectedDesc;
	rsinfo_ncols = tupdesc->natts;

	/*
	 * The requested tuple description better match up with the array
	 * we were given.
	 */
	if (rsinfo_ncols != ncols)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid input array"),
				 errdetail("Number of elements in array must match number of query specified columns.")));

	/* OK, use it */
	attinmeta = TupleDescGetAttInMetadata(tupdesc);

	/* Now go to work */
	rsinfo->returnMode = SFRM_Materialize;

	per_query_ctx = fcinfo->flinfo->fn_mcxt;
	oldcontext = MemoryContextSwitchTo(per_query_ctx);

	/* initialize our tuplestore */
	tupstore = tuplestore_begin_heap(true, false, SortMem);


	/* take rest of arguments from function */

	/* index path */
	if (PG_ARGISNULL(0)) {
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("index path can't be null"),
				 errdetail("Index path must be valid full path to HyperEstraier index")));
	}
	index_path = _textout(PG_GETARG_TEXT_P(0));

	/* query string */
	if (PG_ARGISNULL(1)) {
		query = "";
	} else {
		query = _textout(PG_GETARG_TEXT_P(1));
	}

	/* atribute filter */
	if (PG_ARGISNULL(2)) {
		attr = "";
	} else {
		attr = _textout(PG_GETARG_TEXT_P(2));
	}
	
	/* sort order */
	if (PG_ARGISNULL(3)) {
		order = "";
	} else {
		order = _textout(PG_GETARG_TEXT_P(3));
	}


	/* limit */
	if (PG_ARGISNULL(4)) {
		limit = 0;
	} else {
		limit = PG_GETARG_INT32(4);
	}

	/* offset */
	if (PG_ARGISNULL(5)) {
		offset = 0;
	} else {
		offset = PG_GETARG_INT32(5);
	}


	/* open the database */
	elog(DEBUG1, "pgest_attr: est_db_open(%s)", index_path);
		
	if(!(db = est_db_open(index_path, ESTDBREADER, &ecode))){
		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
			errmsg("est_db_open: can't open %s: %d", index_path, ecode),
			errdetail(est_err_msg(ecode))));
	}
		
	elog(DEBUG1, "pgest_attr: query[%s] attr[%s] limit %d offset %d", query, (PG_ARGISNULL(2) ? "NULL" : attr), limit, offset);
	
	/* create a search condition object */
	if (!(cond = est_cond_new())) {
		ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED),
			errmsg("pgest_attr: est_cond_new failed")));
	}
	
	/* set the search phrase to the search condition object */
	if (! PG_ARGISNULL(1) && strlen(query) > 0)
		est_cond_set_phrase(cond, query);

	/* minimum valid attribute length is 10: @a STREQ a */
	if (! PG_ARGISNULL(2) && strlen(attr) >= 10) {
		elog(DEBUG1,"attributes: %s", attr);
		cond_add_attr(cond, attr);
	}

	/* set the search phrase to the search condition object */
	if (! PG_ARGISNULL(3) && strlen(order) > 0) {
		elog(DEBUG1,"est_cond_set_order(%s)", order);
		est_cond_set_order(cond, order);
	}

	if (limit) {
		elog(DEBUG1,"est_cond_set_max(%d)", limit + offset);
		est_cond_set_max(cond, limit + offset);
	}

	/* get the result of search */
	est_result = est_db_search(db, cond, &resnum, NULL);

	/* check if results exists */
	if ( 0 == resnum ) {
		elog(INFO, "pgest_attr: no results for: %s", query );
	}

	/* total number of tuples to be returned */
	if (limit && limit < resnum) {
		nrows = limit;
	} else {
		nrows = resnum - offset;
	}


	elog(DEBUG1, "pgest_attr: found %d hits for %s", resnum, query);

	values = (char **) palloc(ncols * sizeof(char *));

	for (i = 0; i < nrows; i++)
	{

		/* get result from estraier */
		if (! ( doc = est_db_get_doc(db, est_result[i + offset], 0)) ) {
			elog(INFO, "pgest_attr: can't find result %d", i + offset);
		} else {
			elog(DEBUG1, "URI: %s\n Title: %s\n",
				est_doc_attr(doc, "@uri"),
				est_doc_attr(doc, "@title")
			);
		}

		/* iterate over results */
		for (j = 0; j < ncols; j++)
		{
			bool	isnull;

			/* array value of this position */
			indx[0] = j + attr_dim_lower_bounds[0];

			dvalue = array_ref(attr_arr, attr_ndims, indx, -1, attr_len, attr_byval, attr_align, &isnull);

			if (!isnull && doc)
				values[j] = DatumGetCString(
					attr2text(doc,
						(char *)DirectFunctionCall1(textout, dvalue)
					));
			else
				values[j] = NULL;
		}
		/* construct the tuple */
		tuple = BuildTupleFromCStrings(attinmeta, values);

		/* now store it */
		tuplestore_puttuple(tupstore, tuple);

		/* delete estraier document object */
		if (doc) est_doc_delete(doc);
	}

	tuplestore_donestoring(tupstore);
	rsinfo->setResult = tupstore;

	/*
	 * SFRM_Materialize mode expects us to return a NULL Datum. The actual
	 * tuples are in our tuplestore and passed back through
	 * rsinfo->setResult. rsinfo->setDesc is set to the tuple description
	 * that we actually used to build our tuples with, so the caller can
	 * verify we did what it was expecting.
	 */
	rsinfo->setDesc = tupdesc;
	MemoryContextSwitchTo(oldcontext);

	est_cond_delete(cond);

	if(!est_db_close(db, &ecode)){
		ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
			errmsg("est_db_close: %d", ecode),
			errdetail(est_err_msg(ecode))));
	}

	return (Datum) 0;
}


/*
 * attr2text - copy the value of a document attribute into palloc'ed memory
 *
 * doc:  Hyper Estraier document to read from
 * attr: name of the attribute
 *
 * Returns a NUL-terminated copy of the attribute value allocated in the
 * current memory context, or NULL when `doc' or `attr' is NULL or the
 * attribute is missing or empty.
 */
char *attr2text(ESTDOC *doc, char *attr) {
	const char *attrval;
	char *val;
	size_t len;

	if (! doc || ! attr) return NULL;

	elog(DEBUG1, "doc: %p, attr: %s", (void *) doc, attr);

	attrval = est_doc_attr(doc, attr);
	if (attrval == NULL) return NULL;

	/* an empty value is reported the same way as a missing one */
	len = strlen(attrval);
	if (len == 0) return NULL;

	elog(DEBUG1, "attr2text(%s) = '%s' %d bytes", attr, attrval, (int) len);
	elog(DEBUG2, "palloc(%d)", (int) (len + 1));

	/* single allocation, copied together with the terminating NUL */
	val = palloc(len + 1);
	memcpy(val, attrval, len + 1);

	elog(DEBUG2, "val=%s", val);

	return val;
}

/*
 * Variation on the theme: use the node API, which doesn't open the index
 * on every query and is therefore much faster for large indexes.
 */

/*
 * Positions of the pgest_node() arguments, in the order they are given
 * in SQL:
 *
 * select * from pgest_node(
 */
#define _arg_node_uri 0
#define _arg_login 1
#define _arg_passwd 2
#define _arg_depth 3
#define _arg_query 4
#define _arg_attr 5
#define _arg_order 6
#define _arg_limit 7
#define _arg_offset 8
#define _arg_attr_array 9
/* ) as (foo text, ... ); */


PG_FUNCTION_INFO_V1(pgest_node);
Datum pgest_node(PG_FUNCTION_ARGS)
{
	ArrayType	*attr_arr = PG_GETARG_ARRAYTYPE_P(_arg_attr_array);
	Oid		attr_element_type = ARR_ELEMTYPE(attr_arr);
	int		attr_ndims = ARR_NDIM(attr_arr);
	int		*attr_dim_counts = ARR_DIMS(attr_arr);
	int		*attr_dim_lower_bounds = ARR_LBOUND(attr_arr);
	int		ncols = 0;
	int		nrows = 0;
	int		indx[MAXDIM];
	int16		attr_len;
	bool		attr_byval;
	char		attr_align;
	ReturnSetInfo	*rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
	AttInMetadata	*attinmeta;
	TupleDesc	tupdesc;
	Tuplestorestate *tupstore = NULL;
	HeapTuple	tuple;
	MemoryContext	per_query_ctx;
	MemoryContext	oldcontext;
	Datum		dvalue;
	char		**values;
	int		rsinfo_ncols;
	int		i, j;
	/* estvars */
	ESTNODE *node;
	ESTCOND *cond;
	ESTNODERES *nres;
	ESTRESDOC *rdoc;
	CBMAP *hints;
	int resnum = 0;
	int limit = 0;
	int offset = 0;
	int depth = 0;

	char		*node_url;
	char		*user, *passwd;
	char		*query;
	char		*attr;
	char		*order;


	/* only allow 1D input array */
	if (attr_ndims == 1)
	{
		ncols = attr_dim_counts[0];
	}
	else
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid input array"),
				 errdetail("Input array must have 1 dimension")));
		
	/* check to see if caller supports us returning a tuplestore */
	if (!rsinfo || !(rsinfo->allowedModes & SFRM_Materialize))
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("materialize mode required, but it is not " \
						"allowed in this context")));

	/* get info about element type needed to construct the array */
	get_typlenbyvalalign(attr_element_type, &attr_len, &attr_byval, &attr_align);

	/* get the requested return tuple description */
	tupdesc = rsinfo->expectedDesc;
	rsinfo_ncols = tupdesc->natts;

	/*
	 * The requested tuple description better match up with the array
	 * we were given.
	 */
	if (rsinfo_ncols != ncols)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid input array"),
				 errdetail("Number of elements in array must match number of query specified columns.")));

	/* OK, use it */
	attinmeta = TupleDescGetAttInMetadata(tupdesc);

	/* Now go to work */
	rsinfo->returnMode = SFRM_Materialize;

	per_query_ctx = fcinfo->flinfo->fn_mcxt;
	oldcontext = MemoryContextSwitchTo(per_query_ctx);

	/* initialize our tuplestore */
	tupstore = tuplestore_begin_heap(true, false, SortMem);


	/* take rest of arguments from function */

	/* node URL */
	if (PG_ARGISNULL(_arg_node_uri)) {
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("node URL can't be null"),
				 errdetail("Node URL must be valid URL to HyperEstraier node")));
	}
	node_url = _textout(PG_GETARG_TEXT_P(_arg_node_uri));

	/* login and password */
	if (PG_ARGISNULL(_arg_login) || PG_ARGISNULL(_arg_passwd)) {
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("username and password can't be NULL"),
				 errdetail("You must specify valid username and password to HyperEstraier node")));
	}
	user = _textout(PG_GETARG_TEXT_P(_arg_login));
	passwd = _textout(PG_GETARG_TEXT_P(_arg_passwd));

	/* depth of search */
	if (PG_ARGISNULL(_arg_depth)) {
		depth = 0;
	} else {
		depth = PG_GETARG_INT32(_arg_depth);
	}

	/* query string */
	if (PG_ARGISNULL(_arg_query)) {
		query = "";
	} else {
		query = _textout(PG_GETARG_TEXT_P(_arg_query));
	}

	/* atribute filter */
	if (PG_ARGISNULL(_arg_attr)) {
		attr = "";
	} else {
		attr = _textout(PG_GETARG_TEXT_P(_arg_attr));
	}
	
	/* sort order */
	if (PG_ARGISNULL(_arg_order)) {
		order = "";
	} else {
		order = _textout(PG_GETARG_TEXT_P(_arg_order));
	}


	/* limit */
	if (PG_ARGISNULL(_arg_limit)) {
		limit = 0;
	} else {
		limit = PG_GETARG_INT32(_arg_limit);
	}

	/* offset */
	if (PG_ARGISNULL(_arg_offset)) {
		offset = 0;
	} else {
		offset = PG_GETARG_INT32(_arg_offset);
	}

	/* initialize the network environment */
	if(!est_init_net_env()){
		ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED),
			errmsg("pgest_node: can't create network enviroment")));
	}

	/* create the node connection object */
	elog(DEBUG1, "pgest_node: est_node_new(%s) as %s", node_url, user);
	node = est_node_new(node_url);
	est_node_set_auth(node, user, passwd);

	elog(DEBUG1, "pgest_node: node: %s (d:%d) query[%s] attr[%s] limit %d offset %d", node_url, depth, query, (PG_ARGISNULL(_arg_attr) ? "NULL" : attr), limit, offset);
	
	/* create a search condition object */
	if (!(cond = est_cond_new())) {
		ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED),
			errmsg("pgest_node: est_cond_new failed")));
	}
	
	/* set the search phrase to the search condition object */
	if (! PG_ARGISNULL(_arg_query) && strlen(query) > 0)
		est_cond_set_phrase(cond, query);

	/* minimum valid attribute length is 10: @a STREQ a */
	if (! PG_ARGISNULL(_arg_attr) && strlen(attr) >= 10) {
		elog(DEBUG1,"attributes: %s", attr);
		cond_add_attr(cond, attr);
	}

	/* set the search phrase to the search condition object */
	if (! PG_ARGISNULL(_arg_order) && strlen(order) > 0) {
		elog(DEBUG1,"est_cond_set_order(%s)", order);
		est_cond_set_order(cond, order);
	}

	if (limit) {
		elog(DEBUG1,"est_cond_set_max(%d)", limit + offset);
		est_cond_set_max(cond, limit + offset);
	}

	if (offset) {
		elog(DEBUG1,"est_cond_set_skip(%d)", offset);
		est_cond_set_skip(cond, offset);
	}

	/* get the result of search */
	nres = est_node_search(node, cond, depth);

	if (! nres) {
		int status = est_node_status(node);
		est_cond_delete(cond);
		est_node_delete(node);
		est_free_net_env();
		ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED),
			errmsg("pgest_node: search failed, node status %d", status)));
	}

	/* get number of results */
	resnum = est_noderes_doc_num(nres);

	/* check if results exists */
	if ( 0 == resnum ) {
		elog(INFO, "pgest_node: no results for: %s", query );
	}

	/* total number of tuples to be returned */
	if (limit && limit < resnum) {
		nrows = limit;
	} else {
		nrows = resnum;
	}

	/* get hints */
	hints = est_noderes_hints(nres);

	elog(DEBUG1, "pgest_node: found %d hits for %s", resnum, query);


	values = (char **) palloc(ncols * sizeof(char *));

	for (i = 0; i < nrows; i++)
	{

		/* get result from estraier */
		if (! ( rdoc = est_noderes_get_doc(nres, i) )) {
			elog(INFO, "pgest_node: can't find result %d", i + offset);
		} else {
			elog(DEBUG1, "URI: %s\n Title: %s\n",
				est_resdoc_attr(rdoc, "@uri"),
				est_resdoc_attr(rdoc, "@title")
			);
		}

		/* iterate over results */
		for (j = 0; j < ncols; j++)
		{
			bool	isnull;
			char	*attr;	/* current attribute name */
			char	*hint;	/* position of current hint in attribute */
			char	*hint_val;

			/* array value of this position */
			indx[0] = j + attr_dim_lower_bounds[0];

			dvalue = array_ref(attr_arr, attr_ndims, indx, -1, attr_len, attr_byval, attr_align, &isnull);
			attr = (char *)DirectFunctionCall1(textout, dvalue);

			if (!isnull && (hint = strstr(attr, HINTS_PREFIX)) != NULL) {
				/* skip HINTS. prefix */
				hint += strlen(HINTS_PREFIX);

				hint_val = (char *)cbmapget(hints, hint, -1, NULL);
				elog(DEBUG2, "hint %s = %s", hint, hint_val);

				if (hint_val != NULL) {
					values[j] = DatumGetCString( hint_val );
				} else {
					elog(INFO, "can't get hint in results: %s", hint);
					values[j] = NULL;
				}
			} else if (!isnull && rdoc)
				values[j] = DatumGetCString( node_attr2text(rdoc, attr) );
			else
				values[j] = NULL;
		}
		/* construct the tuple */
		tuple = BuildTupleFromCStrings(attinmeta, values);

		/* now store it */
		tuplestore_puttuple(tupstore, tuple);

	}

	tuplestore_donestoring(tupstore);
	rsinfo->setResult = tupstore;

	/*
	 * SFRM_Materialize mode expects us to return a NULL Datum. The actual
	 * tuples are in our tuplestore and passed back through
	 * rsinfo->setResult. rsinfo->setDesc is set to the tuple description
	 * that we actually used to build our tuples with, so the caller can
	 * verify we did what it was expecting.
	 */
	rsinfo->setDesc = tupdesc;
	MemoryContextSwitchTo(oldcontext);

	/* delete the node result object */
	est_noderes_delete(nres);

	/* destroy the search condition object */                          
	est_cond_delete(cond);

	/* destroy the node object */
	est_node_delete(node);

	/* free the networking environment */
	est_free_net_env();

	return (Datum) 0;
}

/*
 * node_attr2text - copy the value of an attribute of a node search result
 * document into palloc'ed memory
 *
 * rdoc: result document to read from
 * attr: name of the attribute
 *
 * Returns a NUL-terminated copy of the attribute value allocated in the
 * current memory context, or NULL when `rdoc' or `attr' is NULL or the
 * attribute is missing or empty.
 */
char *node_attr2text(ESTRESDOC *rdoc, char *attr) {
	const char *attrval;
	char *val;
	size_t len;

	if (! rdoc || ! attr) return NULL;

	elog(DEBUG1, "doc: %p, attr: %s", (void *) rdoc, attr);

	attrval = est_resdoc_attr(rdoc, attr);
	if (attrval == NULL) return NULL;

	/* an empty value is reported the same way as a missing one */
	len = strlen(attrval);
	if (len == 0) return NULL;

	elog(DEBUG1, "node_attr2text(%s) = '%s' %d bytes", attr, attrval, (int) len);
	elog(DEBUG2, "palloc(%d)", (int) (len + 1));

	/* single allocation, copied together with the terminating NUL */
	val = palloc(len + 1);
	memcpy(val, attrval, len + 1);

	elog(DEBUG2, "val=%s", val);

	return val;
}

/*
 * cond_add_attr - parse attribute expressions and add them to a condition
 *
 * cond: search condition which receives the expressions
 * attr: one or more attribute expressions separated by ATTR_DELIMITER
 *
 * Every non-empty piece between delimiters is passed to
 * est_cond_add_attr().  Empty pieces (leading, trailing or doubled
 * delimiters) are skipped.  The input string is not modified.  Nothing
 * happens when `cond' or `attr' is NULL.
 */
void cond_add_attr(ESTCOND *cond, char *attr) {
	const size_t delim_len = strlen(ATTR_DELIMITER);

	if (! cond || ! attr) return;

	while ( *attr != '\0' ) {
		char *delim = strstr(attr, ATTR_DELIMITER);
		size_t len = delim ? (size_t) (delim - attr) : strlen(attr);

		elog(DEBUG2, "len [%s] = %d", attr, (int) strlen(attr));

		if (len > 0) {
			/* NUL-terminated private copy of the current expression */
			char *curr_attr = palloc(len + 1);

			memcpy(curr_attr, attr, len);
			curr_attr[len] = '\0';

			elog(DEBUG1, "est_cond_add_attr(%s)", curr_attr);
			est_cond_add_attr(cond, curr_attr);

			/* the original already handed over pointers into the caller's
			 * buffer, so the condition is assumed to keep its own copy */
			pfree(curr_attr);
		}

		if (delim == NULL)
			break;		/* that was the last expression */

		attr = delim + delim_len;
	}
}

/* trigger to keep data in Hyper Estraier index up-to-date */
/* CREATE FUNCTION pgest_trigger() RETURNS TRIGGER AS ... */

/*
 * UPDATE, INSERT and DELETE triggers are like this:

CREATE TRIGGER pgest_trigger_update AFTER UPDATE
	ON table_name FOR EACH ROW
	EXECUTE PROCEDURE pgest_trigger('http://localhost:1978/node/trivia','admin','admin',
		'name_of_pk', 'column', 'another_column', 'and_so_on'
	)

*/

PG_FUNCTION_INFO_V1(pgest_trigger);
Datum pgest_trigger(PG_FUNCTION_ARGS) {

	TriggerData *data;
	TupleDesc   tupdesc;
	HeapTuple   ret;

	char **args;
	char *keycol  = NULL;
	char *key     = NULL;
	char *col_data = NULL;
	int   knumber;
	int   i;
	int   create_doc = 0;
	int   edit_doc = 0;

	ESTNODE *node;
	ESTDOC *doc;



	if (! CALLED_AS_TRIGGER(fcinfo)) {
		elog(ERROR, "pgest_trigger() must be called as a trigger");
	}

	data = (TriggerData *) fcinfo->context;

	if (data->tg_trigger->tgnargs < 5)
		elog(ERROR, "pgest_trigger() requires at least 5 parameters ('http://localhost:1978/node/trivia', 'user', 'passwd', 'pk_column', 'column', ... )");

	args       = data->tg_trigger->tgargs;
	keycol     = args[3];

	tupdesc = data->tg_relation->rd_att;

	knumber = SPI_fnumber(tupdesc, keycol);
	key = SPI_getvalue(data->tg_trigtuple, tupdesc, knumber);


	/* initialize the network environment */
	if( ! est_init_net_env() )
		elog(ERROR, "pgest_trigger: network is unavailable\n");

	/* create and configure the node connection object */
	node = est_node_new( args[0] );
	est_node_set_auth(node, args[1], args[2]);


	if (TRIGGER_FIRED_BY_INSERT(data->tg_event)) {
		/* There is no old data */
		ret = data->tg_trigtuple;

		create_doc++;

	} else if (TRIGGER_FIRED_BY_UPDATE(data->tg_event)) {
		ret = data->tg_newtuple;

		edit_doc++;

	} else if (TRIGGER_FIRED_BY_DELETE(data->tg_event)) {
		/* There is no new data */
		ret = data->tg_trigtuple;

		if (! est_node_out_doc_by_uri(node, key) )
			elog(ERROR, "est_node_doc_by_uri(%s): %d\n", key, est_node_status(node));

	} else {
		elog(ERROR, "pgest_trigger() not called from INSERT/UPDATE/DELETE");
	}

	if ( create_doc || edit_doc ) {

		if ( create_doc ) {
			/* create a document object */
			doc = est_doc_new();
			est_doc_add_attr(doc, "@uri", key);

			elog(DEBUG1, "est_doc_new @uri=%s", key);
		} else {
			/* edit existing document */
			doc = est_node_get_doc_by_uri(node, key);
			if (doc == NULL)
				elog(ERROR, "est_node_get_doc_by_uri(%s): %d\n", key, est_node_status(node));
			elog(DEBUG1, "est_node_get_doc_by_uri(%s)", key);
		}

		for( i = 4; i < data->tg_trigger->tgnargs; i++ ) {

			col_data = SPI_getvalue(ret, tupdesc, SPI_fnumber(tupdesc, args[i]));

			if (data) {
				elog(DEBUG1, " + %s = %s", args[i], col_data);
				est_doc_add_attr(doc, args[i], col_data);
				est_doc_add_text(doc, col_data);
			}

		}

		if ( edit_doc ) {
			/* update existing document */
			if( ! est_node_edit_doc(node, doc) )
				elog(ERROR, "est_node_edit_doc: %d\n", est_node_status(node));
		} else {
			/* register the document object to the node */
			if( ! est_node_put_doc(node, doc) )
				elog(ERROR, "est_node_put_doc: %d\n", est_node_status(node));
		}

		/* destroy the document object */
		est_doc_delete(doc);

	}

	/* destroy the node object */
	est_node_delete(node);
	/* free the networking environment */
	est_free_net_env();


	return PointerGetDatum(ret);
}