contrib/postgres_fdw/deparse.c | 209 ++++- contrib/postgres_fdw/expected/postgres_fdw.out | 34 +- contrib/postgres_fdw/postgres_fdw.c | 1075 +++++++++++++++++++++--- contrib/postgres_fdw/postgres_fdw.h | 64 ++ doc/src/sgml/postgres-fdw.sgml | 10 + src/backend/foreign/foreign.c | 29 + src/backend/nodes/bitmapset.c | 62 ++ src/backend/optimizer/util/var.c | 39 + src/include/foreign/foreign.h | 4 + src/include/nodes/bitmapset.h | 4 + src/include/optimizer/var.h | 1 + 11 files changed, 1360 insertions(+), 171 deletions(-) diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index a2675eb..5af3dd7 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -44,8 +44,10 @@ #include "catalog/pg_proc.h" #include "catalog/pg_type.h" #include "commands/defrem.h" +#include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include "optimizer/clauses.h" +#include "optimizer/pathnode.h" #include "optimizer/var.h" #include "parser/parsetree.h" #include "utils/builtins.h" @@ -88,6 +90,7 @@ typedef struct deparse_expr_cxt RelOptInfo *foreignrel; /* the foreign relation we are planning for */ StringInfo buf; /* output buffer to append to */ List **params_list; /* exprs that will become remote Params */ + bool var_qualified; /* columns reference needs to be qualified */ } deparse_expr_cxt; /* @@ -106,6 +109,8 @@ static void deparseTargetList(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, + bool first, + bool qualified, Bitmapset *attrs_used, List **retrieved_attrs); static void deparseReturningList(StringInfo buf, PlannerInfo *root, @@ -113,7 +118,7 @@ static void deparseReturningList(StringInfo buf, PlannerInfo *root, List *returningList, List **retrieved_attrs); static void deparseColumnRef(StringInfo buf, int varno, int varattno, - PlannerInfo *root); + bool var_qualified, PlannerInfo *root); static void deparseRelation(StringInfo buf, Relation rel); static void deparseStringLiteral(StringInfo buf, const char *val); static void deparseExpr(Expr *expr, deparse_expr_cxt *context); @@ -142,6 +147,7 @@ static void deparseArrayExpr(ArrayExpr *node, deparse_expr_cxt *context); void classifyConditions(PlannerInfo *root, RelOptInfo *baserel, + List *restrictinfo_list, List **remote_conds, List **local_conds) { @@ -150,7 +156,7 @@ classifyConditions(PlannerInfo *root, *remote_conds = NIL; *local_conds = NIL; - foreach(lc, baserel->baserestrictinfo) + foreach(lc, restrictinfo_list) { RestrictInfo *ri = (RestrictInfo *) lfirst(lc); @@ -244,7 +250,7 @@ foreign_expr_walker(Node *node, * Param's collation, ie it's not safe for it to have a * non-default collation. */ - if (var->varno == glob_cxt->foreignrel->relid && + if (bms_is_member(var->varno, glob_cxt->foreignrel->relids) && var->varlevelsup == 0) { /* Var belongs to foreign table */ @@ -678,8 +684,8 @@ deparseSelectSql(StringInfo buf, * Construct SELECT list */ appendStringInfoString(buf, "SELECT "); - deparseTargetList(buf, root, baserel->relid, rel, attrs_used, - retrieved_attrs); + deparseTargetList(buf, root, baserel->relid, rel, true, false, + attrs_used, retrieved_attrs); /* * Construct FROM clause @@ -702,12 +708,13 @@ deparseTargetList(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, + bool first, + bool qualified, Bitmapset *attrs_used, List **retrieved_attrs) { TupleDesc tupdesc = RelationGetDescr(rel); bool have_wholerow; - bool first; int i; *retrieved_attrs = NIL; @@ -716,7 +723,6 @@ deparseTargetList(StringInfo buf, have_wholerow = bms_is_member(0 - FirstLowInvalidHeapAttributeNumber, attrs_used); - first = true; for (i = 1; i <= tupdesc->natts; i++) { Form_pg_attribute attr = tupdesc->attrs[i - 1]; @@ -733,7 +739,9 @@ deparseTargetList(StringInfo buf, appendStringInfoString(buf, ", "); first = false; - deparseColumnRef(buf, rtindex, i, root); + if (qualified) + appendStringInfo(buf, "r%d.", rtindex); + deparseColumnRef(buf, rtindex, i, false, root); *retrieved_attrs = lappend_int(*retrieved_attrs, i); } @@ -781,6 +789,8 @@ appendWhereClause(StringInfo buf, RelOptInfo *baserel, List *exprs, bool is_first, + bool is_join_on, + bool qualified, List **params) { deparse_expr_cxt context; @@ -795,6 +805,7 @@ appendWhereClause(StringInfo buf, context.foreignrel = baserel; context.buf = buf; context.params_list = params; + context.var_qualified = qualified; /* Make sure any constants in the exprs are printed portably */ nestlevel = set_transmission_modes(); @@ -805,7 +816,7 @@ appendWhereClause(StringInfo buf, /* Connect expressions with "AND" and parenthesize each condition. */ if (is_first) - appendStringInfoString(buf, " WHERE "); + appendStringInfoString(buf, !is_join_on ? " WHERE " : " ON "); else appendStringInfoString(buf, " AND "); @@ -852,7 +863,7 @@ deparseInsertSql(StringInfo buf, PlannerInfo *root, appendStringInfoString(buf, ", "); first = false; - deparseColumnRef(buf, rtindex, attnum, root); + deparseColumnRef(buf, rtindex, attnum, false, root); } appendStringInfoString(buf, ") VALUES ("); @@ -912,7 +923,7 @@ deparseUpdateSql(StringInfo buf, PlannerInfo *root, appendStringInfoString(buf, ", "); first = false; - deparseColumnRef(buf, rtindex, attnum, root); + deparseColumnRef(buf, rtindex, attnum, false, root); appendStringInfo(buf, " = $%d", pindex); pindex++; } @@ -968,8 +979,165 @@ deparseReturningList(StringInfo buf, PlannerInfo *root, &attrs_used); appendStringInfoString(buf, " RETURNING "); - deparseTargetList(buf, root, rtindex, rel, attrs_used, - retrieved_attrs); + deparseTargetList(buf, root, rtindex, rel, true, false, + attrs_used, retrieved_attrs); +} + +/* + * deparseRemoteJoinRelation + * + * The main job portion of deparseRemoteJoinSql. It deparses a relation, + * might be join not only regular table, to SQL expression. + */ +static void +deparseRemoteJoinRelation(StringInfo tlist_buf, + StringInfo from_buf, + StringInfo where_buf, + PlannerInfo *root, Node *relinfo, + List *target_list, List *local_conds, + List **select_vars, List **select_params) +{ + /* + * 'relinfo' is either List or Integer. + * In case of List, it is a packed PgRemoteJoinInfo that contains + * outer and inner join references, so needs to deparse recursively. + * In case of Integer, it is rtindex of a particular foreign table. + */ + if (IsA(relinfo, List)) + { + PgRemoteJoinInfo jinfo; + + unpackPgRemoteJoinInfo(&jinfo, (List *)relinfo); + + appendStringInfoChar(from_buf, '('); + deparseRemoteJoinRelation(tlist_buf, from_buf, where_buf, + root, jinfo.outer_rel, + target_list, local_conds, + select_vars, select_params); + switch (jinfo.jointype) + { + case JOIN_INNER: + appendStringInfoString(from_buf, " JOIN "); + break; + case JOIN_LEFT: + appendStringInfoString(from_buf, " LEFT JOIN "); + break; + case JOIN_FULL: + appendStringInfoString(from_buf, " FULL JOIN "); + break; + case JOIN_RIGHT: + appendStringInfoString(from_buf, " RIGHT JOIN "); + break; + default: + elog(ERROR, "unexpected join type: %d", (int)jinfo.jointype); + break; + } + deparseRemoteJoinRelation(tlist_buf, from_buf, where_buf, + root, jinfo.inner_rel, + target_list, local_conds, + select_vars, select_params); + if (jinfo.remote_conds) + { + RelOptInfo *joinrel = find_join_rel(root, jinfo.relids); + appendWhereClause(from_buf, root, joinrel, + jinfo.remote_conds, + true, true, true, select_params); + } + else + { + /* prevent syntax error */ + appendStringInfoString(from_buf, " ON true"); + } + appendStringInfoChar(from_buf, ')'); + } + else if (IsA(relinfo, Integer)) + { + Index rtindex = intVal(relinfo); + RangeTblEntry *rte = planner_rt_fetch(rtindex, root); + RelOptInfo *baserel = root->simple_rel_array[rtindex]; + Relation rel; + TupleDesc tupdesc; + Bitmapset *attrs_used = NULL; + List *retrieved_attrs = NIL; + ListCell *lc; + PgFdwRelationInfo *fpinfo; + + rel = heap_open(rte->relid, NoLock); + deparseRelation(from_buf, rel); + appendStringInfo(from_buf, " r%d", rtindex); + + pull_varattnos((Node *) target_list, rtindex, &attrs_used); + pull_varattnos((Node *) local_conds, rtindex, &attrs_used); + deparseTargetList(tlist_buf, root, rtindex, rel, + (bool)(tlist_buf->len == 0), true, + attrs_used, &retrieved_attrs); + + /* + * Columns being referenced in target-list and local conditions has + * to be fetched from the remote server, but not all the columns. + */ + tupdesc = RelationGetDescr(rel); + foreach (lc, retrieved_attrs) + { + AttrNumber anum = lfirst_int(lc); + Form_pg_attribute attr = tupdesc->attrs[anum - 1]; + + *select_vars = lappend(*select_vars, + makeVar(rtindex, + anum, + attr->atttypid, + attr->atttypmod, + attr->attcollation, + 0)); + } + /* deparse WHERE clause, to be appended later */ + fpinfo = (PgFdwRelationInfo *) baserel->fdw_private; + if (fpinfo->remote_conds) + appendWhereClause(where_buf, root, baserel, + fpinfo->remote_conds, + where_buf->len == 0, false, true, + select_params); + + heap_close(rel, NoLock); + } + else + elog(ERROR, "unexpected path type: %d", (int)nodeTag(relinfo)); +} + +/* + * deparseRemoteJoinSql + * + * It deparses a join tree to be executed on the remote server. + * It assumes the top-level 'relinfo' is one for remote join relation, thus + * it has to be a List object that packs PgRemoteJoinInfo. + */ +void +deparseRemoteJoinSql(StringInfo buf, PlannerInfo *root, + List *relinfo, + List *target_list, + List *local_conds, + List **select_vars, + List **select_params) +{ + StringInfoData tlist_buf; + StringInfoData from_buf; + StringInfoData where_buf; + + Assert(IsA(relinfo, List)); + initStringInfo(&tlist_buf); + initStringInfo(&from_buf); + initStringInfo(&where_buf); + + deparseRemoteJoinRelation(&tlist_buf, &from_buf, &where_buf, + root, (Node *)relinfo, + target_list, local_conds, + select_vars, select_params); + appendStringInfo(buf, "SELECT %s FROM %s%s", + tlist_buf.len > 0 ? tlist_buf.data : "NULL", + from_buf.data, + where_buf.len > 0 ? where_buf.data : ""); + pfree(tlist_buf.data); + pfree(from_buf.data); } /* @@ -1060,7 +1228,8 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs) * If it has a column_name FDW option, use that instead of attribute name. */ static void -deparseColumnRef(StringInfo buf, int varno, int varattno, PlannerInfo *root) +deparseColumnRef(StringInfo buf, int varno, int varattno, + bool var_qualified, PlannerInfo *root) { RangeTblEntry *rte; char *colname = NULL; @@ -1096,6 +1265,13 @@ deparseColumnRef(StringInfo buf, int varno, int varattno, PlannerInfo *root) if (colname == NULL) colname = get_relid_attribute_name(rte->relid, varattno); + /* + * In case of remote join, column reference may become bogus without + * qualification to relations. + */ + if (var_qualified) + appendStringInfo(buf, "r%d.", varno); + appendStringInfoString(buf, quote_identifier(colname)); } @@ -1243,11 +1419,12 @@ deparseVar(Var *node, deparse_expr_cxt *context) { StringInfo buf = context->buf; - if (node->varno == context->foreignrel->relid && + if (bms_is_member(node->varno, context->foreignrel->relids) && node->varlevelsup == 0) { /* Var belongs to foreign table */ - deparseColumnRef(buf, node->varno, node->varattno, context->root); + deparseColumnRef(buf, node->varno, node->varattno, + context->var_qualified, context->root); } else { diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 38c6cf8..e6368c5 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -450,17 +450,12 @@ EXPLAIN (VERBOSE, COSTS false) SELECT * FROM ft1 t1 WHERE c8 = 'foo'; -- can't -- parameterized remote path EXPLAIN (VERBOSE, COSTS false) SELECT * FROM ft2 a, ft2 b WHERE a.c1 = 47 AND b.c1 = a.c2; - QUERY PLAN -------------------------------------------------------------------------------------------------------------- - Nested Loop - Output: a.c1, a.c2, a.c3, a.c4, a.c5, a.c6, a.c7, a.c8, b.c1, b.c2, b.c3, b.c4, b.c5, b.c6, b.c7, b.c8 - -> Foreign Scan on public.ft2 a - Output: a.c1, a.c2, a.c3, a.c4, a.c5, a.c6, a.c7, a.c8 - Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" = 47)) - -> Foreign Scan on public.ft2 b - Output: b.c1, b.c2, b.c3, b.c4, b.c5, b.c6, b.c7, b.c8 - Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (($1::integer = "C 1")) -(8 rows) + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Custom Scan (postgres-fdw) + Output: c1, c2, c3, c4, c5, c6, c7, c8, c1, c2, c3, c4, c5, c6, c7, c8 + Remote SQL: SELECT r1."C 1", r1.c2, r1.c3, r1.c4, r1.c5, r1.c6, r1.c7, r1.c8, r2."C 1", r2.c2, r2.c3, r2.c4, r2.c5, r2.c6, r2.c7, r2.c8 FROM ("S 1"."T 1" r1 JOIN "S 1"."T 1" r2 ON ((r1.c2 = r2."C 1"))) WHERE ((r1."C 1" = 47)) +(3 rows) SELECT * FROM ft2 a, ft2 b WHERE a.c1 = 47 AND b.c1 = a.c2; c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 | c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 @@ -474,17 +469,12 @@ SELECT * FROM ft2 a, ft2 b WHERE a.c1 = 47 AND b.c1 = a.c2; -- simple join PREPARE st1(int, int) AS SELECT t1.c3, t2.c3 FROM ft1 t1, ft2 t2 WHERE t1.c1 = $1 AND t2.c1 = $2; EXPLAIN (VERBOSE, COSTS false) EXECUTE st1(1, 2); - QUERY PLAN --------------------------------------------------------------------- - Nested Loop - Output: t1.c3, t2.c3 - -> Foreign Scan on public.ft1 t1 - Output: t1.c3 - Remote SQL: SELECT c3 FROM "S 1"."T 1" WHERE (("C 1" = 1)) - -> Foreign Scan on public.ft2 t2 - Output: t2.c3 - Remote SQL: SELECT c3 FROM "S 1"."T 1" WHERE (("C 1" = 2)) -(8 rows) + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------- + Custom Scan (postgres-fdw) + Output: c3, c3 + Remote SQL: SELECT r1.c3, r2.c3 FROM ("S 1"."T 1" r1 JOIN "S 1"."T 1" r2 ON true) WHERE ((r1."C 1" = 1)) AND ((r2."C 1" = 2)) +(3 rows) EXECUTE st1(1, 1); c3 | c3 diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 246a3a9..6786b89 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -19,6 +19,7 @@ #include "commands/defrem.h" #include "commands/explain.h" #include "commands/vacuum.h" +#include "executor/nodeCustom.h" #include "foreign/fdwapi.h" #include "funcapi.h" #include "miscadmin.h" @@ -47,40 +48,6 @@ PG_MODULE_MAGIC; #define DEFAULT_FDW_TUPLE_COST 0.01 /* - * FDW-specific planner information kept in RelOptInfo.fdw_private for a - * foreign table. This information is collected by postgresGetForeignRelSize. - */ -typedef struct PgFdwRelationInfo -{ - /* baserestrictinfo clauses, broken down into safe and unsafe subsets. */ - List *remote_conds; - List *local_conds; - - /* Bitmap of attr numbers we need to fetch from the remote server. */ - Bitmapset *attrs_used; - - /* Cost and selectivity of local_conds. */ - QualCost local_conds_cost; - Selectivity local_conds_sel; - - /* Estimated size and cost for a scan with baserestrictinfo quals. */ - double rows; - int width; - Cost startup_cost; - Cost total_cost; - - /* Options extracted from catalogs. */ - bool use_remote_estimate; - Cost fdw_startup_cost; - Cost fdw_tuple_cost; - - /* Cached catalog information. */ - ForeignTable *table; - ForeignServer *server; - UserMapping *user; /* only set in use_remote_estimate mode */ -} PgFdwRelationInfo; - -/* * Indexes of FDW-private information stored in fdw_private lists. * * We store various information in ForeignScan.fdw_private to pass it from @@ -129,6 +96,9 @@ enum FdwModifyPrivateIndex typedef struct PgFdwScanState { Relation rel; /* relcache entry for the foreign table */ + List *join_rels; /* list of underlying relcache entries, if * + * remote join on top of CustomScan */ + TupleDesc scan_tupdesc; /* tuple descriptor of scanned relation */ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ /* extracted fdw_private data */ @@ -214,7 +184,8 @@ typedef struct PgFdwAnalyzeState */ typedef struct ConversionLocation { - Relation rel; /* foreign table's relcache entry */ + const char *relname; /* name of the foreign table, if any */ + TupleDesc tupdesc; /* tuple descriptor of scanned relation */ AttrNumber cur_attno; /* attribute number being processed, or 0 */ } ConversionLocation; @@ -306,8 +277,8 @@ static void get_remote_estimate(const char *sql, static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, EquivalenceClass *ec, EquivalenceMember *em, void *arg); -static void create_cursor(ForeignScanState *node); -static void fetch_more_data(ForeignScanState *node); +static void create_cursor(PgFdwScanState *fsstate, ExprContext *econtext); +static void fetch_more_data(PgFdwScanState *fsstate); static void close_cursor(PGconn *conn, unsigned int cursor_number); static void prepare_foreign_modify(PgFdwModifyState *fmstate); static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, @@ -323,12 +294,19 @@ static void analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate); static HeapTuple make_tuple_from_result_row(PGresult *res, int row, - Relation rel, + const char *relname, + TupleDesc tupdesc, AttInMetadata *attinmeta, List *retrieved_attrs, MemoryContext temp_context); static void conversion_error_callback(void *arg); +void _PG_init(void); + +/* + * Static variables + */ +static add_join_path_hook_type add_join_path_next = NULL; /* * Foreign-data wrapper handler function: return a struct with pointers @@ -444,7 +422,7 @@ postgresGetForeignRelSize(PlannerInfo *root, * Identify which baserestrictinfo clauses can be sent to the remote * server and which can't. */ - classifyConditions(root, baserel, + classifyConditions(root, baserel, baserel->baserestrictinfo, &fpinfo->remote_conds, &fpinfo->local_conds); /* @@ -770,7 +748,7 @@ postgresGetForeignPlan(PlannerInfo *root, &retrieved_attrs); if (remote_conds) appendWhereClause(&sql, root, baserel, remote_conds, - true, ¶ms_list); + true, false, false, ¶ms_list); /* * Add FOR UPDATE/SHARE if appropriate. We apply locking during the @@ -844,84 +822,59 @@ postgresGetForeignPlan(PlannerInfo *root, * postgresBeginForeignScan * Initiate an executor scan of a foreign PostgreSQL table. */ -static void -postgresBeginForeignScan(ForeignScanState *node, int eflags) +static PgFdwScanState * +commonBeginForeignScan(PlanState *ps, TupleDesc tupdesc, + Oid serverid, Oid userid, + char *remote_query, List *retrieved_attrs, + List *remote_exprs) { - ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; - EState *estate = node->ss.ps.state; PgFdwScanState *fsstate; - RangeTblEntry *rte; - Oid userid; - ForeignTable *table; - ForeignServer *server; - UserMapping *user; - int numParams; - int i; - ListCell *lc; + ForeignServer *server; + UserMapping *user; + int numParams; + int i; + ListCell *lc; - /* - * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. - */ - if (eflags & EXEC_FLAG_EXPLAIN_ONLY) - return; - - /* - * We'll save private state in node->fdw_state. - */ + /* Allocation of private state */ fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState)); - node->fdw_state = (void *) fsstate; - - /* - * Identify which user to do the remote access as. This should match what - * ExecCheckRTEPerms() does. - */ - rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table); - userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); - - /* Get info about foreign table. */ - fsstate->rel = node->ss.ss_currentRelation; - table = GetForeignTable(RelationGetRelid(fsstate->rel)); - server = GetForeignServer(table->serverid); - user = GetUserMapping(userid, server->serverid); + fsstate->scan_tupdesc = tupdesc; + fsstate->query = remote_query; + fsstate->retrieved_attrs = retrieved_attrs; /* * Get connection to the foreign server. Connection manager will - * establish new connection if necessary. + * establish new connection on demand. */ + server = GetForeignServer(serverid); + user = GetUserMapping(userid, serverid); fsstate->conn = GetConnection(server, user, false); /* Assign a unique ID for my cursor */ fsstate->cursor_number = GetCursorNumber(fsstate->conn); fsstate->cursor_exists = false; - /* Get private info created by planner functions. */ - fsstate->query = strVal(list_nth(fsplan->fdw_private, - FdwScanPrivateSelectSql)); - fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private, - FdwScanPrivateRetrievedAttrs); - /* Create contexts for batches of tuples and per-tuple temp workspace. */ - fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt, + fsstate->batch_cxt = AllocSetContextCreate(ps->state->es_query_cxt, "postgres_fdw tuple data", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); - fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, + fsstate->temp_cxt = AllocSetContextCreate(ps->state->es_query_cxt, "postgres_fdw temporary data", ALLOCSET_SMALL_MINSIZE, ALLOCSET_SMALL_INITSIZE, ALLOCSET_SMALL_MAXSIZE); /* Get info we'll need for input data conversion. */ - fsstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(fsstate->rel)); + fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->scan_tupdesc); /* Prepare for output conversion of parameters used in remote query. */ - numParams = list_length(fsplan->fdw_exprs); + numParams = list_length(remote_exprs); fsstate->numParams = numParams; fsstate->param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams); i = 0; - foreach(lc, fsplan->fdw_exprs) + foreach(lc, remote_exprs) { Node *param_expr = (Node *) lfirst(lc); Oid typefnoid; @@ -940,17 +893,62 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) * benefit, and it'd require postgres_fdw to know more than is desirable * about Param evaluation.) */ - fsstate->param_exprs = (List *) - ExecInitExpr((Expr *) fsplan->fdw_exprs, - (PlanState *) node); + fsstate->param_exprs = (List *) ExecInitExpr((Expr *) remote_exprs, ps); /* * Allocate buffer for text form of query parameters, if any. */ if (numParams > 0) - fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *)); + fsstate->param_values = palloc0(numParams * sizeof(char *)); else fsstate->param_values = NULL; + + return fsstate; +} + +static void +postgresBeginForeignScan(ForeignScanState *node, int eflags) +{ + ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; + PgFdwScanState *fsstate; + EState *estate = node->ss.ps.state; + Relation rel; + char *remote_query; + List *retrieved_attrs; + RangeTblEntry *rte; + Oid userid; + ForeignTable *table; + + /* + * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. + */ + if (eflags & EXEC_FLAG_EXPLAIN_ONLY) + return; + + /* + * Identify which user to do the remote access as. This should match what + * ExecCheckRTEPerms() does. + */ + rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table); + userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); + + /* Get info about foreign table. */ + rel = node->ss.ss_currentRelation; + table = GetForeignTable(RelationGetRelid(rel)); + + /* Get private info created by planner functions. */ + remote_query = strVal(list_nth(fsplan->fdw_private, + FdwScanPrivateSelectSql)); + retrieved_attrs = (List *) list_nth(fsplan->fdw_private, + FdwScanPrivateRetrievedAttrs); + + fsstate = commonBeginForeignScan(&node->ss.ps, RelationGetDescr(rel), + table->serverid, userid, + remote_query, retrieved_attrs, + fsplan->fdw_exprs); + fsstate->rel = rel; + + node->fdw_state = fsstate; } /* @@ -959,17 +957,15 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) * EOF. */ static TupleTableSlot * -postgresIterateForeignScan(ForeignScanState *node) +commonIterateForeignScan(PgFdwScanState *fsstate, PlanState *ps, + TupleTableSlot *slot) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; - TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; - /* * If this is the first call after Begin or ReScan, we need to create the * cursor on the remote side. */ if (!fsstate->cursor_exists) - create_cursor(node); + create_cursor(fsstate, ps->ps_ExprContext); /* * Get some more tuples, if we've run out. @@ -978,7 +974,7 @@ postgresIterateForeignScan(ForeignScanState *node) { /* No point in another fetch if we already detected EOF, though. */ if (!fsstate->eof_reached) - fetch_more_data(node); + fetch_more_data(fsstate); /* If we didn't get any tuples, must be end of data. */ if (fsstate->next_tuple >= fsstate->num_tuples) return ExecClearTuple(slot); @@ -995,14 +991,22 @@ postgresIterateForeignScan(ForeignScanState *node) return slot; } +static TupleTableSlot * +postgresIterateForeignScan(ForeignScanState *node) +{ + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; + + return commonIterateForeignScan(fsstate, &node->ss.ps, slot); +} + /* * postgresReScanForeignScan * Restart the scan. */ static void -postgresReScanForeignScan(ForeignScanState *node) +commonReScanForeignScan(PgFdwScanState *fsstate, PlanState *ps) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; char sql[64]; PGresult *res; @@ -1016,7 +1020,7 @@ postgresReScanForeignScan(ForeignScanState *node) * be good enough. If we've only fetched zero or one batch, we needn't * even rewind the cursor, just rescan what we have. */ - if (node->ss.ps.chgParam != NULL) + if (ps->chgParam != NULL) { fsstate->cursor_exists = false; snprintf(sql, sizeof(sql), "CLOSE c%u", @@ -1051,19 +1055,21 @@ postgresReScanForeignScan(ForeignScanState *node) fsstate->eof_reached = false; } +static void +postgresReScanForeignScan(ForeignScanState *node) +{ + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + + commonReScanForeignScan(fsstate, &node->ss.ps); +} + /* * postgresEndForeignScan * Finish scanning foreign table and dispose objects used for this scan */ static void -postgresEndForeignScan(ForeignScanState *node) +commonEndForeignScan(PgFdwScanState *fsstate) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; - - /* if fsstate is NULL, we are in EXPLAIN; nothing to do */ - if (fsstate == NULL) - return; - /* Close the cursor if open, to prevent accumulation of cursors */ if (fsstate->cursor_exists) close_cursor(fsstate->conn, fsstate->cursor_number); @@ -1075,6 +1081,18 @@ postgresEndForeignScan(ForeignScanState *node) /* MemoryContexts will be deleted automatically. */ } +static void +postgresEndForeignScan(ForeignScanState *node) +{ + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + + /* if fsstate is NULL, we are in EXPLAIN; nothing to do */ + if (fsstate == NULL) + return; + + commonEndForeignScan(fsstate); +} + /* * postgresAddForeignUpdateTargets * Add resjunk column(s) needed for update/delete on a foreign table @@ -1704,10 +1722,10 @@ estimate_path_cost_size(PlannerInfo *root, &retrieved_attrs); if (fpinfo->remote_conds) appendWhereClause(&sql, root, baserel, fpinfo->remote_conds, - true, NULL); + true, false, false, NULL); if (join_conds) appendWhereClause(&sql, root, baserel, join_conds, - (fpinfo->remote_conds == NIL), NULL); + (fpinfo->remote_conds == NIL), false, false, NULL); /* Get the remote estimate */ conn = GetConnection(fpinfo->server, fpinfo->user, false); @@ -1863,10 +1881,8 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, * Create cursor for node's query with current parameter values. */ static void -create_cursor(ForeignScanState *node) +create_cursor(PgFdwScanState *fsstate, ExprContext *econtext) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; - ExprContext *econtext = node->ss.ps.ps_ExprContext; int numParams = fsstate->numParams; const char **values = fsstate->param_values; PGconn *conn = fsstate->conn; @@ -1953,9 +1969,8 @@ create_cursor(ForeignScanState *node) * Fetch some more rows from the node's cursor. */ static void -fetch_more_data(ForeignScanState *node) +fetch_more_data(PgFdwScanState *fsstate) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; PGresult *volatile res = NULL; MemoryContext oldcontext; @@ -1975,6 +1990,7 @@ fetch_more_data(ForeignScanState *node) int fetch_size; int numrows; int i; + const char *relname = NULL; /* The fetch size is arbitrary, but shouldn't be enormous. */ fetch_size = 100; @@ -1993,11 +2009,15 @@ fetch_more_data(ForeignScanState *node) fsstate->num_tuples = numrows; fsstate->next_tuple = 0; + if (fsstate->rel) + relname = RelationGetRelationName(fsstate->rel); + for (i = 0; i < numrows; i++) { fsstate->tuples[i] = make_tuple_from_result_row(res, i, - fsstate->rel, + relname, + fsstate->scan_tupdesc, fsstate->attinmeta, fsstate->retrieved_attrs, fsstate->temp_cxt); @@ -2215,11 +2235,13 @@ store_returning_result(PgFdwModifyState *fmstate, { HeapTuple newtup; - newtup = make_tuple_from_result_row(res, 0, - fmstate->rel, - fmstate->attinmeta, - fmstate->retrieved_attrs, - fmstate->temp_cxt); + newtup = + make_tuple_from_result_row(res, 0, + RelationGetRelationName(fmstate->rel), + RelationGetDescr(fmstate->rel), + fmstate->attinmeta, + fmstate->retrieved_attrs, + fmstate->temp_cxt); /* tuple will be deleted when it is cleared from the slot */ ExecStoreTuple(newtup, slot, InvalidBuffer, true); } @@ -2507,11 +2529,13 @@ analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate) */ oldcontext = MemoryContextSwitchTo(astate->anl_cxt); - astate->rows[pos] = make_tuple_from_result_row(res, row, - astate->rel, - astate->attinmeta, - astate->retrieved_attrs, - astate->temp_cxt); + astate->rows[pos] = + make_tuple_from_result_row(res, row, + RelationGetRelationName(astate->rel), + RelationGetDescr(astate->rel), + astate->attinmeta, + astate->retrieved_attrs, + astate->temp_cxt); MemoryContextSwitchTo(oldcontext); } @@ -2528,13 +2552,13 @@ analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate) static HeapTuple make_tuple_from_result_row(PGresult *res, int row, - Relation rel, + const char *relname, + TupleDesc tupdesc, AttInMetadata *attinmeta, List *retrieved_attrs, MemoryContext temp_context) { HeapTuple tuple; - TupleDesc tupdesc = RelationGetDescr(rel); Datum *values; bool *nulls; ItemPointer ctid = NULL; @@ -2561,7 +2585,8 @@ make_tuple_from_result_row(PGresult *res, /* * Set up and install callback to report where conversion error occurs. */ - errpos.rel = rel; + errpos.relname = relname; + errpos.tupdesc = tupdesc; errpos.cur_attno = 0; errcallback.callback = conversion_error_callback; errcallback.arg = (void *) &errpos; @@ -2646,10 +2671,794 @@ static void conversion_error_callback(void *arg) { ConversionLocation *errpos = (ConversionLocation *) arg; - TupleDesc tupdesc = RelationGetDescr(errpos->rel); - if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts) - errcontext("column \"%s\" of foreign table \"%s\"", - NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname), - RelationGetRelationName(errpos->rel)); + if (errpos->cur_attno > 0 && errpos->cur_attno <= errpos->tupdesc->natts) + { + Form_pg_attribute attr = errpos->tupdesc->attrs[errpos->cur_attno - 1]; + + if (errpos->relname) + errcontext("column \"%s\" of foreign table \"%s\"", + NameStr(attr->attname), errpos->relname); + else + errcontext("column \"%s\" of remote join relation", + NameStr(attr->attname)); + } +} + +/* ------------------------------------------------------------ + * + * Remote JOIN support + * + * ------------------------------------------------------------ + */ +enum PgRemoteJoinPrivateIndex +{ + PgCust_FdwServUserIds, /* oid pair of foreign server and user */ + PgCust_JoinRelids, /* bitmapset of rtindexes to be joinned */ + PgCust_JoinType, /* one of JOIN_* */ + PgCust_OuterRel, /* packed joinrel of outer relation */ + PgCust_InnerRel, /* packed joinrel of inner relation */ + PgCust_RemoteConds, /* remote conditions */ + PgCust_LocalConds, /* local conditions */ + PgCust_SelectVars, /* list of Var nodes to be fetched */ + PgCust_SelectParams, /* list of Var nodes being parameterized */ + PgCust_SelectSql, /* remote query being deparsed */ +}; + +/* + * packPgRemoteJoinInfo + * + * pack PgRemoteJoinInfo into a List object to save as private datum + */ +List * +packPgRemoteJoinInfo(PgRemoteJoinInfo *jinfo) +{ + List *result = NIL; + + /* PgCust_FdwServUserIds */ + result = lappend(result, list_make2_oid(jinfo->fdw_server_oid, + jinfo->fdw_user_oid)); + /* PgCust_JoinRelids */ + result = lappend(result, makeString(bms_to_string(jinfo->relids))); + /* PgCust_JoinType */ + result = lappend(result, makeInteger((long) jinfo->jointype)); + /* PgCust_OuterRel */ + result = lappend(result, jinfo->outer_rel); + /* PgCust_InnerRel */ + result = lappend(result, jinfo->inner_rel); + /* PgCust_RemoteConds */ + result = lappend(result, jinfo->remote_conds); + /* PgCust_LocalConds */ + result = lappend(result, jinfo->local_conds); + /* PgCust_SelectVars */ + result = lappend(result, jinfo->select_vars); + /* PgCust_SelectParams */ + result = lappend(result, jinfo->select_params); + /* PgCust_SelectSql */ + result = lappend(result, makeString(jinfo->select_qry)); + + return result; +} + +/* + * unpackPgRemoteJoinInfo + * + * unpack a private datum to PgRemoteJoinInfo + */ +void +unpackPgRemoteJoinInfo(PgRemoteJoinInfo *jinfo, List *custom_private) +{ + ListCell *lc; + int index = PgCust_FdwServUserIds; + + memset(jinfo, 0, sizeof(PgRemoteJoinInfo)); + foreach (lc, custom_private) + { + switch (index) + { + case PgCust_FdwServUserIds: + jinfo->fdw_server_oid = linitial_oid(lfirst(lc)); + jinfo->fdw_user_oid = lsecond_oid(lfirst(lc)); + break; + case PgCust_JoinRelids: + jinfo->relids = bms_from_string(strVal(lfirst(lc))); + break; + case PgCust_JoinType: + jinfo->jointype = (JoinType) intVal(lfirst(lc)); + break; + case PgCust_OuterRel: + Assert(IsA(lfirst(lc), List) || IsA(lfirst(lc), Integer)); + jinfo->outer_rel = lfirst(lc); + break; + case PgCust_InnerRel: + Assert(IsA(lfirst(lc), List) || IsA(lfirst(lc), Integer)); + jinfo->inner_rel = lfirst(lc); + break; + case PgCust_RemoteConds: + jinfo->remote_conds = lfirst(lc); + break; + case PgCust_LocalConds: + jinfo->local_conds = lfirst(lc); + break; + case PgCust_SelectVars: + jinfo->select_vars = lfirst(lc); + break; + case PgCust_SelectParams: + jinfo->select_params = lfirst(lc); + break; + case PgCust_SelectSql: + jinfo->select_qry = strVal(lfirst(lc)); + break; + default: + elog(ERROR, "unexpected member in remote join relinfo"); + } + index++; + } +} + +/* + * is_self_managed_relation + * + * It checks whether the supplied relation is either a foreign table or remote + * join managed by postgres_fdw. If not, false shall be returned. + * If it is a managed relation, some related properties shall be returned to + * the caller. + */ +static bool +is_self_managed_relation(PlannerInfo *root, RelOptInfo *rel, + Oid *fdw_server_oid, Oid *fdw_user_oid, + Node **relinfo, + List **remote_conds, List **local_conds) +{ + if (rel->reloptkind == RELOPT_BASEREL) + { + FdwRoutine pgroutine; + PgFdwRelationInfo *fpinfo; + RangeTblEntry *rte = planner_rt_fetch(rel->relid, root); + + /* Is it a foreign table managed by postgres_fdw? */ + memset(&pgroutine, 0, sizeof(FdwRoutine)); + pgroutine.GetForeignRelSize = postgresGetForeignRelSize; + + if (!is_fdw_managed_relation(rte->relid, &pgroutine)) + return false; + + /* + * Inform the caller its server-id and local user-id also. + * Note that remote user-id is determined according to the pair + * of server-id and local user-id on execution time, not planning + * stage, so we might need to pay attention a scenario that executes + * a plan with different user-id. + * However, all we need to know here is whether both of relations + * shall be run with same credential, or not. Its identical user-id + * is not required here. + * So, InvalidOid shall be set on fdw_user_oid for comparison + * purpose, if it runs based on the credential of GetUserId(). + */ + *fdw_user_oid = rte->checkAsUser; + + fpinfo = (PgFdwRelationInfo *) rel->fdw_private; + *fdw_server_oid = fpinfo->server->serverid; + *remote_conds = fpinfo->remote_conds; + *local_conds = fpinfo->local_conds; + + *relinfo = (Node *) makeInteger(rel->relid); + + return true; + } + else if (rel->reloptkind == RELOPT_JOINREL) + { + ListCell *cell; + + foreach (cell, rel->pathlist) + { + CustomPath *cpath = lfirst(cell); + + if (IsA(cpath, CustomPath) && + strcmp(cpath->custom_name, "postgres-fdw") == 0) + { + PgRemoteJoinInfo jinfo; + + /* + * Note that CustomScan(postgres-fdw) should be constructed + * only when underlying foreign tables use identical server + * and user-id for each. + */ + unpackPgRemoteJoinInfo(&jinfo, cpath->custom_private); + *fdw_server_oid = jinfo.fdw_server_oid; + *fdw_user_oid = jinfo.fdw_user_oid; + *remote_conds = jinfo.remote_conds; + *local_conds = jinfo.local_conds; + + *relinfo = (Node *) cpath->custom_private; + + return true; + } + } + } + return false; +} + +/* + * estimate_remote_join_cost + * + * It calculates cost for remote join, then put them on the Path structure. + */ +static void +estimate_remote_join_cost(PlannerInfo *root, + CustomPath *cpath, + PgRemoteJoinInfo *jinfo, + SpecialJoinInfo *sjinfo) +{ + RelOptInfo *joinrel = cpath->path.parent; + ForeignServer *server; + ListCell *lc; + Cost startup_cost = DEFAULT_FDW_STARTUP_COST; + Cost tuple_cost = DEFAULT_FDW_TUPLE_COST; + Cost total_cost; + QualCost qual_cost; + Selectivity local_sel; + Selectivity remote_sel; + double rows = joinrel->rows; + double retrieved_rows; + + server = GetForeignServer(jinfo->fdw_server_oid); + foreach(lc, server->options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "fdw_startup_cost") == 0) + startup_cost = strtod(defGetString(def), NULL); + else if (strcmp(def->defname, "fdw_tuple_cost") == 0) + tuple_cost = strtod(defGetString(def), NULL); + } + cost_qual_eval(&qual_cost, jinfo->local_conds, root); + local_sel = clauselist_selectivity(root, + jinfo->local_conds, + 0, + JOIN_INNER, + NULL); + remote_sel = clauselist_selectivity(root, + jinfo->remote_conds, + 0, + jinfo->jointype, + sjinfo); + retrieved_rows = remote_sel * rows; + + startup_cost += qual_cost.startup * retrieved_rows; + total_cost = startup_cost; + total_cost += tuple_cost * retrieved_rows; + total_cost += qual_cost.per_tuple * retrieved_rows; + total_cost += cpu_tuple_cost * local_sel * retrieved_rows; + + cpath->path.rows = local_sel * retrieved_rows; + cpath->path.startup_cost = startup_cost; + cpath->path.total_cost = total_cost; +} + +/* + * postgresAddJoinPaths + * + * A callback routine of add_join_path_hook. It checks whether this join can + * be run on the remote server, and add a custom-scan path that launches + * a remote join instead of a pair of remote scan and local join. + */ +static void +postgresAddJoinPaths(PlannerInfo *root, + RelOptInfo *joinrel, + RelOptInfo *outerrel, + RelOptInfo *innerrel, + JoinType jointype, + SpecialJoinInfo *sjinfo, + List *restrictlist, + List *mergeclause_list, + SemiAntiJoinFactors *semifactors, + Relids param_source_rels, + Relids extra_lateral_rels) +{ + Oid o_server_oid; + Oid o_user_oid; + Node *o_relinfo; + List *o_local_conds; + List *o_remote_conds; + Oid i_server_oid; + Oid i_user_oid; + Node *i_relinfo; + List *i_local_conds; + List *i_remote_conds; + List *j_local_conds; + List *j_remote_conds; + ListCell *lc; + Relids required_outer; + PgRemoteJoinInfo jinfo; + CustomPath *cpath; + + if (add_join_path_next) + (*add_join_path_next)(root, joinrel, outerrel, innerrel, + jointype, sjinfo, restrictlist, + mergeclause_list, semifactors, + param_source_rels, extra_lateral_rels); + + /* only regular SQL JOIN syntax is supported */ + if (jointype != JOIN_INNER && jointype != JOIN_LEFT && + jointype != JOIN_FULL && jointype != JOIN_RIGHT) + return; + + /* outerrel is managed by this extension? */ + if (!is_self_managed_relation(root, outerrel, + &o_server_oid, &o_user_oid, &o_relinfo, + &o_remote_conds, &o_local_conds)) + return; + + /* innerrel is managed by this extension? */ + if (!is_self_managed_relation(root, innerrel, + &i_server_oid, &i_user_oid, &i_relinfo, + &i_remote_conds, &i_local_conds)) + return; + + /* Is remote query run with a common credential? */ + if (o_server_oid != i_server_oid || o_user_oid != i_user_oid) + return; + + /* unable to pull up local conditions any more */ + if ((jointype == JOIN_LEFT && o_local_conds != NIL) || + (jointype == JOIN_RIGHT && i_local_conds != NIL) || + (jointype == JOIN_FULL && (o_local_conds != NIL || + i_local_conds != NIL))) + return; + + classifyConditions(root, joinrel, restrictlist, + &j_remote_conds, &j_local_conds); + /* pull-up local conditions, if any */ + j_local_conds = list_concat(j_local_conds, o_local_conds); + j_local_conds = list_concat(j_local_conds, i_local_conds); + + /* + * Not supported to run remote join if whole-row reference is + * included in either of target-list or local-conditions. + * + * XXX - Because we don't have reasonable way to reconstruct a RECORD + * datum from individual columns once extracted. On the other hand, it + * takes additional network bandwidth if we put whole-row reference on + * the remote-join query. + */ + if (contain_wholerow_reference((Node *)joinrel->reltargetlist) || + contain_wholerow_reference((Node *)j_local_conds)) + return; + + required_outer = pull_varnos((Node *) joinrel->reltargetlist); + foreach (lc, j_local_conds) + { + RestrictInfo *rinfo = lfirst(lc); + + required_outer = bms_union(required_outer, + pull_varnos((Node *)rinfo->clause)); + } + required_outer = bms_difference(required_outer, joinrel->relids); + + /* OK, make a CustomScan node to run remote join */ + cpath = makeNode(CustomPath); + cpath->path.pathtype = T_CustomScan; + cpath->path.parent = joinrel; + cpath->path.param_info = get_baserel_parampathinfo(root, joinrel, + required_outer); + cpath->custom_name = pstrdup("postgres-fdw"); + cpath->custom_flags = 0; + + memset(&jinfo, 0, sizeof(PgRemoteJoinInfo)); + jinfo.fdw_server_oid = o_server_oid; + jinfo.fdw_user_oid = o_user_oid; + jinfo.relids = joinrel->relids; + jinfo.jointype = jointype; + jinfo.outer_rel = o_relinfo; + jinfo.inner_rel = i_relinfo; + jinfo.remote_conds = j_remote_conds; + jinfo.local_conds = j_local_conds; + + cpath->custom_private = packPgRemoteJoinInfo(&jinfo); + + estimate_remote_join_cost(root, cpath, &jinfo, sjinfo); + + add_path(joinrel, &cpath->path); +} + +/* + * postgresInitCustomScanPlan + * + * construction of CustomScan according to remote join path above. + */ +static void +postgresInitCustomScanPlan(PlannerInfo *root, + CustomScan *cscan_plan, + CustomPath *cscan_path, + List *tlist, + List *scan_clauses) +{ + PgRemoteJoinInfo jinfo; + StringInfoData sql; + List *relinfo = cscan_path->custom_private; + List *local_conds = NIL; + List *remote_conds = NIL; + ListCell *lc; + + Assert(cscan_path->path.parent->reloptkind == RELOPT_JOINREL); + unpackPgRemoteJoinInfo(&jinfo, relinfo); + + /* pulls expressions from RestrictInfo */ + local_conds = extract_actual_clauses(jinfo.local_conds, false); + remote_conds = extract_actual_clauses(jinfo.remote_conds, false); + + foreach (lc, scan_clauses) + { + RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); + + Assert(IsA(rinfo, RestrictInfo)); + + /* Ignore any pseudoconstants, they're dealt with elsewhere */ + if (rinfo->pseudoconstant) + continue; + + if (!list_member(remote_conds, rinfo->clause) && + !list_member(local_conds, rinfo->clause)) + local_conds = lappend(local_conds, rinfo->clause); + } + + /* construct a remote join query */ + initStringInfo(&sql); + deparseRemoteJoinSql(&sql, root, cscan_path->custom_private, + tlist, + local_conds, + &jinfo.select_vars, + &jinfo.select_params); + jinfo.local_conds = NIL; /* never used any more */ + jinfo.remote_conds = NIL; /* never used any more */ + jinfo.select_qry = sql.data; + + cscan_plan->scan.plan.targetlist = tlist; + cscan_plan->scan.plan.qual = local_conds; + cscan_plan->custom_exprs = remote_conds; + cscan_plan->custom_private = packPgRemoteJoinInfo(&jinfo); +} + +/* + * fixup_remote_join_expr + * + * Var nodes that reference a relation of remote join have varno of underlying + * foreign tables. It makes a problem because it shall be eventually replaced + * by references to outer or inner relation, however, result of remote join is + * stored on the scan-tuple-slot neither outer nor inner. + * So, we need to replace varno of Var nodes that reference a relation of + * remote join by CUSTOM_VAR; that is a pseudo varno to reference a tuple in + * the scan-tuple-slot. + */ +typedef struct { + PlannerInfo *root; + List *select_vars; + int rtoffset; +} fixup_remote_join_context; + +static Node * +fixup_remote_join_mutator(Node *node, fixup_remote_join_context *context) +{ + if (node == NULL) + return false; + if (IsA(node, Var)) + { + Var *newvar = (Var *) copyObject(node); + ListCell *lc; + AttrNumber resno = 1; + + /* remote columns are ordered according to the select_vars */ + foreach (lc, context->select_vars) + { + Var *selvar = (Var *) lfirst(lc); + + Assert(newvar->varlevelsup == 0); + + if (newvar->varno == selvar->varno && + newvar->varattno == selvar->varattno) + { + Assert(newvar->vartype == selvar->vartype); + Assert(newvar->vartypmod == selvar->vartypmod); + Assert(newvar->varcollid == selvar->varcollid); + + newvar->varno = CUSTOM_VAR; + newvar->varattno = resno; + + return (Node *) newvar; + } + resno++; + } + elog(ERROR, "referenced variable was not in select_vars"); + } + if (IsA(node, CurrentOfExpr)) + { + CurrentOfExpr *cexpr = (CurrentOfExpr *) copyObject(node); + + Assert(cexpr->cvarno != INNER_VAR); + Assert(cexpr->cvarno != OUTER_VAR); + if (!IS_SPECIAL_VARNO(cexpr->cvarno)) + cexpr->cvarno += context->rtoffset; + return (Node *) cexpr; + } + if (IsA(node, PlaceHolderVar)) + { + /* At scan level, we should always just evaluate the contained expr */ + PlaceHolderVar *phv = (PlaceHolderVar *) node; + + return fixup_remote_join_mutator((Node *) phv->phexpr, context); + } + fix_expr_common(context->root, node); + return expression_tree_mutator(node, fixup_remote_join_mutator, + (void *) context); +} + +static Node * +fixup_remote_join_expr(Node *node, PlannerInfo *root, + List *select_vars, int rtoffset) +{ + fixup_remote_join_context context; + + context.root = root; + context.select_vars = select_vars; + context.rtoffset = rtoffset; + + return fixup_remote_join_mutator(node, &context); +} + +/* + * postgresSetPlanRefCustomScan + * + * We need a special treatment of Var nodes to reference columns in remote + * join relation, because we replaces a join relation by a remote query that + * returns a result of join being executed remotely. + */ +static void +postgresSetPlanRefCustomScan(PlannerInfo *root, + CustomScan *csplan, + int rtoffset) +{ + PgRemoteJoinInfo jinfo; + + Assert(csplan->scan.scanrelid == 0); + + unpackPgRemoteJoinInfo(&jinfo, csplan->custom_private); + + csplan->scan.plan.targetlist = + (List *) fixup_remote_join_expr((Node *)csplan->scan.plan.targetlist, + root, jinfo.select_vars, rtoffset); + csplan->scan.plan.qual = + (List *) fixup_remote_join_expr((Node *)csplan->scan.plan.qual, + root, jinfo.select_vars, rtoffset); + + if (rtoffset > 0) + { + ListCell *lc; + + foreach (lc, jinfo.select_vars) + { + Var *var = lfirst(lc); + + var->varno += rtoffset; + } + } +} + +/* + * postgresBeginCustomScan + * + * Most of logic are equivalent to postgresBeginForeignScan, however, + * needs adjustment because of difference in the nature. + * The biggest one is, it has to open the underlying relation by itself + * and needs to construct tuple-descriptor from the var-list to be fetched, + * because custom-scan (in this case; a scan on remote join instead of + * local join) does not have a particular relation on its behaind, thus + * it needs to manage correctly. + */ +static void +postgresBeginCustomScan(CustomScanState *node, int eflags) +{ + CustomScan *csplan = (CustomScan *) node->ss.ps.plan; + EState *estate = node->ss.ps.state; + PgRemoteJoinInfo jinfo; + PgFdwScanState *fsstate; + TupleDesc tupdesc; + List *join_rels = NIL; + List *att_names = NIL; + List *att_types = NIL; + List *att_typmods = NIL; + List *att_collations = NIL; + List *retrieved_attrs = NIL; + ListCell *lc; + Oid userid; + int i; + + unpackPgRemoteJoinInfo(&jinfo, csplan->custom_private); + + /* + * ss_ScanTupleSlot of ScanState has to be correctly initialized + * even if this invocation is EXPLAIN (without ANALYZE), because + * Var node with CUSTOM_VAR references its TupleDesc to get + * virtual attribute name on the scanned slot. + */ + ExecInitScanTupleSlot(estate, &node->ss); + foreach (lc, jinfo.select_vars) + { + Oid reloid; + char *attname; + Var *var = lfirst(lc); + + Assert(IsA(var, Var)); + reloid = getrelid(var->varno, estate->es_range_table); + attname = get_relid_attribute_name(reloid, var->varattno); + + att_names = lappend(att_names, makeString(attname)); + att_types = lappend_oid(att_types, var->vartype); + att_typmods = lappend_int(att_typmods, var->vartypmod); + att_collations = lappend_oid(att_collations, var->varcollid); + + retrieved_attrs = lappend_int(retrieved_attrs, + list_length(retrieved_attrs) + 1); + } + tupdesc = BuildDescFromLists(att_names, att_types, + att_typmods, att_collations); + ExecAssignScanType(&node->ss, tupdesc); + + /* + * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. + */ + if (eflags & EXEC_FLAG_EXPLAIN_ONLY) + return; + + /* + * Needs to open underlying relations by itself + */ + while ((i = bms_first_member(jinfo.relids)) >= 0) + { + Relation rel = ExecOpenScanRelation(estate, i, eflags); + + join_rels = lappend(join_rels, rel); + } + + /* + * Determine a user-id. Current user-id shall be applied without something + * special configuration on the reference. + */ + userid = OidIsValid(jinfo.fdw_user_oid) ? jinfo.fdw_user_oid : GetUserId(); + + /* common part to begin remote query execution */ + fsstate = commonBeginForeignScan(&node->ss.ps, tupdesc, + jinfo.fdw_server_oid, userid, + jinfo.select_qry, + retrieved_attrs, + jinfo.select_params); + /* also, underlying relations also have to be saved */ + fsstate->join_rels = join_rels; + + node->custom_state = fsstate; +} + +/* + * postgresExecCustomAccess + * + * Access method to fetch a tuple from the remote join query. + * It performs equivalent job as postgresIterateForeignScan() doing on + * queries to single relation. + */ +static TupleTableSlot * +postgresExecCustomAccess(CustomScanState *node) +{ + PgFdwScanState *fsstate = node->custom_state; + TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; + + return commonIterateForeignScan(fsstate, &node->ss.ps, slot); +} + +/* + * postgresExecCustomRecheck + * + * No need to recheck it again. + */ +static bool +postgresExecCustomRecheck(CustomScanState *node, TupleTableSlot *slot) +{ + return true; +} + +/* + * postgresExecCustomScan + * + * Just a wrapper of regular ExecScan + */ +static TupleTableSlot * +postgresExecCustomScan(CustomScanState *node) +{ + return ExecScan((ScanState *) node, + (ExecScanAccessMtd) postgresExecCustomAccess, + (ExecScanRecheckMtd) postgresExecCustomRecheck); +} + +/* + * postgresEndCustomScan + * + * Nothing are different from postgresEndForeignScan, except for closing + * underlying relations by itself. + */ +static void +postgresEndCustomScan(CustomScanState *node) +{ + PgFdwScanState *fsstate = (PgFdwScanState *) node->custom_state; + ListCell *lc; + + /* if fsstate is NULL, we are in EXPLAIN; nothing to do */ + if (fsstate == NULL) + return; + + /* cleanup resources used in common portion */ + commonEndForeignScan(fsstate); + + foreach (lc, fsstate->join_rels) + ExecCloseScanRelation(lfirst(lc)); +} + +/* + * postgresReScanCustomScan + * + * Same as postgresReScanForeignScan() doing. + */ +static void +postgresReScanCustomScan(CustomScanState *node) +{ + PgFdwScanState *fsstate = node->custom_state; + + commonReScanForeignScan(fsstate, &node->ss.ps); +} + +/* + * postgresExplainCustomScan + * + * Callback routine on EXPLAIN. It just adds remote query, if verbose mode. + */ +static void +postgresExplainCustomScan(CustomScanState *csstate, + ExplainState *es) +{ + if (es->verbose) + { + PgRemoteJoinInfo jinfo; + CustomScan *cscan = (CustomScan *)csstate->ss.ps.plan; + + unpackPgRemoteJoinInfo(&jinfo, cscan->custom_private); + + ExplainPropertyText("Remote SQL", jinfo.select_qry, es); + } +} + +/* + * _PG_init + * + * Entrypoint of this module; registration of custom-scan provider, but + * no special registration is not needed for FDW portion. + */ +void +_PG_init(void) +{ + CustomProvider provider; + + /* registration of hook on add_join_paths */ + add_join_path_next = add_join_path_hook; + add_join_path_hook = postgresAddJoinPaths; + + /* registration of custom scan provider */ + memset(&provider, 0, sizeof(provider)); + snprintf(provider.name, sizeof(provider.name), "postgres-fdw"); + provider.InitCustomScanPlan = postgresInitCustomScanPlan; + provider.SetPlanRefCustomScan = postgresSetPlanRefCustomScan; + provider.BeginCustomScan = postgresBeginCustomScan; + provider.ExecCustomScan = postgresExecCustomScan; + provider.EndCustomScan = postgresEndCustomScan; + provider.ReScanCustomScan = postgresReScanCustomScan; + provider.ExplainCustomScan = postgresExplainCustomScan; + + register_custom_provider(&provider); } diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index c782d4f..27486b9 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -21,6 +21,41 @@ #include "libpq-fe.h" /* in postgres_fdw.c */ + +/* + * FDW-specific planner information kept in RelOptInfo.fdw_private for a + * foreign table. This information is collected by postgresGetForeignRelSize. + */ +typedef struct PgFdwRelationInfo +{ + /* baserestrictinfo clauses, broken down into safe and unsafe subsets. */ + List *remote_conds; + List *local_conds; + + /* Bitmap of attr numbers we need to fetch from the remote server. */ + Bitmapset *attrs_used; + + /* Cost and selectivity of local_conds. */ + QualCost local_conds_cost; + Selectivity local_conds_sel; + + /* Estimated size and cost for a scan with baserestrictinfo quals. */ + double rows; + int width; + Cost startup_cost; + Cost total_cost; + + /* Options extracted from catalogs. */ + bool use_remote_estimate; + Cost fdw_startup_cost; + Cost fdw_tuple_cost; + + /* Cached catalog information. */ + ForeignTable *table; + ForeignServer *server; + UserMapping *user; /* only set in use_remote_estimate mode */ +} PgFdwRelationInfo; + extern int set_transmission_modes(void); extern void reset_transmission_modes(int nestlevel); @@ -41,6 +76,7 @@ extern int ExtractConnectionOptions(List *defelems, /* in deparse.c */ extern void classifyConditions(PlannerInfo *root, RelOptInfo *baserel, + List *restrictinfo_list, List **remote_conds, List **local_conds); extern bool is_foreign_expr(PlannerInfo *root, @@ -56,6 +92,8 @@ extern void appendWhereClause(StringInfo buf, RelOptInfo *baserel, List *exprs, bool is_first, + bool is_join_on, + bool qualified, List **params); extern void deparseInsertSql(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, @@ -69,8 +107,34 @@ extern void deparseDeleteSql(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, List *returningList, List **retrieved_attrs); +extern void deparseRemoteJoinSql(StringInfo buf, PlannerInfo *root, + List *relinfo, + List *target_list, + List *local_conds, + List **select_vars, + List **param_list); extern void deparseAnalyzeSizeSql(StringInfo buf, Relation rel); extern void deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs); +/* remote join support on top of custom-scan APIs */ +typedef struct +{ + Oid fdw_server_oid; /* server oid commonly used */ + Oid fdw_user_oid; /* user oid commonly used */ + Relids relids; /* bitmapset of range table indexes */ + JoinType jointype; /* one of JOIN_* */ + Node *outer_rel; /* packed information of outer relation */ + Node *inner_rel; /* packed information of inner relation */ + List *remote_conds; /* condition to be run on remote server */ + List *local_conds; /* condition to be run on local server */ + List *select_vars; /* List of Var nodes to be fetched */ + List *select_params; /* List of Var nodes being parameralized */ + char *select_qry; /* remote query being deparsed */ +} PgRemoteJoinInfo; + +extern List *packPgRemoteJoinInfo(PgRemoteJoinInfo *jinfo); +extern void unpackPgRemoteJoinInfo(PgRemoteJoinInfo *jinfo, + List *custom_private); + #endif /* POSTGRES_FDW_H */ diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml index 35924f1..7926d54 100644 --- a/doc/src/sgml/postgres-fdw.sgml +++ b/doc/src/sgml/postgres-fdw.sgml @@ -350,6 +350,16 @@ + In addition, PostgreSQL 9.4 or later adaptively tries + to join relations, being managed by a same foreign server, on the remote + node if supplied join condition is sufficient to run on the remote side. + It performs as if a local custom scan node walks on a virtual relation + being consists of multiple relations according to remote join, thus + it usually has cheaper cost than data translation of both relations and + local join operations. + + + The query that is actually sent to the remote server for execution can be examined using EXPLAIN VERBOSE. diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c index 2b75f73..2efa17b 100644 --- a/src/backend/foreign/foreign.c +++ b/src/backend/foreign/foreign.c @@ -23,6 +23,7 @@ #include "lib/stringinfo.h" #include "miscadmin.h" #include "utils/builtins.h" +#include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/rel.h" #include "utils/syscache.h" @@ -621,3 +622,31 @@ get_foreign_server_oid(const char *servername, bool missing_ok) errmsg("server \"%s\" does not exist", servername))); return oid; } + +/* + * is_fdw_managed_relation + * + * It checks whether the supplied relation is a foreign table managed + * by the module that has FdwRoutine, or not. + */ +bool +is_fdw_managed_relation(Oid tableoid, const FdwRoutine *routines_self) +{ + FdwRoutine *routines; + char relkind = get_rel_relkind(tableoid); + + if (relkind == RELKIND_FOREIGN_TABLE) + { + routines = GetFdwRoutineByRelId(tableoid); + + /* + * Our assumption is a particular callback being implemented by + * a particular extension shall not be shared with other extension. + * So, we don't need to compare all the function pointers in the + * FdwRoutine, but only one member. + */ + if (routines->GetForeignRelSize == routines_self->GetForeignRelSize) + return true; + } + return false; +} diff --git a/src/backend/nodes/bitmapset.c b/src/backend/nodes/bitmapset.c index 540db16..44f2236 100644 --- a/src/backend/nodes/bitmapset.c +++ b/src/backend/nodes/bitmapset.c @@ -865,3 +865,65 @@ bms_hash_value(const Bitmapset *a) return DatumGetUInt32(hash_any((const unsigned char *) a->words, (lastword + 1) * sizeof(bitmapword))); } + +/* + * bms_to_string / bms_from_string - transform bitmapset to/from text + * representation for portability purpose. + */ +char * +bms_to_string(Bitmapset *a) +{ + char *result; + char *pos; + int i; + + if (bms_is_empty(a)) + return NULL; + + result = palloc(a->nwords * (BITS_PER_BITMAPWORD / 4) + 1); + for (i = a->nwords, pos = result; i > 0; i--) + pos += sprintf(pos, "%08x", a->words[i - 1]); + + return result; +} + +Bitmapset * +bms_from_string(const char *a) +{ + Bitmapset *result; + Size len; + int nwords; + int i, offset = 0; + + if (a == NULL) + return NULL; + + len = strlen(a); + if (len % (BITS_PER_BITMAPWORD / 4) != 0) + elog(WARNING, "strange bitmapset text representation: %s", a); + + nwords = (len + BITS_PER_BITMAPWORD / 4 - 1) / (BITS_PER_BITMAPWORD / 4); + result = palloc(BITMAPSET_SIZE(nwords)); + result->nwords = nwords; + + for (i=result->nwords; i > 0; i--) + { + bitmapword word = 0; + + do { + int c = a[offset++]; + if (c >= '0' && c <= '9') + word = (word << 4) | (c - '0'); + else if (c >= 'a' && c <= 'f') + word = (word << 4) | (c - 'a'); + else if (c >= 'A' && c <= 'F') + word = (word << 4) | (c - 'A'); + else + elog(ERROR, "invalid hexadecimal digit"); + } while ((len - offset) % (BITS_PER_BITMAPWORD / 4) != 0); + + result->words[i - 1] = word; + } + + return result; +} diff --git a/src/backend/optimizer/util/var.c b/src/backend/optimizer/util/var.c index 4a3d5c8..6e899e8 100644 --- a/src/backend/optimizer/util/var.c +++ b/src/backend/optimizer/util/var.c @@ -73,6 +73,7 @@ static bool pull_varattnos_walker(Node *node, pull_varattnos_context *context); static bool pull_vars_walker(Node *node, pull_vars_context *context); static bool contain_var_clause_walker(Node *node, void *context); static bool contain_vars_of_level_walker(Node *node, int *sublevels_up); +static bool contain_wholerow_reference_walker(Node *node, void *context); static bool locate_var_of_level_walker(Node *node, locate_var_of_level_context *context); static bool pull_var_clause_walker(Node *node, @@ -418,6 +419,44 @@ contain_vars_of_level_walker(Node *node, int *sublevels_up) (void *) sublevels_up); } +/* + * contain_wholerow_reference + * + * Recursively scan a clause to discover whether it contains any Var nodes + * of whole-row reference in the current query level. + * + * Returns true if any such Var found. + */ +bool +contain_wholerow_reference(Node *node) +{ + return contain_wholerow_reference_walker(node, NULL); +} + +static bool +contain_wholerow_reference_walker(Node *node, void *context) +{ + if (node == NULL) + return false; + if (IsA(node, RestrictInfo)) + { + RestrictInfo *rinfo = (RestrictInfo *) node; + + return contain_wholerow_reference_walker((Node *)rinfo->clause, + context); + } + if (IsA(node, Var)) + { + Var *var = (Var *) node; + + if (var->varlevelsup == 0 && var->varattno == 0) + return true; + return false; + } + return expression_tree_walker(node, + contain_wholerow_reference_walker, + context); +} /* * locate_var_of_level diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h index 5bd6ae6..9514f5f 100644 --- a/src/include/foreign/foreign.h +++ b/src/include/foreign/foreign.h @@ -13,6 +13,7 @@ #ifndef FOREIGN_H #define FOREIGN_H +#include "foreign/fdwapi.h" #include "nodes/parsenodes.h" @@ -81,4 +82,7 @@ extern List *GetForeignColumnOptions(Oid relid, AttrNumber attnum); extern Oid get_foreign_data_wrapper_oid(const char *fdwname, bool missing_ok); extern Oid get_foreign_server_oid(const char *servername, bool missing_ok); +extern bool is_fdw_managed_relation(Oid tableoid, + const FdwRoutine *routines_self); + #endif /* FOREIGN_H */ diff --git a/src/include/nodes/bitmapset.h b/src/include/nodes/bitmapset.h index 2a4b41d..73424f5 100644 --- a/src/include/nodes/bitmapset.h +++ b/src/include/nodes/bitmapset.h @@ -93,4 +93,8 @@ extern int bms_first_member(Bitmapset *a); /* support for hashtables using Bitmapsets as keys: */ extern uint32 bms_hash_value(const Bitmapset *a); +/* support for string representation */ +extern char *bms_to_string(Bitmapset *a); +extern Bitmapset *bms_from_string(const char *a); + #endif /* BITMAPSET_H */ diff --git a/src/include/optimizer/var.h b/src/include/optimizer/var.h index 808bf67..6355b4d 100644 --- a/src/include/optimizer/var.h +++ b/src/include/optimizer/var.h @@ -36,6 +36,7 @@ extern void pull_varattnos(Node *node, Index varno, Bitmapset **varattnos); extern List *pull_vars_of_level(Node *node, int levelsup); extern bool contain_var_clause(Node *node); extern bool contain_vars_of_level(Node *node, int levelsup); +extern bool contain_wholerow_reference(Node *node); extern int locate_var_of_level(Node *node, int levelsup); extern List *pull_var_clause(Node *node, PVCAggregateBehavior aggbehavior, PVCPlaceHolderBehavior phbehavior);