*** a/src/backend/catalog/pg_proc.c --- b/src/backend/catalog/pg_proc.c *************** *** 755,761 **** fmgr_sql_validator(PG_FUNCTION_ARGS) --- 755,763 ---- Oid funcoid = PG_GETARG_OID(0); HeapTuple tuple; Form_pg_proc proc; + List *raw_parsetree_list; List *querytree_list; + ListCell *list_item; bool isnull; Datum tmp; char *prosrc; *************** *** 828,836 **** fmgr_sql_validator(PG_FUNCTION_ARGS) */ if (!haspolyarg) { ! querytree_list = pg_parse_and_rewrite(prosrc, ! proc->proargtypes.values, ! proc->pronargs); (void) check_sql_fn_retval(funcoid, proc->prorettype, querytree_list, NULL, NULL); --- 830,854 ---- */ if (!haspolyarg) { ! /* ! * Parse and rewrite the queries in the function text. ! * ! * Even though check_sql_fn_retval is only interested in the last ! * query, we analyze all of them here to check for any errors. ! */ ! raw_parsetree_list = pg_parse_query(prosrc); ! ! querytree_list = NIL; ! foreach(list_item, raw_parsetree_list) ! { ! Node *parsetree = (Node *) lfirst(list_item); ! ! querytree_list = pg_analyze_and_rewrite(parsetree, prosrc, ! proc->proargtypes.values, proc->pronargs); ! } ! ! Assert(querytree_list != NIL); ! (void) check_sql_fn_retval(funcoid, proc->prorettype, querytree_list, NULL, NULL); *** a/src/backend/executor/functions.c --- b/src/backend/executor/functions.c *************** *** 90,107 **** typedef struct ParamListInfo paramLI; /* Param list representing current args */ Tuplestorestate *tstore; /* where we accumulate result tuples */ JunkFilter *junkFilter; /* will be NULL if function returns VOID */ ! /* head of linked list of execution_state records */ ! execution_state *func_state; } SQLFunctionCache; typedef SQLFunctionCache *SQLFunctionCachePtr; /* non-export function prototypes */ ! static execution_state *init_execution_state(List *queryTree_list, SQLFunctionCachePtr fcache, bool lazyEvalOK); static void init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK); --- 90,107 ---- ParamListInfo paramLI; /* Param list representing current args */ Tuplestorestate *tstore; /* where we accumulate result tuples */ + Snapshot snapshot; JunkFilter *junkFilter; /* will be NULL if function returns VOID */ ! List *func_state; } SQLFunctionCache; typedef SQLFunctionCache *SQLFunctionCachePtr; /* non-export function prototypes */ ! static List *init_execution_state(List *queryTree_list, SQLFunctionCachePtr fcache, bool lazyEvalOK); static void init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK); *************** *** 123,183 **** static void sqlfunction_destroy(DestReceiver *self); /* Set up the list of per-query execution_state records for a SQL function */ ! static execution_state * init_execution_state(List *queryTree_list, SQLFunctionCachePtr fcache, bool lazyEvalOK) { ! execution_state *firstes = NULL; ! execution_state *preves = NULL; execution_state *lasttages = NULL; ! ListCell *qtl_item; ! foreach(qtl_item, queryTree_list) { ! Query *queryTree = (Query *) lfirst(qtl_item); ! Node *stmt; ! execution_state *newes; ! ! Assert(IsA(queryTree, Query)); ! ! if (queryTree->commandType == CMD_UTILITY) ! stmt = queryTree->utilityStmt; ! else ! stmt = (Node *) pg_plan_query(queryTree, 0, NULL); ! ! /* Precheck all commands for validity in a function */ ! if (IsA(stmt, TransactionStmt)) ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! /* translator: %s is a SQL statement name */ ! errmsg("%s is not allowed in a SQL function", ! CreateCommandTag(stmt)))); ! ! if (fcache->readonly_func && !CommandIsReadOnly(stmt)) ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! /* translator: %s is a SQL statement name */ ! errmsg("%s is not allowed in a non-volatile function", ! CreateCommandTag(stmt)))); ! ! newes = (execution_state *) palloc(sizeof(execution_state)); ! if (preves) ! preves->next = newes; ! else ! firstes = newes; ! ! newes->next = NULL; ! newes->status = F_EXEC_START; ! newes->setsResult = false; /* might change below */ ! newes->lazyEval = false; /* might change below */ ! newes->stmt = stmt; ! newes->qd = NULL; ! ! if (queryTree->canSetTag) ! lasttages = newes; ! ! preves = newes; } /* --- 123,200 ---- /* Set up the list of per-query execution_state records for a SQL function */ ! static List * init_execution_state(List *queryTree_list, SQLFunctionCachePtr fcache, bool lazyEvalOK) { ! execution_state *firstes; ! execution_state *preves; execution_state *lasttages = NULL; ! List *eslist; ! ListCell *lc1; ! ListCell *lc2; ! List *qtlist; ! Query *queryTree; ! ! eslist = NIL; ! ! foreach(lc1, queryTree_list) { ! qtlist = (List *) lfirst(lc1); ! firstes = NULL; ! preves = NULL; ! ! foreach(lc2, qtlist) ! { ! Node *stmt; ! execution_state *newes; ! ! queryTree = (Query *) lfirst(lc2); ! ! Assert(IsA(queryTree, Query)); ! ! if (queryTree->commandType == CMD_UTILITY) ! stmt = queryTree->utilityStmt; ! else ! stmt = (Node *) pg_plan_query(queryTree, 0, NULL); ! ! /* Precheck all commands for validity in a function */ ! if (IsA(stmt, TransactionStmt)) ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! /* translator: %s is a SQL statement name */ ! errmsg("%s is not allowed in a SQL function", ! CreateCommandTag(stmt)))); ! ! if (fcache->readonly_func && !CommandIsReadOnly(stmt)) ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! /* translator: %s is a SQL statement name */ ! errmsg("%s is not allowed in a non-volatile function", ! CreateCommandTag(stmt)))); ! ! newes = (execution_state *) palloc(sizeof(execution_state)); ! if (preves) ! preves->next = newes; ! else ! firstes = newes; ! ! newes->next = NULL; ! newes->status = F_EXEC_START; ! newes->setsResult = false; /* might change below */ ! newes->lazyEval = false; /* might change below */ ! newes->stmt = stmt; ! newes->qd = NULL; ! ! if (queryTree->canSetTag) ! lasttages = newes; ! ! preves = newes; ! } ! ! eslist = lappend(eslist, firstes); } /* *************** *** 210,216 **** init_execution_state(List *queryTree_list, } } ! return firstes; } /* Initialize the SQLFunctionCache for a SQL function */ --- 227,233 ---- } } ! return eslist; } /* Initialize the SQLFunctionCache for a SQL function */ *************** *** 224,230 **** init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK) --- 241,249 ---- SQLFunctionCachePtr fcache; Oid *argOidVect; int nargs; + List *raw_parsetree_list; List *queryTree_list; + ListCell *list_item; Datum tmp; bool isNull; *************** *** 319,325 **** init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK) /* * Parse and rewrite the queries in the function text. */ ! queryTree_list = pg_parse_and_rewrite(fcache->src, argOidVect, nargs); /* * Check that the function returns the type it claims to. Although in --- 338,356 ---- /* * Parse and rewrite the queries in the function text. */ ! raw_parsetree_list = pg_parse_query(fcache->src); ! ! queryTree_list = NIL; ! foreach(list_item, raw_parsetree_list) ! { ! Node *parsetree = (Node *) lfirst(list_item); ! ! queryTree_list = lappend(queryTree_list, ! pg_analyze_and_rewrite(parsetree, ! fcache->src, ! argOidVect, ! nargs)); ! } /* * Check that the function returns the type it claims to. Although in *************** *** 342,348 **** init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK) */ fcache->returnsTuple = check_sql_fn_retval(foid, rettype, ! queryTree_list, NULL, &fcache->junkFilter); --- 373,379 ---- */ fcache->returnsTuple = check_sql_fn_retval(foid, rettype, ! llast(queryTree_list), NULL, &fcache->junkFilter); *************** *** 374,397 **** init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK) static void postquel_start(execution_state *es, SQLFunctionCachePtr fcache) { - Snapshot snapshot; DestReceiver *dest; Assert(es->qd == NULL); ! /* ! * In a read-only function, use the surrounding query's snapshot; ! * otherwise take a new snapshot for each query. The snapshot should ! * include a fresh command ID so that all work to date in this transaction ! * is visible. ! */ ! if (fcache->readonly_func) ! snapshot = GetActiveSnapshot(); ! else ! { ! CommandCounterIncrement(); ! snapshot = GetTransactionSnapshot(); ! } /* * If this query produces the function result, send its output to the --- 405,415 ---- static void postquel_start(execution_state *es, SQLFunctionCachePtr fcache) { DestReceiver *dest; Assert(es->qd == NULL); ! Assert(ActiveSnapshotSet()); /* * If this query produces the function result, send its output to the *************** *** 415,427 **** postquel_start(execution_state *es, SQLFunctionCachePtr fcache) if (IsA(es->stmt, PlannedStmt)) es->qd = CreateQueryDesc((PlannedStmt *) es->stmt, fcache->src, ! snapshot, InvalidSnapshot, dest, fcache->paramLI, 0); else es->qd = CreateUtilityQueryDesc(es->stmt, fcache->src, ! snapshot, dest, fcache->paramLI); --- 433,446 ---- if (IsA(es->stmt, PlannedStmt)) es->qd = CreateQueryDesc((PlannedStmt *) es->stmt, fcache->src, ! GetActiveSnapshot(), ! InvalidSnapshot, dest, fcache->paramLI, 0); else es->qd = CreateUtilityQueryDesc(es->stmt, fcache->src, ! GetActiveSnapshot(), dest, fcache->paramLI); *************** *** 617,622 **** fmgr_sql(PG_FUNCTION_ARGS) --- 636,643 ---- execution_state *es; TupleTableSlot *slot; Datum result; + List *eslist; + ListCell *eslc; /* * Switch to context in which the fcache lives. This ensures that *************** *** 668,680 **** fmgr_sql(PG_FUNCTION_ARGS) init_sql_fcache(fcinfo->flinfo, lazyEvalOK); fcache = (SQLFunctionCachePtr) fcinfo->flinfo->fn_extra; } ! es = fcache->func_state; /* * Convert params to appropriate format if starting a fresh execution. (If * continuing execution, we can re-use prior params.) */ ! if (es && es->status == F_EXEC_START) postquel_sub_params(fcache, fcinfo); /* --- 689,701 ---- init_sql_fcache(fcinfo->flinfo, lazyEvalOK); fcache = (SQLFunctionCachePtr) fcinfo->flinfo->fn_extra; } ! eslist = fcache->func_state; /* * Convert params to appropriate format if starting a fresh execution. (If * continuing execution, we can re-use prior params.) */ ! if (linitial(eslist) && ((execution_state *) linitial(eslist))->status == F_EXEC_START) postquel_sub_params(fcache, fcinfo); /* *************** *** 687,694 **** fmgr_sql(PG_FUNCTION_ARGS) /* * Find first unfinished query in function. */ ! while (es && es->status == F_EXEC_DONE) ! es = es->next; /* * Execute each command in the function one after another until we either --- 708,725 ---- /* * Find first unfinished query in function. */ ! ! es = NULL; /* keep compiler quiet */ ! foreach(eslc, eslist) ! { ! es = (execution_state *) lfirst(eslc); ! ! while (es && es->status == F_EXEC_DONE) ! es = es->next; ! ! if (es) ! break; ! } /* * Execute each command in the function one after another until we either *************** *** 699,706 **** fmgr_sql(PG_FUNCTION_ARGS) --- 730,760 ---- bool completed; if (es->status == F_EXEC_START) + { + if (!fcache->readonly_func) + { + /* + * In a read-only function, use the surrounding query's snapshot; + * otherwise take a new snapshot if we don't have one yet. The + * snapshot should include a fresh command ID so that all work to + * date in this transaction is visible. + */ + if (!fcache->snapshot) + { + CommandCounterIncrement(); + fcache->snapshot = RegisterSnapshot(GetTransactionSnapshot()); + PushActiveSnapshot(fcache->snapshot); + } + else + PushUpdatedSnapshot(fcache->snapshot); + } + postquel_start(es, fcache); + if (!fcache->readonly_func) + PopActiveSnapshot(); + } + completed = postquel_getnext(es, fcache); /* *************** *** 726,731 **** fmgr_sql(PG_FUNCTION_ARGS) --- 780,804 ---- if (es->status != F_EXEC_DONE) break; es = es->next; + + if (!es) + { + eslc = lnext(eslc); + if (!eslc) + break; + + es = (execution_state *) lfirst(eslc); + + /* make sure we take a new snapshot for this query list */ + if (!fcache->readonly_func) + { + Assert(fcache->snapshot != InvalidSnapshot); + UnregisterSnapshot(fcache->snapshot); + fcache->snapshot = InvalidSnapshot; + } + else + Assert(fcache->snapshot == InvalidSnapshot); + } } /* *************** *** 794,799 **** fmgr_sql(PG_FUNCTION_ARGS) --- 867,877 ---- PointerGetDatum(fcache)); fcache->shutdown_reg = false; } + + /* Unregister snapshot if we have one */ + if (fcache->snapshot != InvalidSnapshot) + UnregisterSnapshot(fcache->snapshot); + fcache->snapshot = InvalidSnapshot; } else { *************** *** 820,825 **** fmgr_sql(PG_FUNCTION_ARGS) --- 898,908 ---- PointerGetDatum(fcache)); fcache->shutdown_reg = false; } + + /* Unregister snapshot if we have one */ + if (fcache->snapshot != InvalidSnapshot) + UnregisterSnapshot(fcache->snapshot); + fcache->snapshot = InvalidSnapshot; } } else *************** *** 850,855 **** fmgr_sql(PG_FUNCTION_ARGS) --- 933,943 ---- /* Clear the tuplestore, but keep it for next time */ tuplestore_clear(fcache->tstore); + + /* Unregister snapshot if we have one */ + if (fcache->snapshot != InvalidSnapshot) + UnregisterSnapshot(fcache->snapshot); + fcache->snapshot = InvalidSnapshot; } /* *************** *** 858,868 **** fmgr_sql(PG_FUNCTION_ARGS) */ if (es == NULL) { ! es = fcache->func_state; ! while (es) { ! es->status = F_EXEC_START; ! es = es->next; } } --- 946,959 ---- */ if (es == NULL) { ! foreach(eslc, fcache->func_state) { ! es = (execution_state *) lfirst(eslc); ! while (es) ! { ! es->status = F_EXEC_START; ! es = es->next; ! } } } *************** *** 913,931 **** sql_exec_error_callback(void *arg) { execution_state *es; int query_num; ! es = fcache->func_state; query_num = 1; ! while (es) { ! if (es->qd) { ! errcontext("SQL function \"%s\" statement %d", ! fcache->fname, query_num); ! break; } - es = es->next; - query_num++; } if (es == NULL) { --- 1004,1027 ---- { execution_state *es; int query_num; + ListCell *lc; ! es = NULL; /* keep compiler quiet */ query_num = 1; ! foreach(lc, fcache->func_state) { ! es = (execution_state *) lfirst(lc); ! while (es) { ! if (es->qd) ! { ! errcontext("SQL function \"%s\" statement %d", ! fcache->fname, query_num); ! break; ! } ! es = es->next; ! query_num++; } } if (es == NULL) { *************** *** 956,973 **** static void ShutdownSQLFunction(Datum arg) { SQLFunctionCachePtr fcache = (SQLFunctionCachePtr) DatumGetPointer(arg); ! execution_state *es = fcache->func_state; ! while (es != NULL) { ! /* Shut down anything still running */ ! if (es->status == F_EXEC_RUN) ! postquel_end(es); ! /* Reset states to START in case we're called again */ ! es->status = F_EXEC_START; ! es = es->next; } /* Release tuplestore if we have one */ if (fcache->tstore) tuplestore_end(fcache->tstore); --- 1052,1080 ---- ShutdownSQLFunction(Datum arg) { SQLFunctionCachePtr fcache = (SQLFunctionCachePtr) DatumGetPointer(arg); ! execution_state *es; ! ListCell *lc; ! foreach(lc, fcache->func_state) { ! es = (execution_state *) lfirst(lc); ! ! while (es) ! { ! /* Shut down anything still running */ ! if (es->status == F_EXEC_RUN) ! postquel_end(es); ! /* Reset states to START in case we're called again */ ! es->status = F_EXEC_START; ! es = es->next; ! } } + /* Unregister snapshot if we have one */ + if (fcache->snapshot != InvalidSnapshot) + UnregisterSnapshot(fcache->snapshot); + fcache->snapshot = InvalidSnapshot; + /* Release tuplestore if we have one */ if (fcache->tstore) tuplestore_end(fcache->tstore); *** a/src/backend/executor/spi.c --- b/src/backend/executor/spi.c *************** *** 1769,1774 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, --- 1769,1775 ---- SPITupleTable *my_tuptable = NULL; int res = 0; bool have_active_snap = ActiveSnapshotSet(); + bool registered_snap = false; ErrorContextCallback spierrcontext; CachedPlan *cplan = NULL; ListCell *lc1; *************** *** 1872,1879 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, } else { ! PushActiveSnapshot(GetTransactionSnapshot()); pushed_active_snap = true; } } else --- 1873,1882 ---- } else { ! snapshot = RegisterSnapshot(GetTransactionSnapshot()); ! PushActiveSnapshot(snapshot); pushed_active_snap = true; + registered_snap = true; } } else *************** *** 1966,1975 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, --- 1969,1991 ---- */ if (!read_only) CommandCounterIncrement(); + + /* + * If we took a new snapshot for this query list, unregister it and + * make sure we take a new one for the next list. + */ + if (registered_snap) + { + UnregisterSnapshot(snapshot); + snapshot = InvalidSnapshot; + } } fail: + if (registered_snap) + UnregisterSnapshot(snapshot); + /* We no longer need the cached plan refcount, if any */ if (cplan) ReleaseCachedPlan(cplan, true); *** a/src/backend/tcop/postgres.c --- b/src/backend/tcop/postgres.c *************** *** 505,552 **** client_read_ended(void) } } - - /* - * Parse a query string and pass it through the rewriter. - * - * A list of Query nodes is returned, since the string might contain - * multiple queries and/or the rewriter might expand one query to several. - * - * NOTE: this routine is no longer used for processing interactive queries, - * but it is still needed for parsing of SQL function bodies. - */ - List * - pg_parse_and_rewrite(const char *query_string, /* string to execute */ - Oid *paramTypes, /* parameter types */ - int numParams) /* number of parameters */ - { - List *raw_parsetree_list; - List *querytree_list; - ListCell *list_item; - - /* - * (1) parse the request string into a list of raw parse trees. - */ - raw_parsetree_list = pg_parse_query(query_string); - - /* - * (2) Do parse analysis and rule rewrite. - */ - querytree_list = NIL; - foreach(list_item, raw_parsetree_list) - { - Node *parsetree = (Node *) lfirst(list_item); - - querytree_list = list_concat(querytree_list, - pg_analyze_and_rewrite(parsetree, - query_string, - paramTypes, - numParams)); - } - - return querytree_list; - } - /* * Do raw parsing (only). * --- 505,510 ---- *** a/src/backend/tcop/pquery.c --- b/src/backend/tcop/pquery.c *************** *** 170,180 **** ProcessQuery(PlannedStmt *plan, elog(DEBUG3, "ProcessQuery"); /* - * Must always set a snapshot for plannable queries. - */ - PushActiveSnapshot(GetTransactionSnapshot()); - - /* * Create the QueryDesc object */ queryDesc = CreateQueryDesc(plan, sourceText, --- 170,175 ---- *************** *** 234,241 **** ProcessQuery(PlannedStmt *plan, /* Now take care of any queued AFTER triggers */ AfterTriggerEndQuery(queryDesc->estate); - PopActiveSnapshot(); - /* * Now, we close down all the scans and free allocated resources. */ --- 229,234 ---- *************** *** 1220,1225 **** PortalRunMulti(Portal portal, bool isTopLevel, --- 1213,1219 ---- char *completionTag) { ListCell *stmtlist_item; + Snapshot snapshot = InvalidSnapshot; /* * If the destination is DestRemoteExecute, change to DestNone. The *************** *** 1262,1267 **** PortalRunMulti(Portal portal, bool isTopLevel, --- 1256,1270 ---- if (log_executor_stats) ResetUsage(); + /* if no snapshot is set, grab a new one and register it */ + if (snapshot == InvalidSnapshot) + { + snapshot = RegisterSnapshot(GetTransactionSnapshot()); + PushActiveSnapshot(snapshot); + } + else + PushUpdatedSnapshot(snapshot); + if (pstmt->canSetTag) { /* statement can set tag string */ *************** *** 1279,1284 **** PortalRunMulti(Portal portal, bool isTopLevel, --- 1282,1289 ---- altdest, NULL); } + PopActiveSnapshot(); + if (log_executor_stats) ShowUsage("EXECUTOR STATISTICS"); *************** *** 1291,1301 **** PortalRunMulti(Portal portal, bool isTopLevel, --- 1296,1320 ---- * * These are assumed canSetTag if they're the only stmt in the * portal. + * + * NotifyStmt is the only utility statement allowed in a list of + * rewritten queries, and it doesn't need a snapshot so we don't + * need to worry about it. However, if the list has only one + * statement and it's a utility statement, we are not allowed to + * take a snapshot. See the first comment in PortalRunUtility(). */ if (list_length(portal->stmts) == 1) + { + Assert(snapshot == InvalidSnapshot); + PortalRunUtility(portal, stmt, isTopLevel, dest, completionTag); + } else + { + Assert(IsA(stmt, NotifyStmt)); + PortalRunUtility(portal, stmt, isTopLevel, altdest, NULL); + } } /* *************** *** 1313,1318 **** PortalRunMulti(Portal portal, bool isTopLevel, --- 1332,1340 ---- MemoryContextDeleteChildren(PortalGetHeapMemory(portal)); } + if (snapshot != InvalidSnapshot) + UnregisterSnapshot(snapshot); + /* * If a command completion tag was supplied, use it. Otherwise use the * portal's commandTag as the default completion tag. *** a/src/include/tcop/tcopprot.h --- b/src/include/tcop/tcopprot.h *************** *** 45,52 **** typedef enum extern int log_statement; - extern List *pg_parse_and_rewrite(const char *query_string, - Oid *paramTypes, int numParams); extern List *pg_parse_query(const char *query_string); extern List *pg_analyze_and_rewrite(Node *parsetree, const char *query_string, Oid *paramTypes, int numParams); --- 45,50 ----