diff --git a/contrib/pgsql_fdw/deparse.c b/contrib/pgsql_fdw/deparse.c
new file mode 100644
index 148d0b4..9737a0c
*** a/contrib/pgsql_fdw/deparse.c
--- b/contrib/pgsql_fdw/deparse.c
***************
*** 28,33 ****
--- 28,34 ----
  #include "parser/parsetree.h"
  #include "utils/builtins.h"
  #include "utils/lsyscache.h"
+ #include "utils/rel.h"
  #include "utils/syscache.h"
  
  #include "pgsql_fdw.h"
*************** sortConditions(PlannerInfo *root,
*** 218,223 ****
--- 219,298 ----
  }
  
  /*
+  * Deparse SELECT statement to acquire sample rows of given relation into buf.
+  *
+  * Every non-dropped column is listed in attribute order.  A "colname" column
+  * option overrides the local attribute name, and the "nspname" / "relname"
+  * table options override the local schema and relation names.  No WHERE
+  * clause is emitted, so the statement retrieves every row of the relation.
+  */
+ void
+ deparseAnalyzeSql(StringInfo buf, Relation rel)
+ {
+ 	Oid			relid = RelationGetRelid(rel);
+ 	TupleDesc	tupdesc = RelationGetDescr(rel);
+ 	int			i;
+ 	char	   *colname;
+ 	List	   *options;
+ 	ListCell   *lc;
+ 	bool		first = true;
+ 	char	   *nspname;
+ 	char	   *relname;
+ 	ForeignTable *table;
+
+ 	/* Deparse SELECT clause, use attribute name or colname option. */
+ 	appendStringInfoString(buf, "SELECT ");
+ 	for (i = 0; i < tupdesc->natts; i++)
+ 	{
+ 		if (tupdesc->attrs[i]->attisdropped)
+ 			continue;
+
+ 		colname = NameStr(tupdesc->attrs[i]->attname);
+ 		options = GetForeignColumnOptions(relid, tupdesc->attrs[i]->attnum);
+
+ 		foreach(lc, options)
+ 		{
+ 			DefElem	   *def = (DefElem *) lfirst(lc);
+
+ 			if (strcmp(def->defname, "colname") == 0)
+ 			{
+ 				colname = defGetString(def);
+ 				break;
+ 			}
+ 		}
+
+ 		if (!first)
+ 			appendStringInfoString(buf, ", ");
+ 		appendStringInfoString(buf, quote_identifier(colname));
+ 		first = false;
+ 	}
+
+ 	/* Don't generate bad syntax if there are no undropped columns. */
+ 	if (first)
+ 		appendStringInfoString(buf, "NULL");
+
+ 	/*
+ 	 * Deparse FROM clause, use namespace and relation name, or use nspname and
+ 	 * relname options respectively.
+ 	 */
+ 	nspname = get_namespace_name(get_rel_namespace(relid));
+ 	relname = get_rel_name(relid);
+ 	table = GetForeignTable(relid);
+ 	foreach(lc, table->options)
+ 	{
+ 		DefElem	   *def = (DefElem *) lfirst(lc);
+
+ 		if (strcmp(def->defname, "nspname") == 0)
+ 			nspname = defGetString(def);
+ 		else if (strcmp(def->defname, "relname") == 0)
+ 			relname = defGetString(def);
+ 	}
+
+ 	appendStringInfo(buf, " FROM %s.%s", quote_identifier(nspname),
+ 					 quote_identifier(relname));
+ }
+
+ /*
   * Deparse given expression into buf.  Actual string operation is delegated to
   * node-type-specific functions.
   *
diff --git a/contrib/pgsql_fdw/pgsql_fdw.c b/contrib/pgsql_fdw/pgsql_fdw.c
new file mode 100644
index b7885a8..63b2bba
*** a/contrib/pgsql_fdw/pgsql_fdw.c
--- b/contrib/pgsql_fdw/pgsql_fdw.c
***************
*** 17,22 ****
--- 17,23 ----
  #include "catalog/pg_foreign_table.h"
  #include "commands/defrem.h"
  #include "commands/explain.h"
+ #include "commands/vacuum.h"
  #include "foreign/fdwapi.h"
  #include "funcapi.h"
  #include "miscadmin.h"
*************** typedef struct PgsqlFdwExecutionState
*** 133,138 ****
--- 134,168 ----
  } PgsqlFdwExecutionState;
  
  /*
+  * Working state of one analyze sampling pass over a foreign table.
+  */
+ typedef struct PgsqlAnalyzeState
+ {
+ 	/* for tuple generation. */
+ 	TupleDesc	tupdesc;
+ 	AttInMetadata *attinmeta;
+ 	char	   *colbuf;		/* column value buffer for row processor */
+ 	int			colbuflen;	/* column value buffer size for row processor */
+ 	Datum	   *values;		/* per-attribute datums of the row being built */
+ 	bool	   *nulls;		/* per-attribute null flags, parallel to values */
+
+ 	/* for random sampling */
+ 	HeapTuple  *rows;		/* result buffer */
+ 	int			targrows;	/* target # of sample rows */
+ 	int			numrows;	/* # of samples collected */
+ 	double		samplerows;	/* # of rows fetched */
+ 	double		rowstoskip;	/* # of rows to skip before next sample */
+ 	double		rstate;		/* random state */
+
+ 	/* for storing result tuples */
+ 	MemoryContext anl_cxt;	/* context for per-analyze lifespan data */
+ 	MemoryContext temp_cxt;	/* context for per-tuple temporary data */
+
+ 	/* for error handling. */
+ 	ErrorPos	errpos;
+ } PgsqlAnalyzeState;
+
+ /*
   * SQL functions
   */
  extern Datum pgsql_fdw_handler(PG_FUNCTION_ARGS);
*************** static void pgsqlBeginForeignScan(Foreig
*** 158,163 ****
--- 188,196 ----
  static TupleTableSlot *pgsqlIterateForeignScan(ForeignScanState *node);
  static void pgsqlReScanForeignScan(ForeignScanState *node);
  static void pgsqlEndForeignScan(ForeignScanState *node);
+ static bool pgsqlAnalyzeForeignTable(Relation relation,
+ 									 AcquireSampleRowsFunc *func,
+ 									 BlockNumber *totalpages);
  
  /*
   * Helper functions
*************** static void execute_query(ForeignScanSta
*** 174,179 ****
--- 207,218 ----
  static int query_row_processor(PGresult *res, const PGdataValue *columns,
  							   const char **errmsgp, void *param);
  static void pgsql_fdw_error_callback(void *arg);
+ static int pgsqlAcquireSampleRowsFunc(Relation relation, int elevel,
+ 									  HeapTuple *rows, int targrows,
+ 									  double *totalrows,
+ 									  double *totaldeadrows);
+ static int analyze_row_processor(PGresult *res, const PGdataValue *columns,
+ 								 const char **errmsgp, void *param);
  
  /* Exported functions, but not written in pgsql_fdw.h. */
  void _PG_init(void);
*************** static FdwRoutine fdwroutine = {
*** 215,220 ****
--- 254,260 ----
  	pgsqlEndForeignScan,
  
  	/* Optional handler functions. */
+ 	pgsqlAnalyzeForeignTable,
  };
  
  /*
*************** query_row_processor(PGresult *res,
*** 908,918 ****
--- 948,960 ----
  
  			festate->nulls[i] = false;
  
+ 			MemoryContextSwitchTo(festate->scan_cxt);
  			while (festate->colbuflen < len + 1)
  			{
  				festate->colbuflen *= 2;
  				festate->colbuf = repalloc(festate->colbuf, festate->colbuflen);
  			}
+ 			MemoryContextSwitchTo(festate->temp_cxt);
  			memcpy(festate->colbuf, columns[j].value, len);
  			festate->colbuf[columns[j].len] = '\0';
  
*************** query_row_processor(PGresult *res,
*** 960,966 ****
  static void
  pgsql_fdw_error_callback(void *arg)
  {
! 	ErrorPos *errpos = (ErrorPos *) arg;
  	const char *relname;
  	const char *colname;
  
--- 1002,1008 ----
  static void
  pgsql_fdw_error_callback(void *arg)
  {
! 	ErrorPos   *errpos = (ErrorPos *) arg;
  	const char *relname;
  	const char *colname;
  
*************** pgsql_fdw_error_callback(void *arg)
*** 969,971 ****
--- 1011,1348 ----
  	errcontext("column %s of foreign table %s",
  			   quote_identifier(colname), quote_identifier(relname));
  }
+
+ /*
+  * pgsqlAnalyzeForeignTable
+  *		Test whether analyzing this foreign table is supported
+  *
+  * Always answers yes: the sampling callback is returned through *func and
+  * *totalpages is set to 0, since no page count is computed for the remote
+  * table.
+  */
+ static bool
+ pgsqlAnalyzeForeignTable(Relation relation,
+ 						 AcquireSampleRowsFunc *func,
+ 						 BlockNumber *totalpages)
+ {
+ 	*totalpages = 0;
+ 	*func = pgsqlAcquireSampleRowsFunc;
+
+ 	return true;
+ }
+
+ /*
+  * Acquire a random sample of rows from foreign table managed by pgsql_fdw.
+  *
+  * pgsql_fdw doesn't provide direct access to remote buffer, so we execute
+  * simple SELECT statement which retrieves whole rows from remote side, and
+  * pick some samples from them.
+  *
+  * At most targrows sample tuples are stored into rows[]; the number actually
+  * stored is returned.  *totalrows is set to the number of rows fetched from
+  * the remote side and *totaldeadrows to zero.  elevel is the message level
+  * used for the summary report.
+  */
+ static int
+ pgsqlAcquireSampleRowsFunc(Relation relation, int elevel,
+ 						   HeapTuple *rows, int targrows,
+ 						   double *totalrows,
+ 						   double *totaldeadrows)
+ {
+ 	PgsqlAnalyzeState astate;
+ 	StringInfoData sql;
+ 	ForeignTable *table;
+ 	ForeignServer *server;
+ 	UserMapping *user;
+ 	PGconn	   *conn = NULL;
+ 	PGresult   *volatile res = NULL;	/* set in PG_TRY, read in PG_CATCH */
+
+ 	/*
+ 	 * Only a little information is necessary as input to the row processor;
+ 	 * the rest is set up at the first row processor call.  Zero the whole
+ 	 * state first so that no field is ever read while still indeterminate.
+ 	 */
+ 	memset(&astate, 0, sizeof(astate));
+ 	astate.anl_cxt = CurrentMemoryContext;
+ 	astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext,
+ 											"pgsql_fdw analyze temporary data",
+ 											ALLOCSET_DEFAULT_MINSIZE,
+ 											ALLOCSET_DEFAULT_INITSIZE,
+ 											ALLOCSET_DEFAULT_MAXSIZE);
+ 	astate.rows = rows;
+ 	astate.targrows = targrows;
+ 	astate.tupdesc = relation->rd_att;
+ 	astate.errpos.relid = relation->rd_id;
+
+ 	/*
+ 	 * Construct SELECT statement which retrieves whole rows from remote.  We
+ 	 * can't avoid running sequential scan on remote side to get practical
+ 	 * statistics, so this seems reasonable compromise.
+ 	 */
+ 	initStringInfo(&sql);
+ 	deparseAnalyzeSql(&sql, relation);
+
+ 	table = GetForeignTable(relation->rd_id);
+ 	server = GetForeignServer(table->serverid);
+ 	user = GetUserMapping(GetOuterUserId(), server->serverid);
+ 	conn = GetConnection(server, user, true);
+
+ 	/*
+ 	 * Acquire sample rows from the result set.
+ 	 */
+ 	PG_TRY();
+ 	{
+ 		/*
+ 		 * Execute remote query and retrieve results with libpq row processor.
+ 		 */
+ 		PQsetRowProcessor(conn, analyze_row_processor, &astate);
+ 		res = PQexec(conn, sql.data);
+ 		PQsetRowProcessor(conn, NULL, NULL);
+ 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ 			ereport(ERROR,
+ 					(errmsg("could not execute remote query for analyze"),
+ 					 errdetail("%s", PQerrorMessage(conn)),
+ 					 errhint("%s", sql.data)));
+ 		PQclear(res);
+ 		res = NULL;
+ 	}
+ 	PG_CATCH();
+ 	{
+ 		/*
+ 		 * NOTE(review): conn is neither released nor has its row processor
+ 		 * reset on this path; confirm that error cleanup of connections is
+ 		 * handled elsewhere.
+ 		 */
+ 		PQclear(res);
+ 		PG_RE_THROW();
+ 	}
+ 	PG_END_TRY();
+
+ 	ReleaseConnection(conn);
+
+ 	/* The per-tuple scratch context is no longer needed. */
+ 	MemoryContextDelete(astate.temp_cxt);
+
+ 	/* We assume that we have no dead tuple. */
+ 	*totaldeadrows = 0.0;
+
+ 	/* We've retrieved all living tuples from foreign server. */
+ 	*totalrows = astate.samplerows;
+
+ 	/*
+ 	 * We don't update pg_class.relpages because we don't care that in
+ 	 * planning at all.
+ 	 */
+
+ 	/*
+ 	 * Emit some interesting relation info
+ 	 */
+ 	ereport(elevel,
+ 			(errmsg("\"%s\": scanned with \"%s\", "
+ 					"containing %.0f live rows and %.0f dead rows; "
+ 					"%d rows in sample, %.0f estimated total rows",
+ 					RelationGetRelationName(relation), sql.data,
+ 					astate.samplerows, 0.0,
+ 					astate.numrows, astate.samplerows)));
+
+ 	return astate.numrows;
+ }
+
+ /*
+  * Custom row processor for acquire_sample_rows.
+  *
+  * Collect sample rows from the result of query.
+  * - Use all tuples as sample until target rows samples are collected.
+  * - Once reached the target, skip some tuples and replace already sampled
+  *   tuple randomly.
+  *
+  * A call with columns == NULL only initializes the sampling state.  param
+  * is the PgsqlAnalyzeState of the running analyze.  Always returns 1.
+  */
+ static int
+ analyze_row_processor(PGresult *res, const PGdataValue *columns,
+ 					  const char **errmsgp, void *param)
+ {
+ 	PgsqlAnalyzeState *astate = (PgsqlAnalyzeState *) param;
+ 	int			targrows = astate->targrows;
+ 	TupleDesc	tupdesc = astate->tupdesc;
+ 	AttInMetadata *attinmeta;
+ 	int			i;
+ 	int			j;
+ 	int			pos;		/* position where next sample should be stored. */
+ 	HeapTuple	tuple;
+ 	ErrorContextCallback errcontext;
+ 	MemoryContext callercontext;
+
+ 	if (columns == NULL)
+ 	{
+ 		/* Prepare for sampling rows */
+ 		astate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+ 		astate->colbuflen = INITIAL_COLBUF_SIZE;
+ 		astate->colbuf = palloc(astate->colbuflen);
+ 		astate->values = (Datum *) palloc0(sizeof(Datum) * tupdesc->natts);
+ 		astate->nulls = (bool *) palloc(sizeof(bool) * tupdesc->natts);
+
+ 		/*
+ 		 * Dropped columns are skipped when a row is converted, so their
+ 		 * entries must be null from the start; heap_form_tuple examines
+ 		 * every attribute of the descriptor.
+ 		 */
+ 		memset(astate->nulls, true, sizeof(bool) * tupdesc->natts);
+
+ 		astate->numrows = 0;
+ 		astate->samplerows = 0;
+ 		astate->rowstoskip = -1;
+ 		astate->rstate = anl_init_selection_state(astate->targrows);
+
+ 		return 1;
+ 	}
+
+ 	/* Valid from here on: set up by the initialization call above. */
+ 	attinmeta = astate->attinmeta;
+
+ 	/*
+ 	 * ANALYZE against foreign tables is not done in vacuum processes, so
+ 	 * here we use CHECK_FOR_INTERRUPTS instead of
+ 	 * vacuum_delay_point().
+ 	 */
+ 	CHECK_FOR_INTERRUPTS();
+
+ 	/*
+ 	 * Do the following work in a temp context that we reset after each tuple.
+ 	 * This cleans up not only the data we have direct access to, but any
+ 	 * cruft the I/O functions might leak.
+ 	 */
+ 	callercontext = MemoryContextSwitchTo(astate->temp_cxt);
+
+ 	/*
+ 	 * First targrows rows are once sampled always.  If we have more source
+ 	 * rows, pick up some of them by skipping and replace already sampled
+ 	 * tuple randomly.
+ 	 *
+ 	 * Here we just determine the slot where next sample should be stored.  Set
+ 	 * pos to negative value to indicate the row should be skipped.
+ 	 */
+ 	if (astate->numrows < targrows)
+ 		pos = astate->numrows++;
+ 	else
+ 	{
+ 		/*
+ 		 * The first targrows sample rows are simply copied into
+ 		 * the reservoir.  Then we start replacing tuples in the
+ 		 * sample until we reach the end of the relation.  This
+ 		 * algorithm is from Jeff Vitter's paper, similarly to
+ 		 * acquire_sample_rows in analyze.c.
+ 		 *
+ 		 * We don't have block-wise accessibility, so every row in
+ 		 * the PGresult is possible to be sample.
+ 		 */
+ 		if (astate->rowstoskip < 0)
+ 			astate->rowstoskip = anl_get_next_S(astate->samplerows, targrows,
+ 												&astate->rstate);
+
+ 		if (astate->rowstoskip <= 0)
+ 		{
+ 			int		k = (int) (targrows * anl_random_fract());
+
+ 			Assert(k >= 0 && k < targrows);
+
+ 			/*
+ 			 * Create sample tuple from the result, and replace at
+ 			 * random.
+ 			 */
+ 			heap_freetuple(astate->rows[k]);
+ 			pos = k;
+ 		}
+ 		else
+ 			pos = -1;
+
+ 		astate->rowstoskip -= 1;
+ 	}
+
+ 	/* Always increment sample row counter. */
+ 	astate->samplerows += 1;
+
+ 	if (pos >= 0)
+ 	{
+ 		/*
+ 		 * Create sample tuple from current result row, and store it into the
+ 		 * position determined above.  Note that i and j point entries in
+ 		 * catalog and columns array respectively.
+ 		 */
+ 		for (i = 0, j = 0; i < tupdesc->natts; i++)
+ 		{
+ 			int		len;
+
+ 			if (tupdesc->attrs[i]->attisdropped)
+ 				continue;
+
+ 			/*
+ 			 * The remote query returns live attributes only, so for a trailing
+ 			 * dropped attribute j is already past the last result column.
+ 			 */
+ 			len = columns[j].len;
+ 			if (len < 0)
+ 				astate->nulls[i] = true;
+ 			else
+ 			{
+ 				Datum	value;
+
+ 				astate->nulls[i] = false;
+
+ 				/*
+ 				 * Expand column value string buffer twice as current until
+ 				 * it can hold the value.  We need to allocate the buffer in
+ 				 * the context of analyze.
+ 				 */
+ 				while (astate->colbuflen < len + 1)
+ 				{
+ 					astate->colbuflen *= 2;
+ 					MemoryContextSwitchTo(astate->anl_cxt);
+ 					astate->colbuf = repalloc(astate->colbuf,
+ 											  astate->colbuflen);
+ 					MemoryContextSwitchTo(astate->temp_cxt);
+ 				}
+ 				memcpy(astate->colbuf, columns[j].value, len);
+ 				astate->colbuf[len] = '\0';
+
+ 				/*
+ 				 * Set up and install callback to report where conversion error
+ 				 * occurs.
+ 				 */
+ 				astate->errpos.cur_attno = i + 1;
+ 				errcontext.callback = pgsql_fdw_error_callback;
+ 				errcontext.arg = (void *) &astate->errpos;
+ 				errcontext.previous = error_context_stack;
+ 				error_context_stack = &errcontext;
+
+ 				value = InputFunctionCall(&attinmeta->attinfuncs[i],
+ 										  astate->colbuf,
+ 										  attinmeta->attioparams[i],
+ 										  attinmeta->atttypmods[i]);
+ 				astate->values[i] = value;
+
+ 				/* Uninstall error callback function. */
+ 				error_context_stack = errcontext.previous;
+ 			}
+ 			j++;
+ 		}
+
+ 		/*
+ 		 * Generate tuple from the result row data, and store it into the given
+ 		 * buffer.  Note that we need to allocate the tuple in the analyze
+ 		 * context to make it valid even after temporary per-tuple context has
+ 		 * been reset.
+ 		 */
+ 		MemoryContextSwitchTo(astate->anl_cxt);
+ 		tuple = heap_form_tuple(tupdesc, astate->values, astate->nulls);
+ 		MemoryContextSwitchTo(astate->temp_cxt);
+ 		astate->rows[pos] = tuple;
+ 	}
+
+ 	/* Clean up */
+ 	MemoryContextSwitchTo(callercontext);
+ 	MemoryContextReset(astate->temp_cxt);
+
+ 	return 1;
+ }
diff --git a/contrib/pgsql_fdw/pgsql_fdw.h b/contrib/pgsql_fdw/pgsql_fdw.h
new file mode 100644
index 76c9f72..94e6f07
*** a/contrib/pgsql_fdw/pgsql_fdw.h
--- b/contrib/pgsql_fdw/pgsql_fdw.h
***************
*** 17,22 ****
--- 17,23 ----
  #include "postgres.h"
  #include "foreign/foreign.h"
  #include "nodes/relation.h"
+ #include "utils/relcache.h"
  
  /* in option.c */
  int ExtractConnectionOptions(List *defelems,
*************** void sortConditions(PlannerInfo *root,
*** 38,42 ****
--- 39,44 ----
  					List **remote_conds,
  					List **param_conds,
  					List **local_conds);
+ void deparseAnalyzeSql(StringInfo buf, Relation rel);
  
  #endif /* PGSQL_FDW_H */