Skip site navigation (1) Skip section navigation (2)

Peripheral Links

Header And Logo

PostgreSQL
| The world's most advanced open source database.

Site Navigation

Search archives
  Advanced Search

Updatable views


  • From: Alvaro Herrera <alvherre(at)commandprompt(dot)com>
  • To: Patches <pgsql-patches(at)postgresql(dot)org>
  • Subject: Updatable views
  • Date: Mon, 21 Aug 2006 02:07:41 -0400
  • Message-id: <20060821060741.GF1130@alvh.no-ip.org> <text/plain>

Hi,

This is the patch for updatable views I've been able to come up with.  A
nasty bug was just discovered in the upcoming Mammoth Replicator release
so I'm not sure if I'm going to have time to work more on it soon.

So, I'd appreciate it if somebody else takes the responsibility to fix the
remaining issues.  I've put a lot of XXX's and some FIXME's.  Some
functions are in need of some comments as well.

The new files are src/backend/rewrite/viewUpdate.c and
src/include/rewrite/viewUpdate.h.  The third file, upd-views.sql, is
intended to be a new regression test.  Extra points if the table therein
is completed correctly.

I haven't tested the array stuff at all.

Comments from Bernd and Jaime are especially welcome if I've broken
something that used to work on their patch :-)

-- 
Alvaro Herrera                                http://www.CommandPrompt.com/
The PostgreSQL Company - Command Prompt, Inc.
Index: doc/src/sgml/catalogs.sgml
===================================================================
RCS file: /home/alvherre/cvs/pgsql/doc/src/sgml/catalogs.sgml,v
retrieving revision 2.129
diff -c -p -r2.129 catalogs.sgml
*** doc/src/sgml/catalogs.sgml	31 Jul 2006 20:08:55 -0000	2.129
--- doc/src/sgml/catalogs.sgml	17 Aug 2006 03:34:45 -0000
***************
*** 3502,3507 ****
--- 3502,3519 ----
       </row>
  
       <row>
+       <entry><structfield>ev_kind</structfield></entry>
+       <entry><type>char</type></entry>
+       <entry></entry>
+       <entry>  
+        <literal>l</> = with local check option,   
+        <literal>c</> = with cascaded check option,
+        <literal>n</> = no check option specified,
+        <literal>e</> = rule was created by user 
+       </entry>
+      </row>
+ 
+      <row>
        <entry><structfield>ev_qual</structfield></entry>
        <entry><type>text</type></entry>
        <entry></entry>
Index: doc/src/sgml/information_schema.sgml
===================================================================
RCS file: /home/alvherre/cvs/pgsql/doc/src/sgml/information_schema.sgml,v
retrieving revision 1.26
diff -c -p -r1.26 information_schema.sgml
*** doc/src/sgml/information_schema.sgml	2 May 2006 18:07:51 -0000	1.26
--- doc/src/sgml/information_schema.sgml	17 Aug 2006 03:34:45 -0000
*************** ORDER BY c.ordinal_position;
*** 5085,5091 ****
       <row>
        <entry><literal>check_option</literal></entry>
        <entry><type>character_data</type></entry>
!       <entry>Applies to a feature not available in <productname>PostgreSQL</></entry>
       </row>
  
       <row>
--- 5085,5095 ----
       <row>
        <entry><literal>check_option</literal></entry>
        <entry><type>character_data</type></entry>
!       <entry>
!        The level of integrity checking in updatable views,
!        either <literal>LOCAL</literal>, <literal>CASCADED</literal>
!        or <literal>NONE</literal>
!      </entry>
       </row>
  
       <row>
Index: doc/src/sgml/intro.sgml
===================================================================
RCS file: /home/alvherre/cvs/pgsql/doc/src/sgml/intro.sgml,v
retrieving revision 1.31
diff -c -p -r1.31 intro.sgml
*** doc/src/sgml/intro.sgml	10 Mar 2006 19:10:48 -0000	1.31
--- doc/src/sgml/intro.sgml	17 Aug 2006 03:34:45 -0000
***************
*** 110,116 ****
       <simpara>triggers</simpara>
      </listitem>
      <listitem>
!      <simpara>views</simpara>
      </listitem>
      <listitem>
       <simpara>transactional integrity</simpara>
--- 110,116 ----
       <simpara>triggers</simpara>
      </listitem>
      <listitem>
!      <simpara>updatable views</simpara>
      </listitem>
      <listitem>
       <simpara>transactional integrity</simpara>
Index: doc/src/sgml/ref/create_view.sgml
===================================================================
RCS file: /home/alvherre/cvs/pgsql/doc/src/sgml/ref/create_view.sgml,v
retrieving revision 1.31
diff -c -p -r1.31 create_view.sgml
*** doc/src/sgml/ref/create_view.sgml	1 Nov 2005 21:09:50 -0000	1.31
--- doc/src/sgml/ref/create_view.sgml	17 Aug 2006 03:34:45 -0000
*************** PostgreSQL documentation
*** 22,27 ****
--- 22,28 ----
  <synopsis>
  CREATE [ OR REPLACE ] [ TEMP | TEMPORARY ] VIEW <replaceable class="PARAMETER">name</replaceable> [ ( <replaceable class="PARAMETER">column_name</replaceable> [, ...] ) ]
      AS <replaceable class="PARAMETER">query</replaceable>
+     [ WITH [ CASCADED | LOCAL ] CHECK OPTION ]
  </synopsis>
   </refsynopsisdiv>
  
*************** CREATE [ OR REPLACE ] [ TEMP | TEMPORARY
*** 109,114 ****
--- 110,150 ----
       </para>
      </listitem>
     </varlistentry>
+ 
+    <variablelist>
+      <varlistentry>
+       <term><literal>CHECK OPTION</literal></term>
+       <listitem>
+        <para>
+         This option has to do with updatable views.  All
+         <command>INSERT</> and <command>UPDATE</> commands on the view
+         will be checked to ensure data satisfy the view-defining
+         condition (that is, the new data would be visible through the
+         view).  If they do not, the update will be rejected.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
+       <term><literal>LOCAL</literal></term>
+       <listitem>
+        <para>
+         Check for integrity on this view.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
+       <term><literal>CASCADED</literal></term>
+       <listitem>
+        <para>
+         Check for integrity on this view and on any dependent
+         view.  <literal>CASCADED</> is assumed if neither
+         <literal>CASCADED</> nor <literal>LOCAL</> is specified.
+        </para>
+       </listitem>
+      </varlistentry>
+    
    </variablelist>
   </refsect1>
  
*************** CREATE [ OR REPLACE ] [ TEMP | TEMPORARY
*** 116,126 ****
    <title>Notes</title>
  
     <para>
!     Currently, views are read only: the system will not allow an insert,
!     update, or delete on a view.  You can get the effect of an updatable
!     view by creating rules that rewrite inserts, etc. on the view into
!     appropriate actions on other tables.  For more information see
!     <xref linkend="sql-createrule" endterm="sql-createrule-title">.
     </para>
  
     <para>
--- 152,201 ----
    <title>Notes</title>
  
     <para>
!     Currently, a view is updatable, following the SQL92 specification,
!     if it meets all of these conditions:
!    </para>
! 
!    <itemizedlist>
!     <listitem>
!      <para>
!       Views with just one base table (or another updatable view)
!      </para>
!     </listitem>
! 
!     <listitem>
!      <para>No aggregate functions</para>
!     </listitem>
! 
!     <listitem>
!      <para>
!       No <literal>HAVING</literal>, <literal>DISTINCT</literal> nor
!       <literal>GROUP BY</literal> clauses
!      </para>
!     </listitem>
! 
!     <listitem>
!      <para>
!       No <literal>UNION</literal>, <literal>INTERSECT</literal> or
!       <literal>EXCEPT</literal> clauses
!      </para>
!     </listitem>
!    </itemizedlist>
! 
!    <para>
!     Views are insertable only if you provide in the defining
!     <command>SELECT</command> statement all columns that are NOT NULL and don't
!     have a default value.
!    </para>
! 
!    <para>
!     The updatable views implementation is based on the rule system.  Because of
!     this, you can get the same effect in more complex views by creating your
!     own rules that rewrite the <command>INSERT</command>,
!     <command>UPDATE</command> and <command>DELETE</command> actions on the view
!     into appropriate actions on other tables. You can also replace
!     automatically generated rules with your own rules. For more information,
!     refer to <xref linkend="sql-createrule" endterm="sql-createrule-title">.
     </para>
  
     <para>
*************** CREATE VIEW comedies AS
*** 171,225 ****
    <title>Compatibility</title>
  
    <para>
-    The SQL standard specifies some additional capabilities for the
-    <command>CREATE VIEW</command> statement:
- <synopsis>
- CREATE VIEW <replaceable class="parameter">name</replaceable> [ ( <replaceable class="parameter">column_name</replaceable> [, ...] ) ]
-     AS <replaceable class="PARAMETER">query</replaceable>
-     [ WITH [ CASCADED | LOCAL ] CHECK OPTION ]
- </synopsis>
-   </para>
- 
-   <para>
-    The optional clauses for the full SQL command are:
- 
-    <variablelist>
-      <varlistentry>
-       <term><literal>CHECK OPTION</literal></term>
-       <listitem>
-        <para>
-         This option has to do with updatable views.  All
-         <command>INSERT</> and <command>UPDATE</> commands on the view
-         will be checked to ensure data satisfy the view-defining
-         condition (that is, the new data would be visible through the
-         view). If they do not, the update will be rejected.
-        </para>
-       </listitem>
-      </varlistentry>
- 
-      <varlistentry>
-       <term><literal>LOCAL</literal></term>
-       <listitem>
-        <para>
-         Check for integrity on this view.
-        </para>
-       </listitem>
-      </varlistentry>
- 
-      <varlistentry>
-       <term><literal>CASCADED</literal></term>
-       <listitem>
-        <para>
-         Check for integrity on this view and on any dependent
-         view. <literal>CASCADED</> is assumed if neither
-         <literal>CASCADED</> nor <literal>LOCAL</> is specified.
-        </para>
-       </listitem>
-      </varlistentry>
-    </variablelist>
-   </para>
- 
-   <para>
     <command>CREATE OR REPLACE VIEW</command> is a
     <productname>PostgreSQL</productname> language extension.
     So is the concept of a temporary view.
--- 246,251 ----
Index: src/backend/catalog/information_schema.sql
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/catalog/information_schema.sql,v
retrieving revision 1.33
diff -c -p -r1.33 information_schema.sql
*** src/backend/catalog/information_schema.sql	2 Apr 2006 17:38:13 -0000	1.33
--- src/backend/catalog/information_schema.sql	17 Aug 2006 03:34:45 -0000
*************** CREATE VIEW views AS
*** 2126,2132 ****
                    ELSE null END
               AS character_data) AS view_definition,
  
!            CAST('NONE' AS character_data) AS check_option,
  
             CAST(
               CASE WHEN EXISTS (SELECT 1 FROM pg_rewrite WHERE ev_class = c.oid AND ev_type = 2 AND is_instead)
--- 2126,2143 ----
                    ELSE null END
               AS character_data) AS view_definition,
  
!            CAST(
!              CASE WHEN EXISTS (SELECT 1 FROM pg_rewrite WHERE ev_class = c.oid AND ev_type = 2 AND is_instead AND ev_kind = 'l')
!                     OR EXISTS (SELECT 1 FROM pg_rewrite WHERE ev_class = c.oid AND ev_type = 3 AND is_instead AND ev_kind = 'l')
!                     OR EXISTS (SELECT 1 FROM pg_rewrite WHERE ev_class = c.oid AND ev_type = 4 AND is_instead AND ev_kind = 'l')
!                   THEN 'LOCAL'
!                   WHEN EXISTS (SELECT 1 FROM pg_rewrite WHERE ev_class = c.oid AND ev_type = 2 AND is_instead AND ev_kind = 'c')
!                     OR EXISTS (SELECT 1 FROM pg_rewrite WHERE ev_class = c.oid AND ev_type = 3 AND is_instead AND ev_kind = 'c')
!                     OR EXISTS (SELECT 1 FROM pg_rewrite WHERE ev_class = c.oid AND ev_type = 4 AND is_instead AND ev_kind = 'c')
!                   THEN 'CASCADED'
!                   ELSE 'NONE' 
!              END
!            AS character_data) AS check_option,
  
             CAST(
               CASE WHEN EXISTS (SELECT 1 FROM pg_rewrite WHERE ev_class = c.oid AND ev_type = 2 AND is_instead)
Index: src/backend/commands/view.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/commands/view.c,v
retrieving revision 1.96
diff -c -p -r1.96 view.c
*** src/backend/commands/view.c	13 Jul 2006 16:49:14 -0000	1.96
--- src/backend/commands/view.c	20 Aug 2006 19:55:36 -0000
***************
*** 29,41 ****
--- 29,44 ----
  #include "rewrite/rewriteDefine.h"
  #include "rewrite/rewriteManip.h"
  #include "rewrite/rewriteSupport.h"
+ #include "rewrite/viewUpdate.h"
  #include "utils/acl.h"
  #include "utils/lsyscache.h"
+ #include "utils/memutils.h"
  
  
  static void checkViewTupleDesc(TupleDesc newdesc, TupleDesc olddesc);
  static bool isViewOnTempTable_walker(Node *node, void *context);
  
+ 
  /*---------------------------------------------------------------------
   * isViewOnTempTable
   *
*************** FormViewRetrieveRule(const RangeVar *vie
*** 281,330 ****
  }
  
  static void
! DefineViewRules(const RangeVar *view, Query *viewParse, bool replace)
  {
  	RuleStmt   *retrieve_rule;
  
- #ifdef NOTYET
- 	RuleStmt   *replace_rule;
- 	RuleStmt   *append_rule;
- 	RuleStmt   *delete_rule;
- #endif
- 
  	retrieve_rule = FormViewRetrieveRule(view, viewParse, replace);
  
! #ifdef NOTYET
! 	replace_rule = FormViewReplaceRule(view, viewParse);
! 	append_rule = FormViewAppendRule(view, viewParse);
! 	delete_rule = FormViewDeleteRule(view, viewParse);
! #endif
! 
! 	DefineQueryRewrite(retrieve_rule);
! 
! #ifdef NOTYET
! 	DefineQueryRewrite(replace_rule);
! 	DefineQueryRewrite(append_rule);
! 	DefineQueryRewrite(delete_rule);
! #endif
  
  }
  
! /*---------------------------------------------------------------
   * UpdateRangeTableOfViewParse
   *
!  * Update the range table of the given parsetree.
!  * This update consists of adding two new entries IN THE BEGINNING
!  * of the range table (otherwise the rule system will die a slow,
!  * horrible and painful death, and we do not want that now, do we?)
!  * one for the OLD relation and one for the NEW one (both of
   * them refer in fact to the "view" relation).
   *
!  * Of course we must also increase the 'varnos' of all the Var nodes
!  * by 2...
   *
!  * These extra RT entries are not actually used in the query,
!  * except for run-time permission checking.
!  *---------------------------------------------------------------
   */
  static Query *
  UpdateRangeTableOfViewParse(Oid viewOid, Query *viewParse)
--- 284,315 ----
  }
  
  static void
! DefineViewRules(const RangeVar *view, Query *viewParse, bool replace,
! 				bool checkOption, bool cascade)
  {
  	RuleStmt   *retrieve_rule;
  
  	retrieve_rule = FormViewRetrieveRule(view, viewParse, replace);
  
! 	DefineQueryRewrite(retrieve_rule, 
! 					   makeViewCheckOption(checkOption, cascade));
  
+ 	CreateViewUpdateRules(viewParse, view, checkOption, cascade);
  }
  
! /*
   * UpdateRangeTableOfViewParse
   *
!  * Update the range table of the given parsetree.  This update consists of
!  * adding two new entries IN THE BEGINNING of the range table (otherwise the
!  * rule system will die a slow, horrible and painful death, and we do not want
!  * that now, do we?) one for the OLD relation and one for the NEW one (both of
   * them refer in fact to the "view" relation).
   *
!  * Of course we must also increase the 'varnos' of all the Var nodes by 2...
   *
!  * These extra RT entries are not actually used in the query, except for
!  * run-time permission checking.
   */
  static Query *
  UpdateRangeTableOfViewParse(Oid viewOid, Query *viewParse)
*************** UpdateRangeTableOfViewParse(Oid viewOid,
*** 388,394 ****
   *-------------------------------------------------------------------
   */
  void
! DefineView(RangeVar *view, Query *viewParse, bool replace)
  {
  	Oid			viewOid;
  
--- 373,380 ----
   *-------------------------------------------------------------------
   */
  void
! DefineView(RangeVar *view, Query *viewParse, bool replace, bool checkOption,
! 		   bool cascade)
  {
  	Oid			viewOid;
  
*************** DefineView(RangeVar *view, Query *viewPa
*** 429,435 ****
  	/*
  	 * Now create the rules associated with the view.
  	 */
! 	DefineViewRules(view, viewParse, replace);
  }
  
  /*
--- 415,421 ----
  	/*
  	 * Now create the rules associated with the view.
  	 */
! 	DefineViewRules(view, viewParse, replace, checkOption, cascade);
  }
  
  /*
Index: src/backend/parser/gram.y
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/parser/gram.y,v
retrieving revision 2.556
diff -c -p -r2.556 gram.y
*** src/backend/parser/gram.y	12 Aug 2006 18:58:54 -0000	2.556
--- src/backend/parser/gram.y	19 Aug 2006 19:48:37 -0000
*************** AlterOwnerStmt: ALTER AGGREGATE func_nam
*** 4473,4479 ****
   *****************************************************************************/
  
  RuleStmt:	CREATE opt_or_replace RULE name AS
! 			{ QueryIsRule=TRUE; }
  			ON event TO qualified_name where_clause
  			DO opt_instead RuleActionList
  				{
--- 4473,4479 ----
   *****************************************************************************/
  
  RuleStmt:	CREATE opt_or_replace RULE name AS
! 			{ QueryIsRule = true; }
  			ON event TO qualified_name where_clause
  			DO opt_instead RuleActionList
  				{
*************** RuleStmt:	CREATE opt_or_replace RULE nam
*** 4486,4492 ****
  					n->instead = $13;
  					n->actions = $14;
  					$$ = (Node *)n;
! 					QueryIsRule=FALSE;
  				}
  		;
  
--- 4486,4492 ----
  					n->instead = $13;
  					n->actions = $14;
  					$$ = (Node *)n;
! 					QueryIsRule = false;
  				}
  		;
  
*************** RuleActionList:
*** 4499,4511 ****
  /* the thrashing around here is to discard "empty" statements... */
  RuleActionMulti:
  			RuleActionMulti ';' RuleActionStmtOrEmpty
! 				{ if ($3 != NULL)
  					$$ = lappend($1, $3);
  				  else
  					$$ = $1;
  				}
  			| RuleActionStmtOrEmpty
! 				{ if ($1 != NULL)
  					$$ = list_make1($1);
  				  else
  					$$ = NIL;
--- 4499,4513 ----
  /* the thrashing around here is to discard "empty" statements... */
  RuleActionMulti:
  			RuleActionMulti ';' RuleActionStmtOrEmpty
! 				{
! 				  if ($3 != NULL)
  					$$ = lappend($1, $3);
  				  else
  					$$ = $1;
  				}
  			| RuleActionStmtOrEmpty
! 				{
! 				  if ($1 != NULL)
  					$$ = list_make1($1);
  				  else
  					$$ = NIL;
*************** ViewStmt: CREATE OptTemp VIEW qualified_
*** 4770,4775 ****
--- 4772,4778 ----
  					n->view->istemp = $2;
  					n->aliases = $5;
  					n->query = (Query *) $7;
+ 					n->options = $8;
  					$$ = (Node *) n;
  				}
  		| CREATE OR REPLACE OptTemp VIEW qualified_name opt_column_list
*************** ViewStmt: CREATE OptTemp VIEW qualified_
*** 4781,4786 ****
--- 4784,4790 ----
  					n->view->istemp = $4;
  					n->aliases = $7;
  					n->query = (Query *) $9;
+ 					n->options = $10;
  					$$ = (Node *) n;
  				}
  		;
*************** ViewStmt: CREATE OptTemp VIEW qualified_
*** 4792,4812 ****
  opt_check_option:
  		WITH_CHECK OPTION
  				{
! 					ereport(ERROR,
! 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! 							 errmsg("WITH CHECK OPTION is not implemented")));
  				}
  		| WITH_CASCADED CHECK OPTION
  				{
! 					ereport(ERROR,
! 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! 							 errmsg("WITH CHECK OPTION is not implemented")));
  				}
  		| WITH_LOCAL CHECK OPTION
  				{
! 					ereport(ERROR,
! 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! 							 errmsg("WITH CHECK OPTION is not implemented")));
  				}
  		| /* EMPTY */							{ $$ = NIL; }
  		;
--- 4796,4810 ----
  opt_check_option:
  		WITH_CHECK OPTION
  				{
! 					$$ = list_make1((Node *)makeString("cascaded"));
  				}
  		| WITH_CASCADED CHECK OPTION
  				{
! 					$$ = list_make1((Node *)makeString("cascaded"));
  				}
  		| WITH_LOCAL CHECK OPTION
  				{
! 					$$ = list_make1((Node *)makeString("local"));
  				}
  		| /* EMPTY */							{ $$ = NIL; }
  		;
Index: src/backend/parser/keywords.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/parser/keywords.c,v
retrieving revision 1.175
diff -c -p -r1.175 keywords.c
*** src/backend/parser/keywords.c	12 Aug 2006 02:52:05 -0000	1.175
--- src/backend/parser/keywords.c	17 Aug 2006 03:34:45 -0000
*************** static const ScanKeyword ScanKeywords[] 
*** 67,72 ****
--- 67,73 ----
  	{"cache", CACHE},
  	{"called", CALLED},
  	{"cascade", CASCADE},
+ 	{"cascaded", CASCADED},
  	{"case", CASE},
  	{"cast", CAST},
  	{"chain", CHAIN},
Index: src/backend/rewrite/Makefile
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/rewrite/Makefile,v
retrieving revision 1.15
diff -c -p -r1.15 Makefile
*** src/backend/rewrite/Makefile	29 Nov 2003 19:51:55 -0000	1.15
--- src/backend/rewrite/Makefile	20 Aug 2006 19:47:51 -0000
*************** top_builddir = ../../..
*** 13,19 ****
  include $(top_builddir)/src/Makefile.global
  
  OBJS = rewriteRemove.o rewriteDefine.o \
!        rewriteHandler.o rewriteManip.o rewriteSupport.o
  
  all: SUBSYS.o
  
--- 13,20 ----
  include $(top_builddir)/src/Makefile.global
  
  OBJS = rewriteRemove.o rewriteDefine.o \
!        rewriteHandler.o rewriteManip.o rewriteSupport.o \
!        viewUpdate.o
  
  all: SUBSYS.o
  
Index: src/backend/rewrite/rewriteDefine.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/rewrite/rewriteDefine.c,v
retrieving revision 1.112
diff -c -p -r1.112 rewriteDefine.c
*** src/backend/rewrite/rewriteDefine.c	12 Aug 2006 20:05:55 -0000	1.112
--- src/backend/rewrite/rewriteDefine.c	17 Aug 2006 03:34:45 -0000
***************
*** 23,28 ****
--- 23,29 ----
  #include "parser/parse_expr.h"
  #include "rewrite/rewriteDefine.h"
  #include "rewrite/rewriteManip.h"
+ #include "rewrite/rewriteRemove.h"
  #include "rewrite/rewriteSupport.h"
  #include "storage/smgr.h"
  #include "utils/acl.h"
*************** InsertRule(char *rulname,
*** 47,52 ****
--- 48,54 ----
  		   Oid eventrel_oid,
  		   AttrNumber evslot_index,
  		   bool evinstead,
+ 		   char evkind,
  		   Node *event_qual,
  		   List *action,
  		   bool replace)
*************** InsertRule(char *rulname,
*** 78,83 ****
--- 80,86 ----
  	values[i++] = Int16GetDatum(evslot_index);	/* ev_attr */
  	values[i++] = CharGetDatum(evtype + '0');	/* ev_type */
  	values[i++] = BoolGetDatum(evinstead);		/* is_instead */
+ 	values[i++] = CharGetDatum(evkind);         /* ev_kind */
  	values[i++] = DirectFunctionCall1(textin, CStringGetDatum(evqual)); /* ev_qual */
  	values[i++] = DirectFunctionCall1(textin, CStringGetDatum(actiontree));		/* ev_action */
  
*************** InsertRule(char *rulname,
*** 97,107 ****
  	if (HeapTupleIsValid(oldtup))
  	{
  		if (!replace)
! 			ereport(ERROR,
! 					(errcode(ERRCODE_DUPLICATE_OBJECT),
! 					 errmsg("rule \"%s\" for relation \"%s\" already exists",
! 							rulname, get_rel_name(eventrel_oid))));
  
  		/*
  		 * When replacing, we don't need to replace every attribute
  		 */
--- 100,131 ----
  	if (HeapTupleIsValid(oldtup))
  	{
  		if (!replace)
! 		{
! 			/*
! 			 * If REPLACE was not used we still have to check if the
! 			 * rule is implicit: then we have to replace it anyways.
! 			 */
! 			char	old_tup_is_implicit;
! 			bool	isnull;
! 			Datum	dat;
! 
! 			dat = heap_getattr(oldtup,
! 									 Anum_pg_rewrite_ev_kind,
! 									 RelationGetDescr(pg_rewrite_desc),
! 									 &isnull);
! 			/* should not happen */
! 			if (!isnull) 
! 				elog(ERROR, "got null field in ev_kind where not null expected");
! 			
! 			old_tup_is_implicit = DatumGetChar(dat);
  
+ 			if (old_tup_is_implicit == NO_OPTION_EXPLICIT)
+ 				ereport(ERROR,
+ 						(errcode(ERRCODE_DUPLICATE_OBJECT),
+ 						 errmsg("rule \"%s\" for relation \"%s\" already exists",
+ 								rulname, get_rel_name(eventrel_oid))));
+ 		}
+ 		
  		/*
  		 * When replacing, we don't need to replace every attribute
  		 */
*************** InsertRule(char *rulname,
*** 109,114 ****
--- 133,139 ----
  		replaces[Anum_pg_rewrite_ev_attr - 1] = 'r';
  		replaces[Anum_pg_rewrite_ev_type - 1] = 'r';
  		replaces[Anum_pg_rewrite_is_instead - 1] = 'r';
+ 		replaces[Anum_pg_rewrite_ev_kind - 1] = 'r';
  		replaces[Anum_pg_rewrite_ev_qual - 1] = 'r';
  		replaces[Anum_pg_rewrite_ev_action - 1] = 'r';
  
*************** InsertRule(char *rulname,
*** 177,183 ****
  }
  
  void
! DefineQueryRewrite(RuleStmt *stmt)
  {
  	RangeVar   *event_obj = stmt->relation;
  	Node	   *event_qual = stmt->whereClause;
--- 202,208 ----
  }
  
  void
! DefineQueryRewrite(RuleStmt *stmt, char ev_kind)
  {
  	RangeVar   *event_obj = stmt->relation;
  	Node	   *event_qual = stmt->whereClause;
*************** DefineQueryRewrite(RuleStmt *stmt)
*** 444,449 ****
--- 469,484 ----
  	}
  	setRuleCheckAsUser_Expr(event_qual, GetUserId());
  
+ 	/*
+ 	 * Implicit rules should be dropped automatically when someone
+ 	 * wants to have their *own* rules on the view.  ev_kind is set
+ 	 * to NO_OPTION_EXPLICIT in this case, so we drop all implicit
+ 	 * rules on the specified event type immediately.
+ 	 */
+ 
+ 	if (ev_kind == NO_OPTION_EXPLICIT)
+ 		 deleteImplicitRulesOnEvent(event_relation, event_type);
+ 
  	/* discard rule if it's null action and not INSTEAD; it's a no-op */
  	if (action != NIL || is_instead)
  	{
*************** DefineQueryRewrite(RuleStmt *stmt)
*** 452,457 ****
--- 487,493 ----
  							ev_relid,
  							event_attno,
  							is_instead,
+ 							ev_kind,
  							event_qual,
  							action,
  							replace);
*************** DefineQueryRewrite(RuleStmt *stmt)
*** 469,483 ****
  	}
  
  	/*
! 	 * IF the relation is becoming a view, delete the storage files associated
  	 * with it.  NB: we had better have AccessExclusiveLock to do this ...
  	 *
  	 * XXX what about getting rid of its TOAST table?  For now, we don't.
  	 */
  	if (RelisBecomingView)
  	{
! 		RelationOpenSmgr(event_relation);
  		smgrscheduleunlink(event_relation->rd_smgr, event_relation->rd_istemp);
  	}
  
  	/* Close rel, but keep lock till commit... */
--- 505,526 ----
  	}
  
  	/*
! 	 * If the relation is becoming a view, delete the storage files associated
  	 * with it.  NB: we had better have AccessExclusiveLock to do this ...
  	 *
  	 * XXX what about getting rid of its TOAST table?  For now, we don't.
  	 */
  	if (RelisBecomingView)
  	{
! 		/*
! 		 * XXX -- Why this change?
! 		 *
! 		 RelationOpenSmgr(event_relation);
! 		 */
! 		if (event_relation->rd_smgr == NULL)
! 			event_relation->rd_smgr = smgropen(event_relation->rd_node);
  		smgrscheduleunlink(event_relation->rd_smgr, event_relation->rd_istemp);
+ 		event_relation->rd_smgr = NULL;
  	}
  
  	/* Close rel, but keep lock till commit... */
Index: src/backend/rewrite/rewriteHandler.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/rewrite/rewriteHandler.c,v
retrieving revision 1.165
diff -c -p -r1.165 rewriteHandler.c
*** src/backend/rewrite/rewriteHandler.c	2 Aug 2006 01:59:47 -0000	1.165
--- src/backend/rewrite/rewriteHandler.c	20 Aug 2006 01:20:23 -0000
***************
*** 15,20 ****
--- 15,21 ----
  
  #include "access/heapam.h"
  #include "catalog/pg_type.h"
+ #include "catalog/pg_rewrite.h"
  #include "nodes/makefuncs.h"
  #include "optimizer/clauses.h"
  #include "parser/analyze.h"
*************** typedef struct rewrite_event
*** 34,45 ****
  	CmdType		event;			/* type of rule being fired */
  } rewrite_event;
  
  static bool acquireLocksOnSubLinks(Node *node, void *context);
  static Query *rewriteRuleAction(Query *parsetree,
  				  Query *rule_action,
  				  Node *rule_qual,
  				  int rt_index,
! 				  CmdType event);
  static List *adjustJoinTreeList(Query *parsetree, bool removert, int rt_index);
  static void rewriteTargetList(Query *parsetree, Relation target_relation,
  							  List **attrno_list);
--- 35,55 ----
  	CmdType		event;			/* type of rule being fired */
  } rewrite_event;
  
+ /* Rule rewrite status for updatable views */
+ typedef struct view_update_event
+ {
+ 	bool viewUpdate;
+ 	bool needCheckOption;
+ 	char checkMode;	
+ } view_update_event;
+ 
  static bool acquireLocksOnSubLinks(Node *node, void *context);
  static Query *rewriteRuleAction(Query *parsetree,
  				  Query *rule_action,
  				  Node *rule_qual,
  				  int rt_index,
!                   CmdType event,
! 				  view_update_event *view_event);
  static List *adjustJoinTreeList(Query *parsetree, bool removert, int rt_index);
  static void rewriteTargetList(Query *parsetree, Relation target_relation,
  							  List **attrno_list);
*************** rewriteRuleAction(Query *parsetree,
*** 257,263 ****
  				  Query *rule_action,
  				  Node *rule_qual,
  				  int rt_index,
! 				  CmdType event)
  {
  	int			current_varno,
  				new_varno;
--- 267,274 ----
  				  Query *rule_action,
  				  Node *rule_qual,
  				  int rt_index,
! 				  CmdType event,
! 				  view_update_event *view_event)
  {
  	int			current_varno,
  				new_varno;
*************** rewriteRuleAction(Query *parsetree,
*** 387,393 ****
  	 * queries one w/rule_qual, one w/NOT rule_qual. Also add user query qual
  	 * onto rule action
  	 */
! 	AddQual(sub_action, rule_qual);
  
  	AddQual(sub_action, parsetree->jointree->quals);
  
--- 398,405 ----
  	 * queries one w/rule_qual, one w/NOT rule_qual. Also add user query qual
  	 * onto rule action
  	 */
! 	if (view_event->needCheckOption)
! 		AddQual(sub_action, rule_qual);
  
  	AddQual(sub_action, parsetree->jointree->quals);
  
*************** build_column_default(Relation rel, int a
*** 820,825 ****
--- 832,850 ----
  		}
  	}
  
+ #if 0
+ 	/*
+ 	 * I will do this only in case of relkind == RELKIND_VIEW.
+ 	 * This is the last attempt to get a value for expr before we
+ 	 * consider that expr must be NULL.
+ 	 */
+ 	if (expr == NULL && rel->rd_rel->relkind == RELKIND_VIEW)
+ 	{
+ 		expr = (Node *)makeNode(SetToDefault);
+ 		return expr;
+ 	}
+ #endif
+ 		
  	if (expr == NULL)
  	{
  		/*
*************** CopyAndAddInvertedQual(Query *parsetree,
*** 1362,1367 ****
--- 1387,1394 ----
   * Return value:
   *	list of rule actions adjusted for use with this query
   *
+  *	view_event -- FIXME -- what is this for?
+  *
   * Qualified INSTEAD rules generate their action with the qualification
   * condition added.  They also generate a modified version of the original
   * query with the negated qualification added, so that it will run only for
*************** fireRules(Query *parsetree,
*** 1376,1381 ****
--- 1403,1409 ----
  		  int rt_index,
  		  CmdType event,
  		  List *locks,
+ 		  view_update_event *view_event,
  		  bool *instead_flag,
  		  Query **qual_product)
  {
*************** fireRules(Query *parsetree,
*** 1390,1400 ****
  		QuerySource qsrc;
  		ListCell   *r;
  
  		/* Determine correct QuerySource value for actions */
  		if (rule_lock->isInstead)
  		{
! 			if (event_qual != NULL)
! 				qsrc = QSRC_QUAL_INSTEAD_RULE;
  			else
  			{
  				qsrc = QSRC_INSTEAD_RULE;
--- 1418,1507 ----
  		QuerySource qsrc;
  		ListCell   *r;
  
+ 		/*
+ 		 * First check: look if this rule is an implicit view update rule. If
+ 		 * false, always apply any rule qualification.
+ 		 */
+ 		if (rule_lock->ev_kind == NO_OPTION_EXPLICIT)
+ 		{
+ 			view_event->viewUpdate = false;
+ 			view_event->needCheckOption = true;
+ 			view_event->checkMode = rule_lock->ev_kind;
+ 		}
+ 		
+ 		/*
+ 		 * Second check: if we are not already in a view update and this rule
+ 		 * was created WITH LOCAL or CASCADED CHECK OPTION, start the view
+ 		 * update here: remember the rule's check mode and apply its check
+ 		 * option.
+ 		 */
+ 		else if (!view_event->viewUpdate &&
+ 				 ((rule_lock->ev_kind == LOCAL_OPTION_IMPLICIT) ||
+ 				  (rule_lock->ev_kind == CASCADED_OPTION_IMPLICIT)))
+ 				 
+ 		{
+ 			/* apply check option, but only once */
+ 			view_event->viewUpdate = true;
+ 			view_event->needCheckOption = true;
+ 			view_event->checkMode = rule_lock->ev_kind;
+ 		}
+ 
+ 		/*
+ 		 * Third check: Look if we are in a recursive view update in
+ 		 * conjunction with a CASCADED CHECK OPTION. If true, we always need to
+ 		 * apply any check options found.
+ 		 */
+ 		else if (view_event->viewUpdate &&
+ 				 (view_event->checkMode == CASCADED_OPTION_IMPLICIT) &&
+ 				 (rule_lock->ev_kind != NO_OPTION_EXPLICIT))
+ 		{
+ 			view_event->needCheckOption = true;
+ 		}
+ 
+ 		/*
+ 		 * 4th check: make sure we reset the view update event if we are in a
+ 		 * recursive view update and passed the first view already. If we had a
+ 		 * LOCAL CHECK OPTION, we have to make sure we don't fire any check
+ 		 * options anymore.
+ 		 */
+ 		else if (view_event->viewUpdate &&
+ 				 (view_event->checkMode == LOCAL_OPTION_IMPLICIT) &&
+ 				 (rule_lock->ev_kind != NO_OPTION_EXPLICIT))
+ 		{
+ 			view_event->needCheckOption = false;
+ 			/* force LOCAL check mode */
+ 			view_event->checkMode = LOCAL_OPTION_IMPLICIT;
+ 		}
+ 
  		/* Determine correct QuerySource value for actions */
  		if (rule_lock->isInstead)
  		{
! 			if (event_qual != NULL) 
! 			{				
! 				/*
! 				 * Check whether we need to apply any
! 				 * rule qualification to a view update rule.
! 				 * We only bother on implicit rules that aren't
! 				 * marked with NO_OPTION_EXPLICIT
! 				 */
! 				if ((rule_lock->ev_kind != NO_OPTION_EXPLICIT)
! 					&& (view_event->viewUpdate))
! 				{
! 					if (view_event->needCheckOption)
! 					{
! 						qsrc = QSRC_QUAL_INSTEAD_RULE;
! 					}
! 					else
! 					{
! 						qsrc = QSRC_INSTEAD_RULE;
! 						*instead_flag = true;
! 					}
! 				} 
! 				else 
! 				{
! 					qsrc = QSRC_QUAL_INSTEAD_RULE;
! 				}
! 			}
  			else
  			{
  				qsrc = QSRC_INSTEAD_RULE;
*************** fireRules(Query *parsetree,
*** 1438,1444 ****
  				continue;
  
  			rule_action = rewriteRuleAction(parsetree, rule_action,
! 											event_qual, rt_index, event);
  
  			rule_action->querySource = qsrc;
  			rule_action->canSetTag = false;		/* might change later */
--- 1545,1552 ----
  				continue;
  
  			rule_action = rewriteRuleAction(parsetree, rule_action,
! 											event_qual, rt_index, event,
! 											view_event);
  
  			rule_action->querySource = qsrc;
  			rule_action->canSetTag = false;		/* might change later */
*************** fireRules(Query *parsetree,
*** 1459,1465 ****
   * infinite recursion.
   */
  static List *
! RewriteQuery(Query *parsetree, List *rewrite_events)
  {
  	CmdType		event = parsetree->commandType;
  	bool		instead = false;
--- 1567,1574 ----
   * infinite recursion.
   */
  static List *
! RewriteQuery(Query *parsetree, List *rewrite_events, 
! 			 view_update_event *view_event)
  {
  	CmdType		event = parsetree->commandType;
  	bool		instead = false;
*************** RewriteQuery(Query *parsetree, List *rew
*** 1550,1555 ****
--- 1659,1665 ----
  										result_relation,
  										event,
  										locks,
+ 										view_event,
  										&instead,
  										&qual_product);
  
*************** RewriteQuery(Query *parsetree, List *rew
*** 1583,1589 ****
  					Query	   *pt = (Query *) lfirst(n);
  					List	   *newstuff;
  
! 					newstuff = RewriteQuery(pt, rewrite_events);
  					rewritten = list_concat(rewritten, newstuff);
  				}
  
--- 1693,1699 ----
  					Query	   *pt = (Query *) lfirst(n);
  					List	   *newstuff;
  
! 					newstuff = RewriteQuery(pt, rewrite_events, view_event);
  					rewritten = list_concat(rewritten, newstuff);
  				}
  
*************** QueryRewrite(Query *parsetree)
*** 1646,1658 ****
  	CmdType		origCmdType;
  	bool		foundOriginalQuery;
  	Query	   *lastInstead;
  
  	/*
  	 * Step 1
  	 *
  	 * Apply all non-SELECT rules possibly getting 0 or many queries
  	 */
! 	querylist = RewriteQuery(parsetree, NIL);
  
  	/*
  	 * Step 2
--- 1756,1772 ----
  	CmdType		origCmdType;
  	bool		foundOriginalQuery;
  	Query	   *lastInstead;
+ 	view_update_event v_event;
  
  	/*
  	 * Step 1
  	 *
  	 * Apply all non-SELECT rules possibly getting 0 or many queries
  	 */
! 	v_event.viewUpdate      = false;
! 	v_event.needCheckOption = false;
! 	v_event.checkMode       = NO_OPTION_EXPLICIT;
! 	querylist = RewriteQuery(parsetree, NIL, &v_event);
  
  	/*
  	 * Step 2
Index: src/backend/rewrite/rewriteRemove.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/rewrite/rewriteRemove.c,v
retrieving revision 1.65
diff -c -p -r1.65 rewriteRemove.c
*** src/backend/rewrite/rewriteRemove.c	16 Jun 2006 20:23:44 -0000	1.65
--- src/backend/rewrite/rewriteRemove.c	17 Aug 2006 03:34:45 -0000
*************** RemoveRewriteRule(Oid owningRel, const c
*** 89,94 ****
--- 89,138 ----
  	performDeletion(&object, behavior);
  }
  
+ /*
+  * deleteImplicitRulesOnEvent
+  *		Remove every implicit rule (ev_kind other than NO_OPTION_EXPLICIT)
+  *		that the relation has for the given event type.
+  */
+ void 
+ deleteImplicitRulesOnEvent(Relation rel, CmdType event_type)
+ {
+ 	RuleLock *rulelocks = rel->rd_rules;
+ 	int nlocks;
+ 	int i;
+ 
+ 	/*
+ 	 * ON SELECT rules are never removed here, whatever their ev_kind
+ 	 * (XXX are they marked as implicit at all?)
+ 	 */
+ 	if (event_type == CMD_SELECT)
+ 		return;
+ 
+ 	/*
+ 	 * If there are no rules on the relation we waste no more time
+ 	 */
+ 	if (rulelocks == NULL)
+ 		return;
+ 
+ 	nlocks = rulelocks->numLocks;
+ 
+ 	/*
+ 	 * Scan all rules of the relation; for each one that matches the event
+ 	 * and is implicit, drop the rule and then its dependency records
+ 	 */
+ 	for (i = 0; i < nlocks; i++)
+ 	{
+ 		RewriteRule *oneLock = rulelocks->rules[i];		
+ 
+ 		if (oneLock->event == event_type 
+ 			&& (oneLock->ev_kind != NO_OPTION_EXPLICIT)) 
+ 		{
+ 			RemoveRewriteRuleById(oneLock->ruleId);
+ 			deleteDependencyRecordsFor(RewriteRelationId, oneLock->ruleId);
+ 		}
+ 	}
+ }
+ 
  
  /*
   * Guts of rule deletion.
Index: src/backend/tcop/utility.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/tcop/utility.c,v
retrieving revision 1.265
diff -c -p -r1.265 utility.c
*** src/backend/tcop/utility.c	12 Aug 2006 20:05:56 -0000	1.265
--- src/backend/tcop/utility.c	17 Aug 2006 03:34:45 -0000
***************
*** 20,25 ****
--- 20,26 ----
  #include "access/xact.h"
  #include "catalog/catalog.h"
  #include "catalog/namespace.h"
+ #include "catalog/pg_rewrite.h"
  #include "catalog/toasting.h"
  #include "commands/alter.h"
  #include "commands/async.h"
*************** ProcessUtility(Node *parsetree,
*** 764,771 ****
  		case T_ViewStmt:		/* CREATE VIEW */
  			{
  				ViewStmt   *stmt = (ViewStmt *) parsetree;
  
! 				DefineView(stmt->view, stmt->query, stmt->replace);
  			}
  			break;
  
--- 765,787 ----
  		case T_ViewStmt:		/* CREATE VIEW */
  			{
  				ViewStmt   *stmt = (ViewStmt *) parsetree;
+ 				bool		checkOption = false;
+ 				bool		cascade = false;
+ 				ListCell   *cell;
  
! 				if (list_length(stmt->options) > 0)
! 					checkOption = true;
! 
! 				foreach(cell, stmt->options)
! 				{
! 					Value *val = (Value *) lfirst(cell);
! 
! 					if (strncmp(strVal(val), "cascade", strlen("cascade")) == 0)
! 						cascade = true;
! 				}
! 
! 				DefineView(stmt->view, stmt->query, stmt->replace, 
! 						   checkOption, cascade);
  			}
  			break;
  
*************** ProcessUtility(Node *parsetree,
*** 803,809 ****
  			break;
  
  		case T_RuleStmt:		/* CREATE RULE */
! 			DefineQueryRewrite((RuleStmt *) parsetree);
  			break;
  
  		case T_CreateSeqStmt:
--- 819,825 ----
  			break;
  
  		case T_RuleStmt:		/* CREATE RULE */
! 			DefineQueryRewrite((RuleStmt *) parsetree, NO_OPTION_EXPLICIT);
  			break;
  
  		case T_CreateSeqStmt:
Index: src/backend/utils/adt/ruleutils.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/utils/adt/ruleutils.c,v
retrieving revision 1.231
diff -c -p -r1.231 ruleutils.c
*** src/backend/utils/adt/ruleutils.c	12 Aug 2006 02:52:05 -0000	1.231
--- src/backend/utils/adt/ruleutils.c	18 Aug 2006 03:02:15 -0000
***************
*** 18,23 ****
--- 18,24 ----
  #include "catalog/pg_depend.h"
  #include "catalog/pg_opclass.h"
  #include "catalog/pg_operator.h"
+ #include "catalog/pg_rewrite.h"
  #include "catalog/pg_trigger.h"
  #include "executor/spi.h"
  #include "funcapi.h"
*************** make_viewdef(StringInfo buf, HeapTuple r
*** 1683,1688 ****
--- 1684,1690 ----
  	Oid			ev_class;
  	int2		ev_attr;
  	bool		is_instead;
+ 	char        ev_kind;
  	char	   *ev_qual;
  	char	   *ev_action;
  	List	   *actions = NIL;
*************** make_viewdef(StringInfo buf, HeapTuple r
*** 1705,1710 ****
--- 1707,1715 ----
  	fno = SPI_fnumber(rulettc, "is_instead");
  	is_instead = (bool) SPI_getbinval(ruletup, rulettc, fno, &isnull);
  
+ 	fno = SPI_fnumber(rulettc, "ev_kind");
+ 	ev_kind = (char) SPI_getbinval(ruletup, rulettc, fno, &isnull);
+ 
  	fno = SPI_fnumber(rulettc, "ev_qual");
  	ev_qual = SPI_getvalue(ruletup, rulettc, fno);
  
*************** make_viewdef(StringInfo buf, HeapTuple r
*** 1732,1737 ****
--- 1737,1761 ----
  
  	get_query_def(query, buf, NIL, RelationGetDescr(ev_relation),
  				  prettyFlags, 0);
+ 
+ 	/*
+ 	 * Support for updatable views: append the check option, if the rule
+ 	 * has one (both "no option" kinds must be excluded, hence &&).  When
+ 	 * pretty-printing with indentation, put it on a line of its own.
+ 	 */
+ 	if (ev_kind != NO_OPTION_EXPLICIT && ev_kind != NO_OPTION_IMPLICIT)
+ 	{
+ 		if (prettyFlags & PRETTYFLAG_INDENT)
+ 			appendStringInfo(buf, "\n");
+ 
+ 		appendStringInfoSpaces(buf, 1);
+ 
+ 		if (ev_kind == LOCAL_OPTION_IMPLICIT)
+ 			appendStringInfo(buf, " WITH LOCAL CHECK OPTION");
+ 		else if (ev_kind == CASCADED_OPTION_IMPLICIT)
+ 			appendStringInfo(buf, " WITH CASCADED CHECK OPTION");
+ 	}
+ 
  	appendStringInfo(buf, ";");
  
  	heap_close(ev_relation, AccessShareLock);
Index: src/backend/utils/cache/relcache.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/utils/cache/relcache.c,v
retrieving revision 1.247
diff -c -p -r1.247 relcache.c
*** src/backend/utils/cache/relcache.c	31 Jul 2006 20:09:05 -0000	1.247
--- src/backend/utils/cache/relcache.c	17 Aug 2006 03:34:45 -0000
*************** RelationBuildRuleLock(Relation relation)
*** 646,651 ****
--- 646,652 ----
  		rule->event = rewrite_form->ev_type - '0';
  		rule->attrno = rewrite_form->ev_attr;
  		rule->isInstead = rewrite_form->is_instead;
+ 		rule->ev_kind = rewrite_form->ev_kind;
  
  		/*
  		 * Must use heap_getattr to fetch ev_action and ev_qual.  Also,
*************** equalRuleLocks(RuleLock *rlock1, RuleLoc
*** 748,753 ****
--- 749,756 ----
  				return false;
  			if (!equal(rule1->actions, rule2->actions))
  				return false;
+ 			if(rule1->ev_kind != rule2->ev_kind)
+ 				return false;
  		}
  	}
  	else if (rlock2 != NULL)
Index: src/bin/pg_dump/pg_dump.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/bin/pg_dump/pg_dump.c,v
retrieving revision 1.446
diff -c -p -r1.446 pg_dump.c
*** src/bin/pg_dump/pg_dump.c	4 Aug 2006 18:32:15 -0000	1.446
--- src/bin/pg_dump/pg_dump.c	17 Aug 2006 03:34:45 -0000
*************** int			optreset;
*** 51,56 ****
--- 51,57 ----
  #include "catalog/pg_proc.h"
  #include "catalog/pg_trigger.h"
  #include "catalog/pg_type.h"
+ #include "catalog/pg_rewrite.h"
  #include "commands/sequence.h"
  #include "libpq/libpq-fs.h"
  #include "mb/pg_wchar.h"
*************** getRules(int *numRules)
*** 3632,3637 ****
--- 3633,3639 ----
  	int			i_ruletable;
  	int			i_ev_type;
  	int			i_is_instead;
+ 	int			i_ev_kind;
  
  	/* Make sure we are in proper schema */
  	selectSourceSchema("pg_catalog");
*************** getRules(int *numRules)
*** 3640,3646 ****
  	{
  		appendPQExpBuffer(query, "SELECT "
  						  "tableoid, oid, rulename, "
! 						  "ev_class as ruletable, ev_type, is_instead "
  						  "FROM pg_rewrite "
  						  "ORDER BY oid");
  	}
--- 3642,3649 ----
  	{
  		appendPQExpBuffer(query, "SELECT "
  						  "tableoid, oid, rulename, "
! 						  "ev_class as ruletable, ev_type, is_instead, "
! 						  "ev_kind "
  						  "FROM pg_rewrite "
  						  "ORDER BY oid");
  	}
*************** getRules(int *numRules)
*** 3649,3655 ****
  		appendPQExpBuffer(query, "SELECT "
  						  "(SELECT oid FROM pg_class WHERE relname = 'pg_rewrite') AS tableoid, "
  						  "oid, rulename, "
! 						  "ev_class as ruletable, ev_type, is_instead "
  						  "FROM pg_rewrite "
  						  "ORDER BY oid");
  	}
--- 3652,3659 ----
  		appendPQExpBuffer(query, "SELECT "
  						  "(SELECT oid FROM pg_class WHERE relname = 'pg_rewrite') AS tableoid, "
  						  "oid, rulename, "
! 						  "ev_class as ruletable, ev_type, is_instead, "
! 						  "ev_kind "
  						  "FROM pg_rewrite "
  						  "ORDER BY oid");
  	}
*************** getRules(int *numRules)
*** 3669,3674 ****
--- 3673,3679 ----
  	i_ruletable = PQfnumber(res, "ruletable");
  	i_ev_type = PQfnumber(res, "ev_type");
  	i_is_instead = PQfnumber(res, "is_instead");
+ 	i_ev_kind = PQfnumber(res, "ev_kind");
  
  	for (i = 0; i < ntups; i++)
  	{
*************** getRules(int *numRules)
*** 3692,3713 ****
  		ruleinfo[i].dobj.dump = ruleinfo[i].ruletable->dobj.dump;
  		ruleinfo[i].ev_type = *(PQgetvalue(res, i, i_ev_type));
  		ruleinfo[i].is_instead = *(PQgetvalue(res, i, i_is_instead)) == 't';
  		if (ruleinfo[i].ruletable)
  		{
  			/*
! 			 * If the table is a view, force its ON SELECT rule to be sorted
! 			 * before the view itself --- this ensures that any dependencies
! 			 * for the rule affect the table's positioning. Other rules are
! 			 * forced to appear after their table.
  			 */
! 			if (ruleinfo[i].ruletable->relkind == RELKIND_VIEW &&
! 				ruleinfo[i].ev_type == '1' && ruleinfo[i].is_instead)
! 			{
! 				addObjectDependency(&ruleinfo[i].ruletable->dobj,
! 									ruleinfo[i].dobj.dumpId);
! 				/* We'll merge the rule into CREATE VIEW, if possible */
! 				ruleinfo[i].separate = false;
! 			}
  			else
  			{
  				addObjectDependency(&ruleinfo[i].dobj,
--- 3697,3731 ----
  		ruleinfo[i].dobj.dump = ruleinfo[i].ruletable->dobj.dump;
  		ruleinfo[i].ev_type = *(PQgetvalue(res, i, i_ev_type));
  		ruleinfo[i].is_instead = *(PQgetvalue(res, i, i_is_instead)) == 't';
+ 		ruleinfo[i].ev_kind = *(PQgetvalue(res, i, i_ev_kind));
  		if (ruleinfo[i].ruletable)
  		{
  			/*
! 			 * If the table is a view, force its ON SELECT rule to be
! 			 * sorted before the view itself --- this ensures that any
! 			 * dependencies for the rule affect the table's positioning.
! 			 * Other rules are forced to appear after their table.
! 			 *
! 			 * Do the same for implicit rules (for updateable views support).
  			 */
! 			if (ruleinfo[i].ruletable->relkind == RELKIND_VIEW) 
! 				if (ruleinfo[i].ev_type == '1' && ruleinfo[i].is_instead)
! 				{
! 					addObjectDependency(&ruleinfo[i].ruletable->dobj,
! 										ruleinfo[i].dobj.dumpId);
! 					/* We'll merge the rule into CREATE VIEW, if possible */
! 					ruleinfo[i].separate = false;
! 				}
! 				else
! 					if (ruleinfo[i].ev_kind != NO_OPTION_EXPLICIT)
! 					{
! 						addObjectDependency(&ruleinfo[i].ruletable->dobj,
! 											ruleinfo[i].dobj.dumpId);
! 						/* We'll merge the rule into CREATE VIEW, if possible */
! 						ruleinfo[i].separate = false;
! 					}
! 					else
! 						ruleinfo[i].separate = true;
  			else
  			{
  				addObjectDependency(&ruleinfo[i].dobj,
Index: src/bin/pg_dump/pg_dump.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/bin/pg_dump/pg_dump.h,v
retrieving revision 1.128
diff -c -p -r1.128 pg_dump.h
*** src/bin/pg_dump/pg_dump.h	1 Aug 2006 18:05:04 -0000	1.128
--- src/bin/pg_dump/pg_dump.h	17 Aug 2006 03:34:45 -0000
*************** typedef struct _ruleInfo
*** 261,268 ****
  	TableInfo  *ruletable;		/* link to table the rule is for */
  	char		ev_type;
  	bool		is_instead;
  	bool		separate;		/* TRUE if must dump as separate item */
! 	/* separate is always true for non-ON SELECT rules */
  } RuleInfo;
  
  typedef struct _triggerInfo
--- 261,272 ----
  	TableInfo  *ruletable;		/* link to table the rule is for */
  	char		ev_type;
  	bool		is_instead;
+ 	char		ev_kind;
  	bool		separate;		/* TRUE if must dump as separate item */
! 	/*
! 	 * separate is false for a view's ON SELECT rule and for its implicit
! 	 * rules (updatable views support), which we try to merge into CREATE VIEW
! 	 */
  } RuleInfo;
  
  typedef struct _triggerInfo
Index: src/bin/pg_dump/pg_dump_sort.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/bin/pg_dump/pg_dump_sort.c,v
retrieving revision 1.15
diff -c -p -r1.15 pg_dump_sort.c
*** src/bin/pg_dump/pg_dump_sort.c	14 Jul 2006 14:52:26 -0000	1.15
--- src/bin/pg_dump/pg_dump_sort.c	17 Aug 2006 03:34:45 -0000
*************** repairDependencyLoop(DumpableObject **lo
*** 816,821 ****
--- 816,862 ----
  		}
  	}
  
+ 	/* View and its implicit rule */
+ 	if (nLoop == 2 &&
+ 		loop[0]->objType == DO_TABLE &&
+ 		loop[1]->objType == DO_RULE &&
+ 		((RuleInfo *) loop[1])->ev_kind != 'e' &&
+ 		((RuleInfo *) loop[1])->ruletable == (TableInfo *) loop[0])
+ 	{
+ 		repairViewRuleLoop(loop[0], loop[1]);
+ 		return;
+ 	}
+ 	if (nLoop == 2 &&
+ 		loop[1]->objType == DO_TABLE &&
+ 		loop[0]->objType == DO_RULE &&
+ 		((RuleInfo *) loop[0])->ev_kind != 'e' &&
+ 		((RuleInfo *) loop[0])->ruletable == (TableInfo *) loop[1])
+ 	{
+ 		repairViewRuleLoop(loop[1], loop[0]);
+ 		return;
+ 	}
+ 
+ 	/* Indirect loop involving view and implicit rule */
+ 	if (nLoop > 2)
+ 	{
+ 		for (i = 0; i < nLoop; i++)
+ 		{
+ 			if (loop[i]->objType == DO_TABLE)
+ 			{
+ 				for (j = 0; j < nLoop; j++)
+ 				{
+ 					if (loop[j]->objType == DO_RULE &&
+ 						((RuleInfo *) loop[j])->ev_kind != 'e' &&
+ 						((RuleInfo *) loop[j])->ruletable == (TableInfo *) loop[i])
+ 					{
+ 						repairViewRuleMultiLoop(loop[i], loop[j]);
+ 						return;
+ 					}
+ 				}
+ 			}
+ 		}
+ 	}
+ 
  	/* Table and CHECK constraint */
  	if (nLoop == 2 &&
  		loop[0]->objType == DO_TABLE &&
Index: src/include/catalog/catversion.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/catalog/catversion.h,v
retrieving revision 1.349
diff -c -p -r1.349 catversion.h
*** src/include/catalog/catversion.h	12 Aug 2006 02:52:06 -0000	1.349
--- src/include/catalog/catversion.h	17 Aug 2006 03:34:45 -0000
***************
*** 53,58 ****
   */
  
  /*							yyyymmddN */
! #define CATALOG_VERSION_NO	200608101
  
  #endif
--- 53,58 ----
   */
  
  /*							yyyymmddN */
! #define CATALOG_VERSION_NO	200608121
  
  #endif
Index: src/include/catalog/pg_proc.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/catalog/pg_proc.h,v
retrieving revision 1.420
diff -c -p -r1.420 pg_proc.h
*** src/include/catalog/pg_proc.h	6 Aug 2006 03:53:44 -0000	1.420
--- src/include/catalog/pg_proc.h	17 Aug 2006 03:34:45 -0000
*************** DATA(insert OID = 2557 ( bool				   PGNS
*** 3841,3846 ****
--- 3841,3847 ----
  DESCR("convert int4 to boolean");
  DATA(insert OID = 2558 ( int4				   PGNSP PGUID 12 f f t f i 1  23 "16" _null_ _null_ _null_ bool_int4 - _null_ ));
  DESCR("convert boolean to int4");
+ /* internal function required for view update rules */
  DATA(insert OID = 2559 ( lastval			   PGNSP PGUID 12 f f t f v 0 20 "" _null_ _null_ _null_	lastval - _null_ ));
  DESCR("current value from last used sequence");
  
*************** DESCR("GiST support");
*** 3894,3899 ****
--- 3895,3904 ----
  DATA(insert OID = 2592 (  gist_circle_compress	PGNSP PGUID 12 f f t f i 1 2281 "2281" _null_ _null_ _null_ gist_circle_compress - _null_ ));
  DESCR("GiST support");
  
+ /* functions for view update facility */
+ DATA(insert OID = 2570 ( pg_view_update_error       PGNSP PGUID 12 f f f f i 1 16 "16" _null_ _null_ _null_ pg_view_update_error - _null_));
+ DESCR("evaluates boolean input for view update check option");
+ 
  /* GIN */
  DATA(insert OID = 2730 (  gingettuple	   PGNSP PGUID 12 f f t f v 2 16 "2281 2281" _null_ _null_ _null_  gingettuple - _null_ ));
  DESCR("gin(internal)");
Index: src/include/catalog/pg_rewrite.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/catalog/pg_rewrite.h,v
retrieving revision 1.26
diff -c -p -r1.26 pg_rewrite.h
*** src/include/catalog/pg_rewrite.h	5 Mar 2006 15:58:55 -0000	1.26
--- src/include/catalog/pg_rewrite.h	17 Aug 2006 03:34:45 -0000
*************** CATALOG(pg_rewrite,2618)
*** 43,48 ****
--- 43,49 ----
  	int2		ev_attr;
  	char		ev_type;
  	bool		is_instead;
+ 	char		ev_kind;
  
  	/* NB: remaining fields must be accessed via heap_getattr */
  	text		ev_qual;
*************** typedef FormData_pg_rewrite *Form_pg_rew
*** 60,72 ****
   *		compiler constants for pg_rewrite
   * ----------------
   */
! #define Natts_pg_rewrite				7
  #define Anum_pg_rewrite_rulename		1
  #define Anum_pg_rewrite_ev_class		2
  #define Anum_pg_rewrite_ev_attr			3
  #define Anum_pg_rewrite_ev_type			4
  #define Anum_pg_rewrite_is_instead		5
! #define Anum_pg_rewrite_ev_qual			6
! #define Anum_pg_rewrite_ev_action		7
  
  #endif   /* PG_REWRITE_H */
--- 61,82 ----
   *		compiler constants for pg_rewrite
   * ----------------
   */
! #define Natts_pg_rewrite				8
  #define Anum_pg_rewrite_rulename		1
  #define Anum_pg_rewrite_ev_class		2
  #define Anum_pg_rewrite_ev_attr			3
  #define Anum_pg_rewrite_ev_type			4
  #define Anum_pg_rewrite_is_instead		5
! #define Anum_pg_rewrite_ev_kind			6
! #define Anum_pg_rewrite_ev_qual			7
! #define Anum_pg_rewrite_ev_action		8
! 
! /*
!  * Possible values for ev_kind 
!  */
! #define LOCAL_OPTION_IMPLICIT    'l' /* WITH LOCAL CHECK OPTION */
! #define CASCADED_OPTION_IMPLICIT 'c' /* WITH CASCADED CHECK OPTION */
! #define NO_OPTION_IMPLICIT       'n' /* no check option specified */
! #define NO_OPTION_EXPLICIT       'e' /* rule was created by user */
  
  #endif   /* PG_REWRITE_H */
Index: src/include/commands/view.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/commands/view.h,v
retrieving revision 1.23
diff -c -p -r1.23 view.h
*** src/include/commands/view.h	5 Mar 2006 15:58:55 -0000	1.23
--- src/include/commands/view.h	17 Aug 2006 03:34:45 -0000
***************
*** 16,22 ****
  
  #include "nodes/parsenodes.h"
  
! extern void DefineView(RangeVar *view, Query *view_parse, bool replace);
  extern void RemoveView(const RangeVar *view, DropBehavior behavior);
  
  #endif   /* VIEW_H */
--- 16,24 ----
  
  #include "nodes/parsenodes.h"
  
! extern void DefineView(RangeVar *view, Query *view_parse, bool replace, 
! 					   bool checkOption, bool cascade);
! 
  extern void RemoveView(const RangeVar *view, DropBehavior behavior);
  
  #endif   /* VIEW_H */
Index: src/include/nodes/parsenodes.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/nodes/parsenodes.h,v
retrieving revision 1.323
diff -c -p -r1.323 parsenodes.h
*** src/include/nodes/parsenodes.h	12 Aug 2006 20:05:56 -0000	1.323
--- src/include/nodes/parsenodes.h	19 Aug 2006 19:07:27 -0000
*************** typedef struct ViewStmt
*** 1677,1682 ****
--- 1677,1683 ----
  	List	   *aliases;		/* target column names */
  	Query	   *query;			/* the SQL statement */
  	bool		replace;		/* replace an existing view? */
+ 	List	   *options;        /* view check options, NIL means no check */
  } ViewStmt;
  
  /* ----------------------
Index: src/include/nodes/relation.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/nodes/relation.h,v
retrieving revision 1.126
diff -c -p -r1.126 relation.h
*** src/include/nodes/relation.h	1 Jul 2006 18:38:33 -0000	1.126
--- src/include/nodes/relation.h	18 Aug 2006 23:29:13 -0000
*************** typedef struct RelOptInfo
*** 260,266 ****
  
  	/* information about a base rel (not set for join rels!) */
  	Index		relid;
! 	RTEKind		rtekind;		/* RELATION, SUBQUERY, or FUNCTION */
  	AttrNumber	min_attr;		/* smallest attrno of rel (often <0) */
  	AttrNumber	max_attr;		/* largest attrno of rel */
  	Relids	   *attr_needed;	/* array indexed [min_attr .. max_attr] */
--- 260,266 ----
  
  	/* information about a base rel (not set for join rels!) */
  	Index		relid;
! 	RTEKind		rtekind;		/* see parsenodes.h */
  	AttrNumber	min_attr;		/* smallest attrno of rel (often <0) */
  	AttrNumber	max_attr;		/* largest attrno of rel */
  	Relids	   *attr_needed;	/* array indexed [min_attr .. max_attr] */
Index: src/include/rewrite/prs2lock.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/rewrite/prs2lock.h,v
retrieving revision 1.21
diff -c -p -r1.21 prs2lock.h
*** src/include/rewrite/prs2lock.h	5 Mar 2006 15:58:58 -0000	1.21
--- src/include/rewrite/prs2lock.h	17 Aug 2006 03:34:45 -0000
*************** typedef struct RewriteRule
*** 29,34 ****
--- 29,35 ----
  	Node	   *qual;
  	List	   *actions;
  	bool		isInstead;
+ 	char        ev_kind;
  } RewriteRule;
  
  /*
Index: src/include/rewrite/rewriteDefine.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/rewrite/rewriteDefine.h,v
retrieving revision 1.21
diff -c -p -r1.21 rewriteDefine.h
*** src/include/rewrite/rewriteDefine.h	5 Mar 2006 15:58:58 -0000	1.21
--- src/include/rewrite/rewriteDefine.h	17 Aug 2006 03:34:45 -0000
***************
*** 16,22 ****
  
  #include "nodes/parsenodes.h"
  
! extern void DefineQueryRewrite(RuleStmt *args);
  
  extern void RenameRewriteRule(Oid owningRel, const char *oldName,
  				  const char *newName);
--- 16,23 ----
  
  #include "nodes/parsenodes.h"
  
! extern void DefineQueryRewrite(RuleStmt *args,
! 							   char     ev_kind);
  
  extern void RenameRewriteRule(Oid owningRel, const char *oldName,
  				  const char *newName);
Index: src/include/rewrite/rewriteRemove.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/rewrite/rewriteRemove.h,v
retrieving revision 1.22
diff -c -p -r1.22 rewriteRemove.h
*** src/include/rewrite/rewriteRemove.h	16 Jun 2006 20:23:45 -0000	1.22
--- src/include/rewrite/rewriteRemove.h	17 Aug 2006 03:34:45 -0000
***************
*** 20,24 ****
--- 20,25 ----
  extern void RemoveRewriteRule(Oid owningRel, const char *ruleName,
  				  DropBehavior behavior, bool missing_ok);
  extern void RemoveRewriteRuleById(Oid ruleOid);
+ extern void deleteImplicitRulesOnEvent(Relation rel, CmdType event_type);
  
  #endif   /* REWRITEREMOVE_H */
Index: src/include/rewrite/rewriteSupport.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/rewrite/rewriteSupport.h,v
retrieving revision 1.28
diff -c -p -r1.28 rewriteSupport.h
*** src/include/rewrite/rewriteSupport.h	5 Mar 2006 15:58:58 -0000	1.28
--- src/include/rewrite/rewriteSupport.h	17 Aug 2006 03:34:45 -0000
***************
*** 17,22 ****
--- 17,33 ----
  /* The ON SELECT rule of a view is always named this: */
  #define ViewSelectRuleName	"_RETURN"
  
+ /*------------------------------------------------------------------------------
+  * some names to be used for implicit view update rules
+  *------------------------------------------------------------------------------
+  */
+ #define INSERTRULENAME "_INSERT"
+ #define DELETERULENAME "_DELETE"
+ #define UPDATERULENAME "_UPDATE"
+ #define NOTHING_INSERTRULENAME "_NOTHING_INSERT"
+ #define NOTHING_UPDATERULENAME "_NOTHING_UPDATE"
+ #define NOTHING_DELETERULENAME "_NOTHING_DELETE"
+ 
  extern bool IsDefinedRewriteRule(Oid owningRel, const char *ruleName);
  
  extern void SetRelationRuleStatus(Oid relationId, bool relHasRules,
/*-------------------------------------------------------------------------
 *
 * viewUpdate.c
 *	  routines for translating an SQL-92-compliant view definition into
 *	  INSERT/UPDATE/DELETE rules (i.e. updatable views).
 *
 * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * ORIGINAL AUTHORS
 * 	Bernd Helmle, Jaime Casanova
 *
 * IDENTIFICATION
 *	  $PostgreSQL$
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/heapam.h"
#include "access/xact.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_rewrite.h"
#include "catalog/pg_type.h"
#include "lib/stringinfo.h"
#include "nodes/makefuncs.h"
#include "optimizer/clauses.h"
#include "parser/parse_func.h"
#include "parser/parse_oper.h"
#include "parser/parsetree.h"
#include "rewrite/rewriteDefine.h"
#include "rewrite/rewriteHandler.h"
#include "rewrite/rewriteManip.h"
#include "rewrite/rewriteSupport.h"
#include "rewrite/viewUpdate.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"


typedef TargetEntry** ViewDefColumnList;

/*
 * Tree node type taken by checkTree(), check_reltree(), get_base_relations()
 * and read_rearranged_cols().
 *
 * NOTE(review): defs presumably holds ViewBaseRelationItem entries -- confirm
 * against the code that builds the tree.
 */
typedef struct ViewBaseRelation
{
	List   *defs;			/* XXXX -- what is this? */
	Oid		parentRelation;	/* Oid of parent relation, 0 indicates root */
} ViewBaseRelation;

/*
 * Per-relation bookkeeping: a Relation together with the _RETURN rule query
 * of a view relation and an array recording the order of its target list
 * columns.
 */
typedef struct ViewBaseRelationItem
{
	Relation		rel;		/* the Relation itself */
	Query		   *rule;		/* _RETURN rule of a view relation */
	TargetEntry	  **tentries;	/* saves order of column target list */
} ViewBaseRelationItem;

/*
 * Context argument of view_def_qual_walker().
 *
 * NOTE(review): the field meanings below are inferred from names and types
 * only -- confirm against the walker before relying on them.
 */
typedef struct ViewExprContext
{
	Index       newRTE;			/* presumably range table index of NEW */
	Index       oldRTE;			/* presumably range table index of OLD */
	Index       baseRTE;		/* presumably range table index of the base rel */
	Index       subQueryLevel;	/* presumably subquery nesting level */
	ViewDefColumnList tentries;	/* TargetEntry ** (see ViewDefColumnList) */
} ViewExprContext;

/*
 * Name of the SQL-level function that create_qual_for_rule() wraps around a
 * view's WHERE clause; get_view_qualification_func() looks it up by name.
 */
static const char *const PG_VIEW_UPDATE_CONTROL = "pg_view_update_error";

/*
 * Forward declarations of file-local functions.
 *
 * The parameter names/types of transform_select_to_delete() are spelled the
 * same way as in its definition (the view's SELECT query tree comes first,
 * and the lookup table is a ViewDefColumnList).
 */

/* relation tree construction and checks */
static Query *get_return_rule(Relation rel);
static void read_rearranged_cols(ViewBaseRelation *tree);
static bool checkTree(const Query *query, ViewBaseRelation *tree);
static Oid get_reloid_from_select(const Query *select,
					   int *rti, RangeTblEntry **rel_entry);

/* SELECT -> UPDATE / INSERT / DELETE rule action transformations */
static Query *transform_select_to_update(const Query *update,
						   const Relation rel, const RangeVar *var,
						   TargetEntry **tentries, bool checkOption,
						   bool checkCascade);
static Query *transform_select_to_insert(const Query *select,
						   const Relation rel, const RangeVar *var,
						   TargetEntry **tentries, bool checkOption,
						   bool checkCascade);
static Query *transform_select_to_delete(const Query *select,
						   const Relation rel, const RangeVar *var,
						   ViewDefColumnList tentries, bool checkOption,
						   bool checkCascade);

static void get_base_relations(ViewBaseRelation *tree, List **baserelations);
static void get_base_base_relations(const Query *view, Oid baserel, List **list);
static void copyReversedTargetEntryPtr(List *targetList,
						   ViewDefColumnList targets);
static bool check_reltree(ViewBaseRelation *node);

/* rule statement construction */
static RuleStmt *create_rule_stmt(Query *actions, const RangeVar *view, bool replace,
				 char *rulename, CmdType event);
static RuleStmt *create_nothing_rule(Query *action, const RangeVar *var,
					 char *rulename, CmdType cmdType, bool replace);
static Oid hasRule(Oid view, const char *name);

/* query tree / range table helpers */
static bool form_query(const Query *select, Query *query,
		   ViewDefColumnList tentries, bool copyTargetList);
static RangeTblEntry *get_relation_RTE(const Query *select,
				 unsigned int *offset);
static Index get_rtindex_for_rel(List *rte_list,
					const char *relname);

/* qualification (CHECK OPTION) handling */
static HeapTuple get_view_qualification_func(const char *name);
static bool view_def_qual_walker(Node *node, ViewExprContext *context);
static FuncExpr *create_qual_for_rule(RuleStmt *stmt, const Query *select,
					 ViewDefColumnList tentries, Index newRTE, Index oldRTE);

/* Var rewriting and target list / WHERE clause construction */
static void replace_varnos_for_target(List *targetList,
						  ViewDefColumnList tentries, Index rtIndex);
static bool replace_varnos_for_var(Node *node, Index rtIndex, Index oldIndex,
					   Index subQueryLevel);
static OpExpr *create_opexpr(Var *var_left, Var *var_right);
static void form_where_for_updrule(const Query *select, FromExpr **from,
					   const Relation rel, Oid baserel, Index baserti,
					   Index oldrti);
static void build_update_target_list(const Query *update, const Query *select,
						 ViewDefColumnList tentries, Oid baserel,
						 const Relation rel);
static void form_vars_arrayref(ArrayRef *array, Index newRTE,
				   ViewDefColumnList tentries);
static bool viewIsInsertable(const Query *select, const Relation rel);

/*
 * System function to check CHECK OPTION expression.
 *
 * This is the function named by PG_VIEW_UPDATE_CONTROL; create_qual_for_rule()
 * passes it a copy of the view's WHERE clause as its single argument.
 */
PG_FUNCTION_INFO_V1(pg_view_update_error);
Datum pg_view_update_error(PG_FUNCTION_ARGS);

/*------------------------------------------------------------------------------
 * Private functions
 * -----------------------------------------------------------------------------
 */

/*
 * Returns the 1-based range table index of the first entry in rte_list whose
 * eref alias name equals relname (at most NAMEDATALEN bytes are compared).
 * NULL list members are skipped but still counted.
 *
 * An error is raised when no entry matches.  Falling off the end of the list
 * must not be reported as a hit: doing so would hand the caller the index of
 * the last entry, i.e. of some unrelated relation.
 *
 * XXX This seems pretty grotty ... can't we do this in some other way?
 */
static Index
get_rtindex_for_rel(List *rte_list, const char *relname)
{
	ListCell   *cell;
	Index		index = 0;

	Assert(relname != NULL);

	foreach(cell, rte_list)
	{
		RangeTblEntry *rte = (RangeTblEntry *) lfirst(cell);

		/* range table indexes start at 1, so count before comparing */
		++index;

		if (rte != NULL &&
			strncmp(rte->eref->aliasname, relname, NAMEDATALEN) == 0)
			return index;
	}

	elog(ERROR, "could not find range table entry \"%s\"", relname);

	return 0;					/* not reached; keeps the compiler quiet */
}

/*
 * Returns the next plain-relation RangeTblEntry of select's range table,
 * starting the search at the 1-based index *offset.  Returns NULL once the
 * end of the range table has been reached without finding one.
 *
 * The following entries are skipped:
 *	- NULL list members,
 *	- entries whose rtekind is not RTE_RELATION,
 *	- entries whose alias name is *NEW* or *OLD*.
 *
 * *offset is advanced past every entry that was looked at, so after a
 * successful call (*offset - 1) is the index of the returned entry and the
 * function can be called again with the same offset variable to continue the
 * scan.
 *
 * Every index up to and including list_length(select->rtable) is examined.
 * (There must be no separate "offset + 1 > length" shortcut in front of the
 * loop: it would make a scan that resumes at the last entry give up without
 * ever looking at that entry.)
 */
static RangeTblEntry *
get_relation_RTE(const Query *select, unsigned int *offset)
{
	unsigned int nentries;

	AssertArg(offset != NULL);
	AssertArg(select != NULL);

	nentries = (unsigned int) list_length(select->rtable);

	while (*offset <= nentries)
	{
		RangeTblEntry *rte = rt_fetch(*offset, select->rtable);

		++(*offset);

		/* XXX given the comment on rt_fetch, can this actually happen? */
		if (rte == NULL)
			continue;

		/* skip non-table RTEs */
		if (rte->rtekind != RTE_RELATION)
			continue;

		/*
		 * Skip RTEs named *NEW* and *OLD*.  A length of 6 includes the
		 * terminating NUL of the literal, so this is an exact match and not
		 * a prefix test.
		 *
		 * XXX It would be nice to be able to use something else than just
		 * the names here ... However, rtekind does not work as expected :-(
		 */
		if (strncmp(rte->eref->aliasname, "*NEW*", 6) == 0 ||
			strncmp(rte->eref->aliasname, "*OLD*", 6) == 0)
			continue;

		return rte;
	}

	return NULL;
}

/*
 * If node is a Var, repoints it (varno and varnoold) at range table index
 * rtIndex.  Nodes of any other type are left alone.
 *
 * A Var is only rewritten when
 *	- its varlevelsup equals subQueryLevel, and
 *	- oldIndex is 0 (meaning "replace whatever varno is there") or the Var's
 *	  current varno equals oldIndex.
 *
 * Returns true if the Var was changed, false otherwise.
 */
static bool
replace_varnos_for_var(Node *node, Index rtIndex, Index oldIndex,
					   Index subQueryLevel)
{
	Var		   *target;

	Assert(PointerIsValid(node));
	Assert(rtIndex > 0);

	if (!IsA(node, Var))
		return false;

	target = (Var *) node;

	/* Vars that belong to another query level are never touched */
	if (target->varlevelsup != subQueryLevel)
		return false;

	/* unless oldIndex is the 0 wildcard, the current varno has to match */
	if (oldIndex != 0 && target->varno != oldIndex)
		return false;

	elog(DEBUG1, "Replacing varno %d to %d in sublevel %d, varattno %d, varlevelsup %d",
		 target->varno,
		 rtIndex,
		 subQueryLevel,
		 target->varattno,
		 target->varlevelsup);

	target->varno = rtIndex;
	target->varnoold = rtIndex;

	return true;
}

/*
 * If node is a Var of query level subQueryLevel, remaps its attribute number
 * through the lookup table tentries: slot (varattno - 1) is consulted, and
 * when it holds a TargetEntry whose resno differs from the Var's varattno,
 * both varattno and varoattno are set to that resno.
 *
 * Vars of other query levels, system attributes and whole-row references
 * (varattno <= 0) are left alone.  Both checks are made *before* the table
 * is indexed, so the table is never read with a negative subscript nor with
 * an attribute number that belongs to some other level's relation.
 *
 * NOTE(review): the length of tentries is not known here, so an upper bound
 * cannot be checked -- presumably the caller sizes the table to cover every
 * attribute number that can show up; confirm.
 */
static void
replace_varattno_for_var(Node *node,
						 ViewDefColumnList tentries,
						 Index subQueryLevel)
{
	Var		   *var;
	TargetEntry *entry;

	Assert((PointerIsValid(node)) && !(tentries == NULL));

	if (!IsA(node, Var))
		return;

	var = (Var *) node;

	/* only Vars of the level we are working on are eligible */
	if (var->varlevelsup != subQueryLevel)
		return;

	/* the table has no slots for attribute numbers below 1 */
	if (var->varattno <= 0)
		return;

	entry = tentries[var->varattno - 1];

	/* an empty slot means this column was not rearranged */
	if (entry == NULL)
		return;

	if (entry->resno != var->varattno)
	{
		var->varattno = entry->resno;
		var->varoattno = entry->resno;
	}
}

/*
 * Tree walker over a view qualification.
 *
 * For every node visited, replace_varnos_for_var() is called with the 0
 * wildcard so that a Var of the current subquery level ends up pointing at
 * context->newRTE, and replace_varattno_for_var() then remaps its attribute
 * number through context->tentries.
 *
 * Some node types get extra treatment before the walk descends further:
 *	- CaseExpr / NullTest: ChangeVarNodes() rewrites references to
 *	  context->baseRTE into context->newRTE for the whole subtree;
 *	- RelabelType: the same, applied to its argument;
 *	- Query: the subquery is walked with query_tree_walker() while
 *	  context->subQueryLevel is temporarily incremented, and that walk's
 *	  result is returned directly.
 *
 * Returns whatever the underlying walker returns (false for a NULL node).
 */
static bool
view_def_qual_walker(Node *node,
					 ViewExprContext *context)
{
	if (!PointerIsValid(node))
		return false;

	replace_varnos_for_var(node, context->newRTE,
						   0, context->subQueryLevel);
	replace_varattno_for_var(node, context->tentries,
							 context->subQueryLevel);

	if (IsA(node, Query))
	{
		bool		walked;

		/*
		 * Dive into the subselect.  Outer references inside it have to be
		 * pinned to the *NEW* pseudorelation as well, which is why the level
		 * counter is bumped for the duration of the walk.  Note that this is
		 * necessary only for outer references to *NEW* or *OLD*.
		 *
		 * According to the comments in src/include/nodes/primnodes.h, we can
		 * rely on the fact that the subselect node is a Query *.
		 */
		++(context->subQueryLevel);
		walked = query_tree_walker((Query *) node,
								   view_def_qual_walker,
								   (void *) context, 0);
		--(context->subQueryLevel);

		return walked;
	}

	if (IsA(node, CaseExpr) || IsA(node, NullTest))
	{
		ChangeVarNodes((Node *) node,
					   context->baseRTE,
					   context->newRTE, 0);
	}
	else if (IsA(node, RelabelType))
	{
		ChangeVarNodes((Node *) ((RelabelType *) node)->arg,
					   context->baseRTE, context->newRTE, 0);
	}

	return expression_tree_walker((Node *) node,
								  view_def_qual_walker,
								  (void *) context);
}

static void
form_vars_arrayref(ArrayRef *array, Index newRTE, ViewDefColumnList tentries)
{
	AssertArg(tentries != NULL);
	AssertArg(newRTE > 0);
	AssertArg(array != NULL);

	/* XXX -- this code is duplicated ... */
	if (array->refexpr != NULL)
	{
		if (IsA(array->refexpr, Var))
		{
			Var *var = (Var *)(array->refexpr);
			TargetEntry *entry = tentries[var->varattno - 1];

			/* Only replace if column order is reversed */
			if (entry != NULL)
			{
				if (entry->resno != var->varattno)
				{
					var->varattno = entry->resno;
					var->varoattno = entry->resno;
				}
			}

			/* Finally, make varno point to the *NEW* range table entry */
			var->varno = newRTE;
			var->varnoold = newRTE;
		}
	}

	if (array->refassgnexpr != NULL)
	{
		if (array->refassgnexpr->type == T_Var)
		{
			Var *var = (Var *)(array->refassgnexpr);
			TargetEntry *entry = tentries[var->varattno - 1];

			/*
			 * Only replace if column order is reversed.
			 */
			if (entry != NULL)
			{
				if (entry->resno != var->varattno)
				{
					var->varattno = entry->resno;
					var->varoattno = entry->resno;
				}
			}

			/* Finally, make varno point to the *NEW* range table entry */
			var->varno = newRTE;
			var->varnoold = newRTE;
		}
	}
}

/*
 * Builds the expression "var_left = var_right".
 *
 * Both Vars must have the same vartype; the equality operator is looked up
 * for that type via equality_oper().  An error is raised if the types differ
 * or if no equality operator is found.
 *
 * The Vars are linked into the new node's argument list as-is (not copied).
 */
static OpExpr *
create_opexpr(Var *var_left, Var *var_right)
{
	Operator	eqop;
	Form_pg_operator opform;
	OpExpr	   *node;

	Assert((var_left != NULL) && (var_right != NULL));

	if (var_left->vartype != var_right->vartype)
		elog(ERROR, "left and right var types do not match");

	eqop = equality_oper(var_left->vartype, true);
	if (!HeapTupleIsValid(eqop))
		elog(ERROR,
			 "could not find equal-operator for UPDATE/DELETE WHERE condition");

	opform = (Form_pg_operator) GETSTRUCT(eqop);

	node = makeNode(OpExpr);
	node->opno = HeapTupleGetOid(eqop);
	node->opfuncid = opform->oprcode;
	node->opresulttype = opform->oprresult;
	node->opretset = false;
	node->args = list_make2(var_left, var_right);

	/* everything needed has been copied out of the operator tuple */
	ReleaseSysCache(eqop);

	return node;
}

/*
 * Adds one more condition to the expression tree of a WHERE clause.
 *
 * op is appended to the argument list of expr.  What happens next depends on
 * *anchor, which tracks the most recently added BoolExpr:
 *
 *	- *anchor == NULL (first call): a fresh BoolExpr is created as the root,
 *	  expr becomes its only argument, *anchor is set to that root and, if
 *	  from is not NULL, the root is also stored in from->quals.
 *
 *	- otherwise: expr is appended to the argument list of the BoolExpr that
 *	  *anchor points to, and *anchor is moved on to expr.
 *
 * Returns the new value of *anchor.
 *
 * NOTE(review): the root BoolExpr's boolop is never set explicitly here;
 * presumably it relies on whatever makeNode() leaves there -- confirm.
 *
 * XXX -- what's the deal with the "anchor" and "op"?
 */
static Node *
build_expression_tree(FromExpr *from, Node **anchor, BoolExpr *expr, OpExpr *op)
{
	/* in either case the condition itself hangs below expr */
	expr->args = lappend(expr->args, op);

	if (*anchor == NULL)
	{
		/* Currently no nodes: start the tree with a new root */
		BoolExpr   *root = makeNode(BoolExpr);

		root->args = lappend(root->args, expr);
		*anchor = (Node *) root;

		if (from != NULL)
			from->quals = *anchor;
	}
	else
	{
		/* Already some nodes there: chain expr below the previous one */
		BoolExpr   *previous = (BoolExpr *) (*anchor);

		previous->args = lappend(previous->args, expr);
		*anchor = (Node *) expr;
	}

	return *anchor;
}

/*
 * Forms the WHERE clause for DELETE/UPDATE rules targeted to the specified
 * view.
 *
 * For every entry of the view's target list whose expression is a plain Var,
 * a condition of the form
 *
 *		CASE WHEN old.col IS NOT NULL THEN old.col = base.col
 *			 ELSE base.col IS NULL END
 *
 * is built, where "old" is the RTE with index oldrti and "base" the RTE with
 * index baserti.  The conditions are chained together through
 * build_expression_tree(), which also stores the root of the chain in
 * (*from)->quals when the first condition is added.  Target entries that are
 * not plain Vars contribute nothing.
 *
 * NOTE(review): rel is dereferenced in the initializer of attrs, i.e. before
 * the AssertArg(rel != NULL) below gets a chance to fire.
 */
static void
form_where_for_updrule(const Query *select,	/* View retrieve rule */
					   FromExpr **from,		/* FromExpr for stmt */
					   const Relation rel,	/* base relation of view */
					   Oid baserel,			/* Oid of base relation */
					   Index baserti,		/* Index of base relation RTE */
					   Index oldrti)		/* Index of *OLD* RTE */
{
	BoolExpr *expr = NULL;
	Node     *anchor = NULL;
	Form_pg_attribute *attrs = rel->rd_att->attrs;
	ListCell *cell;

	AssertArg(baserti > 0);
	AssertArg(oldrti > 0);
	AssertArg(OidIsValid(baserel));
	AssertArg(*from != NULL);
	AssertArg(rel != NULL);

	foreach(cell, select->targetList)
	{
		TargetEntry *te = (TargetEntry *) lfirst(cell);
		Var		   *var1;
		Var		   *var2;
		OpExpr	   *op;
		CaseExpr   *newcase;
		CaseWhen   *casewhen;
		NullTest   *nulltest1;
		NullTest   *nulltest2;

		/* Only target entries that are plain Vars produce a condition */
		if (!IsA(te->expr, Var))
			continue;

		newcase = makeNode(CaseExpr);
		casewhen = makeNode(CaseWhen);
		nulltest1 = makeNode(NullTest);
		nulltest2 = makeNode(NullTest);

		/*
		 * These are the two operands that get compared for equality.
		 *
		 * For DELETE/UPDATE rules, var1 points to the *OLD* RTE, var2
		 * references the base relation.
		 */
		var1 = copyObject((Var *) (te->expr));

		/*
		 * Look at varoattno to decide how var2 is built.  If it is positive,
		 * var2 is a fresh Var whose attribute number, type and typmod are
		 * taken from the base relation's attribute at that position;
		 * otherwise var2 is simply a copy of var1.  In both cases var2 is
		 * pointed at the base relation RTE.
		 */
		if (var1->varoattno > 0)
		{
			var2 = makeNode(Var);

			var2->varno = baserti;
			var2->varnoold = baserti;
			var2->varattno = attrs[var1->varoattno - 1]->attnum;
			var2->vartype = attrs[var1->varoattno - 1]->atttypid;
			var2->vartypmod = attrs[var1->varoattno - 1]->atttypmod;
			var2->varlevelsup = var1->varlevelsup;
			var2->varnoold = var2->varno;
			var2->varoattno = var2->varattno;
		}
		else
		{
			var2 = copyObject(var1);
			var2->varno = baserti;
			var2->varnoold = baserti;
		}

		/* var1 is the *OLD* side of the comparison */
		var1->varno = oldrti;
		var1->varnoold = oldrti;

		/*
		 * var2 (base relation side): use the column recorded in the target
		 * entry's resorigcol.  This overrides the attribute number chosen
		 * above.
		 */
		var2->varattno = te->resorigcol;
		var2->varoattno = te->resorigcol;

		/*
		 * var1 (*OLD* side): use the target entry's position in the view's
		 * target list (resno).
		 */
		var1->varattno = te->resno;
		var1->varoattno = te->resno;

		op = create_opexpr(var1, var2);
		expr = makeNode(BoolExpr);
		expr->boolop = AND_EXPR;

		/*
		 * Finally, wrap the OpExpr into a CaseExpr that takes care of NULLs:
		 * the equality test is only used when the *OLD* value is not NULL,
		 * otherwise the base column is required to be NULL too.  This is
		 * done for every column, so there is no problem with:
		 *
		 * ALTER TABLE ... ALTER COLUMN ... DROP NOT NULL;
		 *
		 * NOTE(review): newcase->casetype is never assigned here -- confirm
		 * that this is acceptable for the consumers of the rule action.
		 */

		nulltest1->arg = (Expr *)var1;
		nulltest1->nulltesttype = IS_NOT_NULL;

		nulltest2->arg = (Expr *)var2;
		nulltest2->nulltesttype = IS_NULL;

		casewhen->expr = (Expr *)nulltest1;
		casewhen->result = (Expr *)op;

		newcase->args = list_make1(casewhen);
		newcase->defresult = (Expr *) nulltest2;

		/*
		 * From here on "op" holds a copy of the CaseExpr, not an OpExpr; the
		 * variable is merely reused to hand the node to
		 * build_expression_tree().
		 */
		op = copyObject(newcase);

		anchor = build_expression_tree(*from, (Node **) &anchor, expr, op);
	}
}

/*
 * Points the expressions of a target list at range table entry rtIndex.
 *
 * Only list members that are TargetEntry nodes are looked at; anything else
 * in the list is skipped.  For a TargetEntry whose expression is
 *	- a Var: its varno is set to rtIndex (varnoold is not touched here);
 *	- an ArrayRef: the function calls itself on the upper and lower
 *	  subscript lists (when not empty), and form_vars_arrayref() is applied
 *	  if refexpr or refassgnexpr is a plain Var;
 *	- anything else: nothing is done.
 *
 * NOTE(review): the recursive calls on the subscript lists only have an
 * effect on members that are TargetEntry nodes -- confirm that those lists
 * can actually contain such nodes.
 */
static void
replace_varnos_for_target(List *targetList, ViewDefColumnList tentries,
						  Index rtIndex)
{
	ListCell   *lc;

	Assert(targetList != NIL);
	Assert(rtIndex > 0);

	foreach(lc, targetList)
	{
		Node	   *member = (Node *) lfirst(lc);
		TargetEntry *tle;
		ArrayRef   *aref;

		if (!IsA(member, TargetEntry))
			continue;

		tle = (TargetEntry *) member;

		/* the simple case: a plain column reference */
		if (IsA(tle->expr, Var))
		{
			((Var *) (tle->expr))->varno = rtIndex;
			continue;
		}

		if (!IsA(tle->expr, ArrayRef))
			continue;

		/*
		 * An array subscripting operation.  This covers cases where a view
		 * selects a single index or complete array from a base table or
		 * view, so every part of the ArrayRef has to be visited.
		 */
		aref = (ArrayRef *) (tle->expr);

		/* expressions that evaluate the upper array indexes */
		if (aref->refupperindexpr != NIL)
			replace_varnos_for_target(aref->refupperindexpr, tentries,
									  rtIndex);

		/* same for the lower array indexes */
		if (aref->reflowerindexpr != NIL)
			replace_varnos_for_target(aref->reflowerindexpr, tentries,
									  rtIndex);

		/* finally the array expression and the assigned value themselves */
		if ((aref->refexpr != NULL && IsA(aref->refexpr, Var)) ||
			(aref->refassgnexpr != NULL && IsA(aref->refassgnexpr, Var)))
			form_vars_arrayref(aref, rtIndex, tentries);
	}
}

/*
 * Remove all columns from the specified targetList that aren't 'real' table
 * columns. So all system columns, FuncExpr,  etc are removed.  The function
 * returns false, if no more columns are left (that is, the view target list
 * doesn't hold any 'real' columns...)
 *
 * How it works: sourceList is iterated, and every TargetEntry in it whose
 * expression is a Var with a negative varattno, or a FuncExpr, is passed to
 * list_delete() on targetList.  The return value is false when the number of
 * such entries is at least the length targetList had on entry.
 *
 * NOTE(review): list_delete()'s result is only stored in the local
 * targetList variable; the caller keeps using its own pointer -- confirm
 * that this is safe when the list head changes or the list becomes empty.
 *
 * NOTE(review): the nodes handed to list_delete() come from sourceList, not
 * from targetList; presumably this relies on list_delete() matching members
 * by value rather than by address -- verify.
 *
 * XXX -- the API of this function is really strange ...
 */
static bool
remove_sys_columns(List *targetList, List *sourceList)
{
	ListCell *cell;
	int      r_count = 0;
	int      l_count = list_length(targetList);

	foreach(cell, sourceList)
	{
		Node  *node = (Node *) lfirst(cell);
		/* starts at 0 again for every list member */
		Index rt_offset = 0;

		if (IsA(node, TargetEntry))
		{
			TargetEntry *te = (TargetEntry *)node;

			if (IsA(te->expr, Var))
			{
				Var *var = (Var *)te->expr;

				/*
				 * varattno's with negative values indicates system columns.
				 * Ignoring them means removing from the target list...
				 */
				if (var->varattno < 0)
				{
					targetList = list_delete(targetList, node);
					elog(DEBUG1, "system columns aren't updatable");

					++rt_offset;
					++r_count;
				}
			}
			else
			{
#if 0
				/* Commented out in Bernd's patch.  Why? */
				if (IsA(te->expr, ArrayRef))
				{

					targetList = list_delete(targetList, node);
					elog(NOTICE, "array expressions aren't updatable yet");
					++rt_offset;
					++r_count;
				}
#endif
				if (IsA(te->expr, FuncExpr))
				{
					targetList = list_delete(targetList, node);
					elog(DEBUG1, "function expressions aren't updatable");
					++rt_offset;
					++r_count;
				}
			}

			/*
			 * NOTE(review): te belongs to sourceList, and rt_offset can only
			 * be non-zero for an entry that was just removed, so this never
			 * renumbers any of the entries that remain -- confirm intent.
			 */
			te->resno -= rt_offset;
		}
	}

	/* false means: nothing updatable is left */
	if (r_count >= l_count)
		return false;
	else
		return true;
}

/*
 * Fills in the parts of a rule action query that are derived from the view
 * definition.
 *
 * select has to be a valid initialized view definition query tree (the
 * function assumes that this query has passed the checkTree() function).
 * query is the query tree under construction; it receives
 *	- a copy of select's range table, with inFromCl and inh cleared on every
 *	  entry,
 *	- canSetTag = true and an empty FromExpr as jointree,
 *	- resultRelation, as determined by get_reloid_from_select(),
 *	- and, if copyTargetList is true, a copy of select's target list whose
 *	  expressions have been pointed at PRS2_NEW_VARNO.
 *
 * An error is raised if no base relation OID can be determined.  Returns
 * false if copyTargetList was requested but remove_sys_columns() reports
 * that no updatable column is left; true otherwise.
 *
 * Note that remove_sys_columns() is run against select->targetList itself.
 */
static bool
form_query(const Query *select, Query *query, ViewDefColumnList tentries,
		   bool copyTargetList)
{
	RangeTblEntry *base_rte;
	ListCell   *lc;
	Oid			base_oid;
	List	   *scratch;

	AssertArg(select != NULL);
	AssertArg(query != NULL);
	AssertArg(tentries != NULL);

	/* start from the view's range table and an empty join tree */
	query->rtable = copyObject(select->rtable);
	query->canSetTag = true;
	query->jointree = makeNode(FromExpr);

	/* none of the copied entries is in the FROM clause of the new query */
	foreach(lc, query->rtable)
	{
		RangeTblEntry *entry = (RangeTblEntry *) lfirst(lc);

		Assert(IsA(entry, RangeTblEntry));

		entry->inFromCl = false;
		entry->inh = false;
	}

	/*
	 * The result relation is the base relation.  Since only SQL92 views are
	 * supported at the moment, that is simply the one relation which isn't
	 * labeled as *OLD* or *NEW*.
	 */
	base_oid = get_reloid_from_select(select, &(query->resultRelation),
									  &base_rte);
	if (!OidIsValid(base_oid))
		elog(ERROR, "couldn't retrieve base relation OID");

	Assert(query->resultRelation > 0);

	if (!copyTargetList)
		return true;

	scratch = copyObject(select->targetList);
	if (!remove_sys_columns(select->targetList, scratch))
	{
		elog(DEBUG1, "no updatable columns in view");
		return false;
	}

	query->targetList = copyObject(select->targetList);

	/*
	 * Make all target entry expressions refer to the *NEW* node.
	 *
	 * XXX -- can this be done with expression_tree_walker?
	 */
	replace_varnos_for_target(query->targetList, tentries, PRS2_NEW_VARNO);

	return true;
}

/*
 * What on earth is this supposed to do?
 *
 * Build a TargetEntry
 *
 * As far as the code shows: expr is searched for Vars.  An ArrayRef is
 * descended into (assigned value, upper subscripts, lower subscripts, array
 * expression, in that order); for every Var reached, te_update is rewritten
 * in place so that it assigns to column attnum of relation baserel:
 *	- the Var stored in te_update->expr gets te_update's current resno as
 *	  attribute number and the type/typmod of attrs,
 *	- te_update's resno becomes attnum, its resname the attribute name of
 *	  (baserel, attnum), and its origin/sort fields are reset.
 *
 * Expressions that are neither an ArrayRef nor a Var are ignored.
 */
static void
form_te_for_update(int2 attnum, Form_pg_attribute attrs, Oid baserel,
				   Expr *expr, TargetEntry *te_update)
{
	/*
	 * Try first if this is an array subscripting operation. If true, dive
	 * recursively into the subscripting tree examining all varnos.
	 */

	if (IsA(expr, ArrayRef))
	{
		ArrayRef *array = (ArrayRef *) expr;

		if (array->refassgnexpr != NULL)
		{
			form_te_for_update(attnum, attrs, baserel, array->refassgnexpr,
							   te_update);
		}

		if (array->refupperindexpr != NIL)
		{
			ListCell *cell;

			foreach(cell, array->refupperindexpr)
			{
				Expr *subexpr = (Expr *) lfirst(cell);

				form_te_for_update(attnum, attrs, baserel, subexpr, te_update);
			}
		}

		if (array->reflowerindexpr != NIL)
		{
			ListCell *cell;

			foreach(cell, array->reflowerindexpr)
			{
				Expr *subexpr = (Expr *)lfirst(cell);

				form_te_for_update(attnum, attrs, baserel, subexpr, te_update);
			}
		}

		if (array->refexpr != NULL)
		{
			form_te_for_update(attnum, attrs, baserel, array->refexpr,
							   te_update);
		}
	}
	else if (IsA(expr, Var))
	{
		/*
		 * Base case of recursion: actually build the TargetEntry.
		 *
		 * NOTE(review): the Var that gets modified is te_update->expr, not
		 * the Var "expr" that was just found.  When this point is reached
		 * through the ArrayRef branch above, te_update->expr may well be the
		 * ArrayRef itself rather than a Var -- confirm that this cast is
		 * what is intended.
		 */
		Var *upd_var = (Var *) (te_update->expr);

		/*
		 * NOTE(review): te_update->resno is overwritten with attnum further
		 * down, so if this branch runs more than once for the same
		 * te_update, later runs read the already replaced value here.
		 */
		upd_var->varattno = te_update->resno;
		upd_var->varoattno = te_update->resno;

		upd_var->vartype = attrs->atttypid;
		upd_var->vartypmod = attrs->atttypmod;

		upd_var->varnoold = upd_var->varno;

		/* retarget the entry to column attnum of baserel */
		te_update->resno = attnum;
		te_update->resname = pstrdup(get_attname(baserel, attnum));
		te_update->ressortgroupref = 0;
		te_update->resorigcol = 0;
		te_update->resorigtbl = 0;
		te_update->resjunk = false;
	}
}

/*
 * Build the target list for a view UPDATE rule.
 *
 * The target lists of select (the view definition) and update (the rule
 * action under construction) are walked in parallel and are expected to have
 * the same length.  For each pair, the base relation column is taken from the
 * view entry's resorigcol if that is positive, and from its resno otherwise;
 * the update entry is then rewritten in place by form_te_for_update() using
 * the matching attribute of rel.
 *
 * tentries is not used by this function.
 */
static void
build_update_target_list(const Query *update, const Query *select,
						 ViewDefColumnList tentries, Oid baserel, Relation rel)
{
	ListCell   *view_cell;
	ListCell   *upd_cell;

	Assert(list_length(update->targetList) == list_length(select->targetList));

	forboth(view_cell, select->targetList, upd_cell, update->targetList)
	{
		TargetEntry *view_te = (TargetEntry *) lfirst(view_cell);
		TargetEntry *upd_te = (TargetEntry *) lfirst(upd_cell);
		int			attno;

		/*
		 * A positive resorigcol names the column's position in the base
		 * table (which may differ from its position in the view); without
		 * it, fall back to the position within the view.
		 */
		attno = (view_te->resorigcol > 0) ? view_te->resorigcol
			: view_te->resno;

		/*
		 * FIXME -- can't happen if the lists are truly equal in length, but
		 * are they?  (i.e. is the Assert() above really correct?)
		 */
		if (upd_te == NULL)
			continue;

		form_te_for_update(attno, rel->rd_att->attrs[attno - 1], baserel,
						   upd_te->expr, upd_te);
	}
}

/*
 * Returns the pg_proc tuple for the view definition qualification check
 * function.  Caller must heap_freetuple the result when done with it.
 *
 * An error is raised if the function is not found.
 *
 * XXX -- since the function is fixed, maybe we should consider folding it
 * as a builtin instead.
 */
static HeapTuple
get_view_qualification_func(const char *name)
{
	HeapTuple	procTuple;
	Value	   *funcname = makeString((char *)name);
	Oid			procoid;
	Oid			typid[1];

	Assert(name != NULL);

	typid[0] = BOOLOID;

	procoid = LookupFuncName(list_make1(funcname), 1, typid, false);
	if (!OidIsValid(procoid))
		elog(ERROR, "can't find view qualification check function %s", name);

	elog(DEBUG1, "got OID %u for check function %s", procoid, name);

	procTuple = SearchSysCacheCopy(PROCOID,
								   ObjectIdGetDatum(procoid),
								   0, 0, 0);
	if (!HeapTupleIsValid(procTuple))
		elog(ERROR, "cache lookup failure for function %u", procoid);

	return procTuple;
}

/*
 * Create a rule qualification from the specified view definition query tree.
 *
 * If the view definition has a WHERE clause, a copy of it is wrapped into a
 * call of the PG_VIEW_UPDATE_CONTROL function, and that call replaces
 * stmt->whereClause.  This makes the rule a conditional one, which is how the
 * WITH CHECK OPTION behavior is obtained.  The Vars inside the copied clause
 * are then rewritten by view_def_qual_walker() so that they refer to the
 * newRTE range table entry.
 *
 * Returns the function node, or NULL (leaving stmt untouched) if the view
 * definition has no WHERE clause.
 */
static FuncExpr *
create_qual_for_rule(RuleStmt *stmt, const Query *select,
					 ViewDefColumnList tentries, Index newRTE, Index oldRTE)
{
	FuncExpr   *check;
	HeapTuple	proctup;
	Index		scanpos;
	ViewExprContext ctx;

	Assert(stmt != NULL);
	Assert(list_length(stmt->actions) >= 1);
	Assert(select != NULL);
	Assert((newRTE > 0) && (oldRTE > 0));

	/* without a view qualification there is nothing to check */
	if (select->jointree->quals == NULL)
		return NULL;

	/* Retrieve the function. */
	proctup = get_view_qualification_func(PG_VIEW_UPDATE_CONTROL);

	/* Create the FuncExpr node for evaluation */
	check = makeNode(FuncExpr);
	check->funcid = HeapTupleGetOid(proctup);
	check->funcresulttype = ((Form_pg_proc) GETSTRUCT(proctup))->prorettype;
	check->funcretset = false;
	check->funcformat = 0;
	check->args = list_make1(copyObject(select->jointree->quals));

	/* Move the view qualification over to the RuleStmt */
	elog(DEBUG1, "previous WHERE clause was %s", nodeToString(stmt->whereClause));
	stmt->whereClause = (Node *) check;

	/*
	 * Find the base relation: start scanning the view's range table at
	 * index 2.  get_relation_RTE() leaves the position one past the entry it
	 * stopped at, hence the "- 1" below.
	 */
	scanpos = 2;
	get_relation_RTE(select, &scanpos);
	Assert(scanpos > 2);

	/*
	 * We have to dig into the WHERE clause and grep out all varnos that have
	 * to point to the *NEW* or *OLD* RTE.
	 */
	ctx.newRTE = newRTE;
	ctx.oldRTE = oldRTE;
	ctx.baseRTE = scanpos - 1;
	ctx.subQueryLevel = 0;
	ctx.tentries = tentries;

	expression_tree_walker((Node *) check,
						   view_def_qual_walker,
						   (void *) &ctx);

	/* the pg_proc tuple was a copy, so it is ours to free */
	heap_freetuple(proctup);

	return check;
}

/*
 * Creates a NOTHING rule.
 *
 * This is a thin wrapper that forwards all of its arguments to
 * create_rule_stmt(); only the order of the arguments differs.
 */
static RuleStmt *
create_nothing_rule(Query *action, const RangeVar *var, char *rulename,
					CmdType cmdType, bool replace)
{
	RuleStmt   *nothing_rule;

	nothing_rule = create_rule_stmt(action, var, replace, rulename, cmdType);

	return nothing_rule;
}

/*
 * Builds a RuleStmt node for an INSTEAD rule without a WHERE clause.
 *
 *	actions		the single action of the rule (linked in, not copied)
 *	view		relation the rule is defined on (copied)
 *	replace		stored in the statement's replace flag
 *	rulename	name of the rule (copied)
 *	event		command type the rule fires on
 */
static RuleStmt *
create_rule_stmt(Query *actions, const RangeVar *view, bool replace,
				 char *rulename, CmdType event)
{
	RuleStmt   *stmt = makeNode(RuleStmt);

	/* what the rule is called and where it is attached */
	stmt->rulename = pstrdup(rulename);
	stmt->relation = copyObject((RangeVar *) view);
	stmt->replace = replace;

	/* when it fires and what it does */
	stmt->event = event;
	stmt->instead = true;
	stmt->whereClause = NULL;
	stmt->actions = list_make1(actions);

	return stmt;
}

/*
 * Looks up the rule called "name" on relation "view" in the RULERELNAME
 * syscache and returns the result of that lookup.
 *
 * NOTE(review): presumably the result is InvalidOid when no such rule
 * exists -- verify against GetSysCacheOid().
 */
static Oid
hasRule(Oid view, const char *name)
{
	Oid			rule_oid;

	Assert(OidIsValid(view));

	rule_oid = GetSysCacheOid(RULERELNAME,
							  ObjectIdGetDatum(view),
							  PointerGetDatum(name),
							  0, 0);

	return rule_oid;
}

/*
 * Check if we can create the insert rule for this view, i.e. if we can create
 * DEFAULT values for all columns that need it.
 *
 * Up to now the only reason I can see to not create the rule is if the view
 * does not contain all not null without default fields of the relation.
 */
static bool
viewIsInsertable(const Query *select, const Relation rel)
{
	int2	natts = RelationGetNumberOfAttributes(rel);
	int		num_sys_cols = 0;
	bool	attr_ok[natts];
	ListCell *cell;
	int		i;

	/* Initialize attr_ok array with false values */
	MemSet(attr_ok, 0, sizeof(attr_ok));

	/*
	 * Loop over the targetlist of the querytree and mark the table attributes
	 * that are present in the view definition.
	 */
	foreach(cell, select->targetList)
	{
		TargetEntry *entry = (TargetEntry *) lfirst(cell);

		if (!IsA(entry->expr, Var) &&
			!IsA(entry->expr, ArrayRef))
		{
			/*
			 * Count non-Var expressions. We need to check if there are
			 * 'normal' table columns left if we gonna to remove all others
			 * with remove_sys_columns()
			 */
			num_sys_cols++;
		}
		else if (entry->resorigcol == 0)
		{
			/* This is not a base rel attribute, it will be ignored. */
			continue;
		}
		else if (entry->resorigcol <= 0)
		{
			/*
			 * If resorigcol < 0 then this is a system attribute and will be
			 * ignored.  It will be removed from the targetlist later.
			 */
			num_sys_cols++;
			continue;
		}

		/*
		 * Mark the attribute as present in the view definition.
		 */
		attr_ok[entry->resorigcol - 1] = true;
	}

	/*
	 * Loop over table attributes to check whether they were all marked in the
	 * previous loop.
	 */
	for (i = 0; i < natts; i++)
	{
		/* Ignore dropped columns */
		if (rel->rd_att->attrs[i]->attisdropped)
			continue;

		/* If the column is in the view definition, accept it */
		if (attr_ok[i])
			continue;

		/*
		 * If there is a NOT NULL attribute which is not in the view definition
		 * and doesn't have a DEFAULT value, then the view is not insertable.
		 */
		if (rel->rd_att->attrs[i]->attnotnull &&
			!rel->rd_att->attrs[i]->atthasdef)
			return false;
	}

	/*
	 * Finally, check if a call to remove_sys_columns() will leave an empty
	 * target list.
	 */
	if (list_length(select->targetList) - num_sys_cols <= 0)
		return false;

	return true;
}

/*
 * Examines the columns by the current view and initializes the lookup table
 * for all rearranged columns in base relations. The function requires a
 * relation tree initialized by get_base_base_relations().
 *
 * tree->defs is walked from its last member down to (but not including) its
 * first one.  For each member, a lookup table with one slot per attribute of
 * the relation in the *preceding* list member is allocated, stored in the
 * member's tentries field and filled by copyReversedTargetEntryPtr() from the
 * member's rule target list.
 *
 * The table is allocated zero-filled: copyReversedTargetEntryPtr() only
 * stores into slots that are NULL, and the users of the table treat a NULL
 * slot as "column not rearranged", so every slot has to start out as NULL.
 */
static void
read_rearranged_cols(ViewBaseRelation *tree)
{
	int			i;

	Assert((tree != NULL));

	if (tree->defs == NIL)
		return;

	/*
	 * Traverse the relation tree and look on all base relations for reversed
	 * column order in their target lists.  We have to perform a
	 * look-ahead-read on the tree, because we need to know how much columns
	 * the next base relation has, to allocate enough memory in tentries.
	 *
	 * Note that if only one base relation (a "real" table, not a view)
	 * exists, we have nothing to do, because this base relation cannot have
	 * a reversed column order caused by a view definition query.
	 */
	for (i = list_length(tree->defs) - 1; i > 0; --i)
	{
		ViewBaseRelationItem *item_current;
		ViewBaseRelationItem *item_next;
		ViewBaseRelation *current;
		ViewBaseRelation *next;

		current = (ViewBaseRelation *) list_nth(tree->defs, i);

		/*
		 * We look ahead for the next base relation. We can do this here
		 * safely, because the loop condition terminates before reaching the
		 * list head.
		 */
		next = (ViewBaseRelation *) list_nth(tree->defs, i - 1);

		/*
		 * Note that the code currently requires a SQL92 only relation tree.
		 * This means we handle one base relation per loop, only.
		 */
		Assert(next != NULL);
		Assert(current != NULL);
		Assert(list_length(next->defs) == 1);
		Assert(list_length(current->defs) == 1);

		item_current = (ViewBaseRelationItem *) list_nth(current->defs, 0);
		item_next = (ViewBaseRelationItem *) list_nth(next->defs, 0);

		/* Allocate the tentries buffer with all slots set to NULL */
		item_current->tentries = (ViewDefColumnList)
			palloc0(sizeof(TargetEntry *) *
					RelationGetNumberOfAttributes(item_next->rel));

		copyReversedTargetEntryPtr(item_current->rule->targetList,
								   item_current->tentries);
	}
}

/*
 * Collects the base tables found in a relation tree.
 *
 * Every item below tree->defs whose relation is not a view is appended to
 * *baserelations (the item pointer itself is appended, not a copy).  Nothing
 * happens if tree is NULL or has no entries.
 */
static void
get_base_relations(ViewBaseRelation *tree, List **baserelations)
{
	ListCell   *level_cell;

	/* nothing to do?  */
	if (tree == NULL || tree->defs == NIL)
		return;

	foreach(level_cell, tree->defs)
	{
		ViewBaseRelation *level = (ViewBaseRelation *) lfirst(level_cell);
		ListCell   *item_cell;

		foreach(item_cell, level->defs)
		{
			ViewBaseRelationItem *item =
				(ViewBaseRelationItem *) lfirst(item_cell);

			/* views are not base relations, keep looking */
			if (item->rel->rd_rel->relkind == RELKIND_VIEW)
				continue;

			elog(DEBUG1, "Found base relation %s",
				 RelationGetRelationName(item->rel));

			*baserelations = lappend(*baserelations, item);
		}
	}
}

/*------------------------------------------------------------------------------
 * Retrieves all relations from the view that can be considered a "base
 * relation" and records them in *list.
 *
 * Each invocation handles one view definition (view, belonging to the
 * relation with OID baserel): it collects the tables and views found in that
 * definition's range table into one ViewBaseRelation, whose parentRelation is
 * baserel.  For every view among them the function first recurses into that
 * view's _RETURN rule.  The ViewBaseRelation is appended to *list only after
 * all of that, and only if it is not empty.
 *
 * Consequently the entries for relations deeper in the tree are appended
 * before the entries of the relations referring to them.  Consider this view
 * definition tree where each node is a relation the above node is based on:
 *
 *                         1
 *                        / \
 *                       2   3
 *                      / \   \
 *                     4   5   6
 *                        /
 *                       7
 *
 * Going by the steps above, the list entries are appended in this order,
 * one entry per parent relation:
 *
 * Listindex          Node(s)        (parent)
 * ------------------------------------------
 * 1                  7              (5)
 * 2                  4 5            (2)
 * 3                  6              (3)
 * 4                  2 3            (1)
 *
 * As you can see in the table, the relations closer to the root are saved as
 * the *last* entries in the list; but the topmost node (the view relation
 * itself) is not saved.
 *
 * Relations are opened with AccessShareLock; those that are recorded stay
 * open (the Relation is kept in the item), the others are closed right away.
 *------------------------------------------------------------------------------
 */
static void
get_base_base_relations(const Query *view, Oid baserel, List **list)
{
	ViewBaseRelation *level;
	RangeTblEntry *rte;
	unsigned int scanpos = 1;

	if (view == NULL)
		return;

	level = (ViewBaseRelation *) palloc(sizeof(ViewBaseRelation));
	level->defs = NIL;
	level->parentRelation = baserel;

	/* Get all OIDs from the RTE list of view. */
	for (;;)
	{
		Relation	rel;
		ViewBaseRelationItem *item;
		bool		is_view;

		rte = get_relation_RTE(view, &scanpos);
		if (rte == NULL)
			break;

		/*
		 * Is this really a view or relation?
		 *
		 * XXX -- maybe we don't need the lock here.  It may be actively
		 * dangerous, if somewhere else we acquire a stronger lock later ...
		 */
		rel = relation_open(rte->relid, AccessShareLock);
		is_view = (rel->rd_rel->relkind == RELKIND_VIEW);

		if (!is_view && rel->rd_rel->relkind != RELKIND_RELATION)
		{
			/* don't need this one */
			relation_close(rel, AccessShareLock);
			continue;
		}

		item = (ViewBaseRelationItem *) palloc0(sizeof(ViewBaseRelationItem));
		item->rel = rel;

		if (is_view)
		{
			/*
			 * Get the rule _RETURN expression tree for the specified relation
			 * OID.  We need this to recurse into the view base relation tree.
			 */
			item->rule = get_return_rule(rel);

			/* recurse to child relations */
			if (item->rule != NULL)
				get_base_base_relations(item->rule, RelationGetRelid(rel), list);
		}

		level->defs = lappend(level->defs, item);

		elog(DEBUG1, "extracted relation %s for relation tree",
			 RelationGetRelationName(rel));
	}

	/* only record this level if something was found on it */
	if (level->defs != NIL)
		*list = lappend(*list, level);
}

/*
 * Record, in the lookup table "targets", the target entries of "targetList"
 * whose position in the list differs from their original column number.
 *
 * Only pointers are stored; the TargetEntry nodes themselves are not copied.
 */
static void
copyReversedTargetEntryPtr(List *targetList, ViewDefColumnList targets)
{
	ListCell   *lc;

	AssertArg(targets != NULL);
	AssertArg(targetList != NIL);

	foreach(lc, targetList)
	{
		Node	   *item = (Node *) lfirst(lc);
		TargetEntry *tle;
		int			slot;

		if (!IsA(item, TargetEntry))
			continue;

		tle = (TargetEntry *) item;

		/* Entries that are not plain Vars need no treatment. */
		if (!IsA(tle->expr, Var))
			continue;

		/*
		 * resorigcol tells us whether this is a reversed column (meaning, it
		 * has a different column number than the underlying base table).
		 * Skip entries without an origin column or in their original place.
		 */
		if (tle->resorigcol <= 0 || tle->resno == tle->resorigcol)
			continue;

		/*
		 * Save this TargetEntry to the appropriate place in the lookup
		 * table, unless the slot is already taken (which can happen if the
		 * column is specified more than once in the view definition).
		 */
		slot = tle->resorigcol - 1;
		if (targets[slot] == NULL)
			targets[slot] = tle;
	}
}

/*
 * transform_select_to_delete
 *
 * Transforms the specified view definition into a DELETE rule and installs
 * it, together with an INSTEAD NOTHING rule, through DefineQueryRewrite().
 *
 * select		the view definition (SELECT query tree)
 * rel			relation handed on to form_where_for_updrule(); the caller in
 *				this file passes the opened base relation
 * var			RangeVar handed on to create_nothing_rule() and
 *				create_rule_stmt()
 * tentries		lookup table of view target entries, handed on to
 *				form_query() and create_qual_for_rule()
 * checkOption	if true, a rule qualification is built with
 *				create_qual_for_rule()
 * checkCascade	combined with checkOption by makeViewCheckOption()
 *
 * Returns the generated DELETE query tree, or NULL if form_query() could not
 * build it.  Raises an ERROR if no base relation is found in "select".
 *
 * Note: The function assumes that the specified query tree has passed the
 * checkTree() function.
 */
static Query *
transform_select_to_delete(const Query *select, const Relation rel,
						   const RangeVar *var, ViewDefColumnList tentries,
						   bool checkOption, bool checkCascade)
{
	Query	   *delete;
	RuleStmt   *nothing;
	RuleStmt   *rule;
	Oid			baserel;
	bool		replace_nothing;
	bool		replace_delete;
	Index		baserti;
	Query	   *gen_delete;
	char		option;
	RangeTblRef *oldref;
	RangeTblRef *baseref;
	RangeTblEntry *entry;
	RangeTblEntry *viewrte;
	

	AssertArg(tentries != NULL);
	AssertArg(rel != NULL);
	AssertArg(var != NULL);
	AssertArg(select != NULL);

	/* Find the base relation of the view; "entry" receives its RTE. */
	baserel = get_reloid_from_select(select, NULL, &entry);
	if (!OidIsValid(baserel))
		elog(ERROR, "could not get the base relation from the view definition");

	delete = makeNode(Query);
	delete->commandType = CMD_DELETE;

	/* We don't need a targetlist in DELETE */
	if (!form_query(select, delete, tentries, false))
	{
		elog(DEBUG1, "could not create DELETE rule");
		return NULL;
	}

	/*
	 * Pass this generic tree to create_nothing_rule(), it's sufficient to
	 * create an INSTEAD DO NOTHING rule for deletion.
	 *
	 * This is required, because we can get conditional rules if the
	 * checkOption is specified.  We only save this generic tree here, the
	 * work is done at the end of this function.
	 *
	 * Note that the copy is taken before the jointree is modified below.
	 *
	 * FIXME -- can we do this only if the check option is specified?
	 */

	gen_delete = copyObject(delete);
	gen_delete->commandType = CMD_NOTHING;

	/*
	 * form_query() has prepared the jointree of the new DELETE rule.
	 * However, a DELETE rule needs range table references to *OLD* and
	 * base relation RTEs.
	 */

	/* Locate the base relation in the new range table by its alias name. */
	baserti = get_rtindex_for_rel(delete->rtable,
								  entry->eref->aliasname);
	Assert(baserti > 0);

	oldref = makeNode(RangeTblRef);
	oldref->rtindex = PRS2_OLD_VARNO;

	baseref = makeNode(RangeTblRef);
	baseref->rtindex = baserti;

	delete->jointree->fromlist = list_make2(baseref, oldref);

	/* Create the WHERE condition qualification for the rule action. */
	form_where_for_updrule(select, &(delete->jointree),
						   rel, baserel, baserti, PRS2_OLD_VARNO);

	/*
	 * Fetch the RTE at the *NEW* slot; its relid is used below as the
	 * relation to look up existing rules on.
	 *
	 * XXX -- Is it safe to rely on *NEW* here?
	 */
	viewrte = rt_fetch(PRS2_NEW_VARNO, delete->rtable);

	/*
	 * Set ACL bit.
	 *
	 * NOTE(review): "entry" comes from get_reloid_from_select(), which
	 * returns a copyObject() of the RTE, so this sets the bit on that copy.
	 * Confirm that the permission bit is not meant to be set on the RTE
	 * stored in delete->rtable instead.
	 */
	entry->requiredPerms |= ACL_DELETE;

	/*
	 * XXX It seems to me that these checks are not necessary; and further,
	 * they are useless.  This is because the view is just being created,
	 * thus it cannot have any rules before the ones we are going to create.
	 */
	/* DELETE rule has to be replaced? */
	replace_delete = OidIsValid(hasRule(viewrte->relid, DELETERULENAME));

	/* NOTHING rule has to be replaced? */
	replace_nothing = OidIsValid(hasRule(viewrte->relid,
										 NOTHING_DELETERULENAME));

	/* Build both rule statements; they are defined at the very end. */
	nothing = create_nothing_rule(gen_delete, var,
								  NOTHING_DELETERULENAME,
								  CMD_DELETE, replace_nothing);

	rule = create_rule_stmt(delete, var, replace_delete, DELETERULENAME,
							CMD_DELETE);

	/*
	 * Enter rule qualification creation. The DELETE rule has to reference
	 * the *OLD* range table entry.
	 */
	if (checkOption)
		create_qual_for_rule(rule, select, tentries, PRS2_OLD_VARNO, baserti);

	option = makeViewCheckOption(checkOption, checkCascade);

	/* The NOTHING rule never carries a check option; the real rule may. */
	DefineQueryRewrite(nothing, NO_OPTION_IMPLICIT);
	DefineQueryRewrite(rule, option);

	return delete;
}

/*
 * transform_select_to_update
 *
 * Transforms the specified SELECT query tree into an equivalent UPDATE
 * statement and installs it as a rule, together with an INSTEAD NOTHING
 * rule, through DefineQueryRewrite().
 *
 * select		the view definition (SELECT query tree)
 * rel			relation handed on to form_where_for_updrule() and
 *				build_update_target_list(); the caller in this file passes
 *				the opened base relation
 * var			RangeVar handed on to create_nothing_rule() and
 *				create_rule_stmt()
 * tentries		lookup table of view target entries
 * checkOption	if true, a rule qualification is built with
 *				create_qual_for_rule()
 * checkCascade	combined with checkOption by makeViewCheckOption()
 *
 * Returns the generated UPDATE query tree, or NULL if form_query() could not
 * build it.  Raises an ERROR if no base relation is found in "select".
 *
 * Note: The function assumes that the specified query tree has passed the
 * checkTree() function before.
 */
static Query *
transform_select_to_update(const Query *select, const Relation rel,
						   const RangeVar *var, ViewDefColumnList tentries,
						   bool checkOption, bool checkCascade)
{
	Query	   *update;
	Oid			baserel;
	RuleStmt   *nothing;
	RuleStmt   *rule;
	bool		replace_update;
	bool		replace_nothing;
	Index		baserti;
	Query	   *gen_update;
	char		option;
	RangeTblRef *oldref;
	RangeTblRef *baseref;
	RangeTblEntry *entry;
	RangeTblEntry *viewrte;

	AssertArg(tentries != NULL);
	AssertArg(rel != NULL);
	AssertArg(var != NULL);
	AssertArg(select != NULL);

	/* Find the base relation of the view; "entry" receives its RTE. */
	baserel = get_reloid_from_select(select, NULL, &entry);
	if (!OidIsValid(baserel))
		elog(ERROR, "could not get the base relation from the view definition");

	update = makeNode(Query);
	update->commandType = CMD_UPDATE;

	if (!form_query(select, update, tentries, true))
	{
		elog(DEBUG1, "could not create UPDATE update rule");
		return NULL;
	}

	/*
	 * Our UPDATE rule needs range table references for the *OLD* and base
	 * relation.
	 *
	 * Note: the jointree in the UPDATE tree is already prepared, we only
	 * have to fill the fromlist list.
	 *
	 * NOTE(review): unlike transform_select_to_delete(), baserti is not
	 * asserted to be > 0 here.
	 */

	baserti = get_rtindex_for_rel(update->rtable,
								  entry->eref->aliasname);

	oldref = makeNode(RangeTblRef);
	baseref = makeNode(RangeTblRef);

	oldref->rtindex = PRS2_OLD_VARNO;
	baseref->rtindex = baserti;

	update->jointree->fromlist = list_make2(baseref, oldref);

	/* Create the WHERE condition qualification for the new rule */
	form_where_for_updrule(select, &(update->jointree),
						   rel, baserel, baserti, PRS2_OLD_VARNO);

	/*
	 * We must reorder the columns in the targetlist to match the
	 * underlying table. We do this after calling form_where_for_updrule()
	 * because build_update_target_list() relies on the original resdoms in
	 * the update tree.
	 */
	build_update_target_list(update, select, tentries, baserel, rel);

	/*
	 * Pass this generic tree to create_nothing_rule(), it's sufficient to
	 * create an INSTEAD DO NOTHING rule for update.
	 *
	 * This is required, because we can get conditional rules if the
	 * checkOption is specified.  We only save this generic tree here, the
	 * work is done at the end of this function.
	 *
	 * Note that, unlike in transform_select_to_delete(), the copy is taken
	 * after the jointree and target list have been completed.
	 */
	gen_update = copyObject(update);
	gen_update->commandType = CMD_NOTHING;

	/* Fetch the RTE at the *NEW* slot; its relid identifies the view. */
	viewrte = rt_fetch(PRS2_NEW_VARNO, update->rtable);

	/*
	 * Set ACL bit.
	 *
	 * NOTE(review): "entry" comes from get_reloid_from_select(), which
	 * returns a copyObject() of the RTE, so this sets the bit on that copy.
	 * Confirm that the permission bit is not meant to be set on the RTE
	 * stored in update->rtable instead.
	 */
	entry->requiredPerms |= ACL_UPDATE;

	/* UPDATE rule has to be replaced? */
	replace_update = OidIsValid(hasRule(viewrte->relid, UPDATERULENAME));

	/* NOTHING rule has to be replaced? */
	replace_nothing = OidIsValid(hasRule(viewrte->relid,
										 NOTHING_UPDATERULENAME));

	/* Ready to create the rule */

	nothing = create_nothing_rule(gen_update, var,
								  NOTHING_UPDATERULENAME,
								  CMD_UPDATE, replace_nothing);

	rule = create_rule_stmt(update, var, replace_update, UPDATERULENAME,
							CMD_UPDATE);

	/*
	 * Enter rule qualification creation.  The UPDATE rule checks the *NEW*
	 * range table entry.
	 */
	if (checkOption)
		create_qual_for_rule(rule, select, tentries, PRS2_NEW_VARNO, baserti);

	option = makeViewCheckOption(checkOption, checkCascade);

	/* The NOTHING rule never carries a check option; the real rule may. */
	DefineQueryRewrite(nothing, NO_OPTION_IMPLICIT);
	DefineQueryRewrite(rule, option);

	return update;
}

/*
 * transform_select_to_insert
 *
 * Transforms the specified SELECT query tree into an equivalent INSERT
 * statement and installs it as a rule, together with an INSTEAD NOTHING
 * rule, through DefineQueryRewrite().
 *
 * select		the view definition (SELECT query tree)
 * rel			relation handed on to viewIsInsertable() and
 *				build_update_target_list(); the caller in this file passes
 *				the opened base relation
 * var			RangeVar handed on to create_nothing_rule() and
 *				create_rule_stmt()
 * tentries		lookup table of view target entries
 * checkOption	if true, a rule qualification is built with
 *				create_qual_for_rule()
 * checkCascade	combined with checkOption by makeViewCheckOption()
 *
 * Returns the generated INSERT query tree, or NULL if viewIsInsertable()
 * rejects the view or form_query() could not build the tree.  Raises an
 * ERROR if no base relation is found in "select".
 *
 * Note: The function assumes that the specified query tree has passed the
 * checkTree() function before.
 */
static Query *
transform_select_to_insert(const Query *select, const Relation rel,
						   const RangeVar *var, ViewDefColumnList tentries,
						   bool checkOption, bool checkCascade)
{
	Query	   *insert;
	Oid			baserel;
	Query	   *gen_insert;
	Index		baserti;
	char		option;
	bool		replace_insert;
	RuleStmt   *rule;
	bool		replace_nothing;
	RuleStmt   *nothing;
	RangeTblEntry *viewrte;
	RangeTblEntry *entry;

	AssertArg(select != NULL);
	AssertArg(rel != NULL);
	AssertArg(var != NULL);
	AssertArg(tentries != NULL);

	/* Find the base relation of the view; "entry" receives its RTE. */
	baserel = get_reloid_from_select(select, NULL, &entry);
	if (!OidIsValid(baserel))
		elog(ERROR, "could not get the base relation from the view definition");

	option = makeViewCheckOption(checkOption, checkCascade);

	/* Check if we can create the insert rule for this view. */
	if (!viewIsInsertable(select, rel))
	{
		elog(DEBUG1, "Cannot create implicit insert rule for this view");
		return NULL;
	}

	insert = makeNode(Query);
	insert->commandType = CMD_INSERT;

	if (!form_query(select, insert, tentries, true))
	{
		elog(DEBUG1, "could not create INSERT update rule");
		return NULL;
	}

	/*
	 * Pass this generic tree to create_nothing_rule(), it's sufficient to
	 * create an INSTEAD DO NOTHING rule.  The INSTEAD DO NOTHING rule is
	 * required, because we can get conditional rules if the checkOption is
	 * specified.
	 *
	 * Note that the copy is taken before the target list is reordered below.
	 *
	 * XXX -- but actually it isn't required if there's no check option.  Try
	 * to suppress generating it unnecessarily.
	 */

	gen_insert = copyObject(insert);
	gen_insert->commandType = CMD_NOTHING;

	/* Locate the base relation in the new range table by its alias name. */
	baserti = get_rtindex_for_rel(insert->rtable,
								  entry->eref->aliasname);
	/* Fetch the RTE at the *NEW* slot; its relid identifies the view. */
	viewrte = rt_fetch(PRS2_NEW_VARNO, insert->rtable);

	/*
	 * We must reorder the columns in the targetlist to match the underlying
	 * table.
	 */
	build_update_target_list(insert, select, tentries, baserel, rel);

	/*
	 * Set the needed ACL bit.
	 *
	 * NOTE(review): "entry" comes from get_reloid_from_select(), which
	 * returns a copyObject() of the RTE, so this sets the bit on that copy.
	 * Confirm that the permission bit is not meant to be set on the RTE
	 * stored in insert->rtable instead.
	 */
	entry->requiredPerms |= ACL_INSERT;

	/* INSERT rule has to be replaced? */
	replace_insert = OidIsValid(hasRule(viewrte->relid, INSERTRULENAME));

	/* NOTHING rule has to be replaced? */
	replace_nothing = OidIsValid(hasRule(viewrte->relid,
										 NOTHING_INSERTRULENAME));

	/* Ready to create the rules */
	nothing = create_nothing_rule(gen_insert, var,
								  NOTHING_INSERTRULENAME,
								  CMD_INSERT, replace_nothing);
	rule = create_rule_stmt(insert, var, replace_insert, INSERTRULENAME,
							CMD_INSERT);

	/*
	 * Enter rule qualification creation.  The INSERT rule checks the *NEW*
	 * range table entry.
	 */
	if (checkOption)
		create_qual_for_rule(rule, select, tentries, PRS2_NEW_VARNO, baserti);

	/* The NOTHING rule never carries a check option; the real rule may. */
	DefineQueryRewrite(nothing, NO_OPTION_IMPLICIT);
	DefineQueryRewrite(rule, option);

	return insert;
}

/*
 * checkTree
 *
 * Decides whether a view definition satisfies the SQL92 conditions for an
 * updatable view.
 *
 * Returns true if every check passes and false as soon as one of them fails;
 * the reason for a rejection is reported through elog().  A query that is
 * not a SELECT at all raises an ERROR.
 */
static bool
checkTree(const Query *query, ViewBaseRelation *tree)
{
	ListCell   *lc;
	int			nrtes;

	AssertArg(query != NULL);
	AssertArg(tree != NULL);

	/* The relation tree itself must not branch. */
	if (!check_reltree(tree))
	{
		elog(DEBUG1,
			 "possible JOIN/UNION in relations found in view definition");
		return false;
	}

	/* XXX ISTM this shouldn't happen ... maybe turn it into an Assert()? */
	if (query->commandType != CMD_SELECT)
	{
		elog(ERROR, "query is not a SELECT");
		return false;
	}

	/* Query features that rule out an updatable view. */
	if (query->hasAggs)
	{
		elog(DEBUG1, "aggregates are not supported in updatable views");
		return false;
	}

	if (query->hasSubLinks)
	{
		elog(WARNING, "subqueries are not supported in updatable views");
		return false;
	}

	if (query->groupClause != NIL)
	{
		elog(DEBUG1, "GROUP BY violates SQL92 view update rules");
		return false;
	}

	if (query->distinctClause != NIL)
	{
		elog(DEBUG1, "DISTINCT violates SQL92 view update rules");
		return false;
	}

	if (query->havingQual != NULL)
	{
		elog(DEBUG1, "HAVING violates SQL92 view update rules");
		return false;
	}

	/*
	 * A view definition SELECT over a single relation carries exactly three
	 * range table entries.  More means several base relations, fewer means
	 * there is no base relation at all.
	 */
	nrtes = list_length(query->rtable);

	if (nrtes > 3)
	{
		elog(DEBUG1, "view seems to have more than one base relation");
		return false;
	}

	if (nrtes < 3)
	{
		elog(DEBUG1, "no base relation detected");
		return false;
	}

	/*
	 * Inspect the target list.  SQL92 doesn't allow functions, hostvariables
	 * or constant expressions there, so only Vars are accepted, plus array
	 * references that carry no index expression.
	 */
	foreach(lc, query->targetList)
	{
		Node	   *item = (Node *) lfirst(lc);
		TargetEntry *tle;

		if (!IsA(item, TargetEntry))
			continue;

		tle = (TargetEntry *) item;

		/* A plain column reference is always fine. */
		if (IsA(tle->expr, Var))
			continue;

		/*
		 * TODO -- it would be nice to support Const nodes here as well
		 * (but apparently it isn't in the standard)
		 */
		if (!IsA(tle->expr, ArrayRef))
		{
			elog(DEBUG1, "view target list has unsupported column entries");
			return false;
		}

		/* This is an implementation shortcoming */
		if (((ArrayRef *) tle->expr)->refupperindexpr != NIL)
		{
			elog(DEBUG1, "indexed array fields aren't updatable");
			return false;
		}
	}

	/*
	 * Finally, every range table entry must be a relation.  This catches
	 * table functions in particular, which can never be updatable.
	 */
	foreach(lc, query->rtable)
	{
		RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc);

		if (rte->rtekind == RTE_RELATION)
			continue;

		elog(DEBUG1, "range table entry %s is not a relation!",
			 rte->eref->aliasname);
		return false;
	}

	return true;
}

/*
 * check_reltree
 *
 * Looks at every ViewBaseRelation stored in node->defs.  If any of them has
 * more than one member, the view is based on more than one relation and is
 * therefore not SQL92 updatable; false is returned in that case, otherwise
 * true.
 */
static bool
check_reltree(ViewBaseRelation *node)
{
	ListCell   *lc;
	bool		single_based = true;

	Assert(node != NULL);

	foreach(lc, node->defs)
	{
		ViewBaseRelation *child = (ViewBaseRelation *) lfirst(lc);

		/* More than one base relation at this level: give up. */
		if (list_length(child->defs) > 1)
		{
			single_based = false;
			break;
		}
	}

	return single_based;
}

/*
 * get_reloid_from_select
 *
 * Scans the range table of a SELECT query tree and returns the OID of the
 * first RTE_RELATION entry that is neither *NEW* nor *OLD*.  InvalidOid is
 * returned if "select" is NULL, is not a SELECT, or has no such entry.
 *
 * On success, *rti receives the 1-based range table index of that entry and
 * *rel_entry a copy of the entry itself; either pointer may be NULL if the
 * caller is not interested.  Nothing is stored when no entry is found.
 *
 * This function assumes that the specified query tree was checked by a
 * previous call to the checkTree() function.
 */
static Oid
get_reloid_from_select(const Query *select, int *rti, RangeTblEntry **rel_entry)
{
	ListCell   *lc;
	int			position = 0;

	/* Refuse anything that is not a SELECT query tree. */
	if (select == NULL || select->commandType != CMD_SELECT)
		return InvalidOid;

	foreach(lc, select->rtable)
	{
		RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc);
		const char *alias;

		position++;

		if (rte == NULL)
			elog(ERROR, "null RTE pointer in get_reloid_from_select");

		elog(DEBUG1, "extracted range table entry for %u", rte->relid);

		/* Only relation entries qualify. */
		if (rte->rtekind != RTE_RELATION)
			continue;

		/*
		 * XXX This is ugly.  The parser prepends two RTEs with rtekind
		 * RTE_RELATION named *NEW* and *OLD*.  We have to exclude them by
		 * name!  It would be much better if it used RTE_SPECIAL instead,
		 * but other parts of the system stop working if one just changes
		 * it naively.
		 */
		alias = rte->eref->aliasname;
		if (strcmp(alias, "*NEW*") == 0 || strcmp(alias, "*OLD*") == 0)
			continue;

		/* Found it: hand back whatever the caller asked for. */
		if (rti != NULL)
			*rti = position;
		if (rel_entry != NULL)
			*rel_entry = copyObject(rte);

		return rte->relid;
	}

	return InvalidOid;
}

/*
 * get_return_rule: returns the _RETURN rule of a view as a Query node.
 *
 * The first rule of the view whose event is CMD_SELECT is taken to be the
 * _RETURN rule, and its single action is returned.  NULL is returned if the
 * relation has no rules at all or none of them is a SELECT rule.  An ERROR
 * is raised if the SELECT rule does not have exactly one action.
 */
static Query *
get_return_rule(Relation rel)
{
	Query  *query = NULL;
	int		i;

	Assert(rel->rd_rel->relkind == RELKIND_VIEW);

	/*
	 * rd_rules is NULL for a relation without any rules; don't dereference
	 * it in that case.  Callers already cope with a NULL result.
	 */
	if (rel->rd_rules == NULL)
		return NULL;

	for (i = 0; i < rel->rd_rules->numLocks; i++)
	{
		RewriteRule *rule = rel->rd_rules->rules[i];

		if (rule->event == CMD_SELECT)
		{
			/* A _RETURN rule should have only one action */
			if (list_length(rule->actions) != 1)
				elog(ERROR, "invalid _RETURN rule action specification");

			query = linitial(rule->actions);
			break;
		}
	}

	return query;
}

/*------------------------------------------------------------------------------
 * Public functions
 *------------------------------------------------------------------------------
 */

/*
 * CreateViewUpdateRules
 *
 * This is the main entry point to creating an updatable view's rules.  Given a
 * rule definition, examine it, and create the rules if appropriate, or return
 * doing nothing if not.
 *
 * viewDef		the view definition (SELECT query tree)
 * var			RangeVar handed on to the transform_select_to_*() functions
 * checkOption	whether the view was created with a check option
 * cascade		whether that check option is the cascaded variant
 *
 * All memory is allocated in a private memory context that is deleted
 * before returning.
 */
void
CreateViewUpdateRules(const Query *viewDef, const RangeVar *var,
					  bool checkOption, bool cascade)
{
	Relation	rel;
	Form_pg_attribute *attrs;
	ViewDefColumnList tentries;
	Oid			baserel;
	MemoryContext	cxt;
	MemoryContext	oldcxt;
	ViewBaseRelation *tree;
	List	   *baserelations;
	ListCell   *cell;
	
	/*
	 * The routines in this file leak memory like crazy, so make sure we
	 * allocate it all in an appropriate context.
	 */
	cxt = AllocSetContextCreate(TopTransactionContext,
								"UpdateRulesContext",
								ALLOCSET_DEFAULT_MINSIZE,
								ALLOCSET_DEFAULT_INITSIZE,
								ALLOCSET_DEFAULT_MAXSIZE);
	oldcxt = MemoryContextSwitchTo(cxt);

	/*
	 * Determine the base relation of the view.  The result may be
	 * InvalidOid here; it is only used to open the relation after
	 * checkTree() has accepted the view.
	 */

	baserel = get_reloid_from_select(viewDef, NULL, NULL);

	/* Get relation tree */
	tree = (ViewBaseRelation *) palloc(sizeof(ViewBaseRelation));

	tree->parentRelation = InvalidOid;
	tree->defs = NIL;
	get_base_base_relations(viewDef, baserel, &(tree->defs));

	/*
	 * NOTE(review): baserelations is filled here but never read again in
	 * this function.
	 */
	baserelations = NIL;
	get_base_relations(tree, &baserelations);

	/* Check the query tree for SQL92 compliance */
	if (!checkTree(viewDef, tree))
	{
		elog(DEBUG1, "view is not updatable");
		goto finish;
	}

	rel = heap_open(baserel, AccessExclusiveLock);
	/* NOTE(review): attrs is assigned but never used in this function. */
	attrs = rel->rd_att->attrs;

	/*
	 * Create the lookup table for the view definition target columns, with
	 * one slot per attribute of the base relation.  Copy TargetEntries to
	 * match the slot numbers in the target list with their original column
	 * attribute number. Note that only pointers are copied and they are
	 * valid only as long as the specified SELECT query stays valid!
	 */
	tentries = (ViewDefColumnList)
		palloc0(rel->rd_rel->relnatts * sizeof(TargetEntry *));

	copyReversedTargetEntryPtr(viewDef->targetList, tentries);
	
	/*
	 * Now do the same for the base relation tree. read_rearranged_cols
	 * traverses the relation tree and performs a copyReversedTargetEntry()
	 * call to each base relation.
	 */
	read_rearranged_cols(tree);

	/*
	 * Create update rules.  Need a CCI here to see previously-created
	 * rules.  XXX Maybe this CCI should be elsewhere.
	 */
	CommandCounterIncrement();
	
	/*
	 * The notice is emitted before the rules are built; the return values
	 * of the transform functions are not inspected.
	 */
	elog(NOTICE, "CREATE VIEW will create implicit INSERT/UPDATE/DELETE rules");
	
	transform_select_to_insert(viewDef, rel, var, tentries, checkOption,
							   cascade);

	transform_select_to_delete(viewDef, rel, var, tentries, checkOption,
							   cascade);

	transform_select_to_update(viewDef, rel, var, tentries, checkOption,
							   cascade);

	/* free remaining stuff */
	heap_close(rel, AccessExclusiveLock);

finish:
	/*
	 * get_base_base_relations leaves some open relations: tree->defs is a
	 * list of ViewBaseRelation nodes, each of which holds a list of items
	 * with an open relation.  Close them all.
	 */
	foreach(cell, tree->defs)
	{
		ListCell   *cell2;
		ViewBaseRelation *vbr = (ViewBaseRelation *) lfirst(cell);

		foreach(cell2, vbr->defs)
		{
			ViewBaseRelationItem *vbri = (ViewBaseRelationItem *) lfirst(cell2);

			/* XXX should we keep the locks here? */
			relation_close(vbri->rel, AccessShareLock);
		}
	}

	/* Leave the private context and drop everything allocated in it. */
	MemoryContextSwitchTo(oldcxt);
	MemoryContextDelete(cxt);
}

/*
 * makeViewCheckOption
 *
 * Maps the two check option flags of a view onto the option character used
 * for its implicit rules: no check option, local check option, or cascaded
 * check option.  "cascaded" is only looked at when "check" is set.
 */
char
makeViewCheckOption(bool check, bool cascaded)
{
	if (!check)
		return NO_OPTION_IMPLICIT;

	return cascaded ? CASCADED_OPTION_IMPLICIT : LOCAL_OPTION_IMPLICIT;
}


/*------------------------------------------------------------------------------
 * Required for update rule CHECK OPTION
 *------------------------------------------------------------------------------
 */
/*
 * pg_view_update_error
 *
 * Takes the boolean result of a view's CHECK OPTION condition as its only
 * argument.  Returns true if the condition holds; otherwise an ERROR with
 * ERRCODE_WITH_CHECK_OPTION_VIOLATION is raised.
 */
Datum
pg_view_update_error(PG_FUNCTION_ARGS)
{
	bool		qual;

	/*
	 * A view's CHECK OPTION should never evaluate to NULL, so we treat a
	 * NULL evaluated expression tree as false.  Test for NULL before
	 * fetching the argument, so that the Datum of a NULL argument is never
	 * interpreted.
	 */
	if (PG_ARGISNULL(0))
		qual = false;
	else
		qual = PG_GETARG_BOOL(0);

	if (!qual)
		ereport(ERROR,
				(errcode(ERRCODE_WITH_CHECK_OPTION_VIOLATION),
				 errmsg("view update commands violates rule condition")));

	PG_RETURN_BOOL(true);
}
/*-------------------------------------------------------------------------
 *
 * viewUpdate.h
 *
 * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * $PostgreSQL$
 *
 *-------------------------------------------------------------------------
 */
#ifndef VIEW_UPDATE_H
#define VIEW_UPDATE_H

#include "nodes/parsenodes.h"
#include "nodes/primnodes.h"

/*
 * Main entry point: examines a view definition and, if appropriate, creates
 * the implicit rules that make the view updatable.
 */
extern void
CreateViewUpdateRules(const Query *viewDef, const RangeVar *var, bool checkOption,
					  bool cascade);
/*
 * Maps the (check, cascaded) flags of a view onto the option character used
 * for its implicit rules.
 */
extern char
makeViewCheckOption(bool check, bool cascaded);

#endif /* VIEW_UPDATE_H */
--
-- Regression test for views with implicitly created INSERT/UPDATE/DELETE
-- rules.
--

-- base table; the two inserts below take ids from the serial sequence
CREATE TABLE upd_first (id serial PRIMARY KEY, name text);
INSERT INTO upd_first (name) VALUES ('weenie');
INSERT INTO upd_first (name) VALUES ('meenie');

-- basic functionality: I/U/D on a trivial view
CREATE VIEW v_first AS SELECT * FROM upd_first;
SELECT * FROM v_first;
-- the id is given explicitly here, so the sequence is not advanced
INSERT INTO v_first VALUES (3, 'meenie');
DELETE FROM v_first WHERE id = 2;
UPDATE v_first SET name = 'eenie' WHERE id = 1;
SELECT * FROM v_first;

-- fails because the view column doesn't have the DEFAULT clause
INSERT INTO v_first (name) VALUES ('phooey');

-- so set it
ALTER TABLE v_first ALTER id
SET DEFAULT nextval(pg_get_serial_sequence('upd_first', 'id'));

-- try again ... this one fails because the sequence got left behind
-- (its next value is presumably 3, which was inserted explicitly above)
INSERT INTO v_first (name) VALUES ('doobie');
-- but this one should work
INSERT INTO v_first (name) VALUES ('minie');
SELECT * FROM v_first;

-- try a view on a view, with columns reversed
CREATE VIEW v_v_first as SELECT name, id FROM v_first;
SELECT * FROM v_v_first;
ALTER TABLE v_v_first ALTER id
SET DEFAULT nextval(pg_get_serial_sequence('upd_first', 'id'));
INSERT INTO v_v_first VALUES ('tangerine', DEFAULT);

-- Let's try a "horizontally-constrained" view of the table.  We can't trust
-- the real CURRENT_USER to return a constant, so fake it.
create function fake_curr_usr() RETURNS name
AS $$ SELECT 'regression'::name $$ LANGUAGE sql;

ALTER TABLE upd_first ADD COLUMN usr name DEFAULT fake_curr_usr();

CREATE VIEW v_second AS SELECT * FROM upd_first WHERE usr = fake_curr_usr();
ALTER TABLE v_second ALTER id
SET DEFAULT nextval(pg_get_serial_sequence('upd_first', 'id'));

-- this one works because the view was not created with check option
INSERT INTO v_second (name, usr) VALUES ('doe', 'nobody');

-- now the same view definition, but created WITH CHECK OPTION
CREATE VIEW v_third AS SELECT * FROM upd_first WHERE usr = fake_curr_usr()
WITH CHECK OPTION;
ALTER TABLE v_third ALTER id
SET DEFAULT nextval(pg_get_serial_sequence('upd_first', 'id'));
-- these fail: violation of the view condition
INSERT INTO v_third (name, usr) VALUES ('viper', 'nobody');
UPDATE v_third set name = 'piggy', usr = 'usurper' WHERE id = 3;
-- these work
INSERT INTO v_third (name, usr) VALUES ('tiger', fake_curr_usr());
UPDATE v_third SET name = 'moe' WHERE id = 5;

-- now try a vertically-constrained view, i.e. not all columns
CREATE VIEW v_fourth as SELECT id, name FROM upd_first;
-- fails because id doesn't have a default
INSERT INTO v_fourth (name) VALUES ('on');
ALTER TABLE v_fourth ALTER id
SET DEFAULT nextval(pg_get_serial_sequence('upd_first', 'id'));
-- fails because view doesn't have column usr
INSERT INTO v_fourth (id, name, usr) VALUES (default, 'FROM', 'gross');
-- works, with defaults for id and usr
INSERT INTO v_fourth (name) VALUES ('by');
UPDATE v_fourth set name = 'catcha' WHERE id = 6;

-- no implicit rules for a view on a join (even if there's only one base
-- table), so the INSERT is expected to fail
CREATE VIEW v_fifth as SELECT id, v_first.name, usr
FROM v_first JOIN v_second USING (id);
INSERT INTO v_fifth DEFAULT VALUES;

-- no rules for a view on a query with two FROM items
CREATE VIEW v_sixth AS SELECT v_first.id, v_first.name, usr
FROM v_first, v_second
WHERE v_first.id = v_second.id;
INSERT INTO v_sixth DEFAULT VALUES;

-- test WITH LOCAL CHECK OPTION: v_eighth is stacked on v_seventh, and only
-- v_eighth's own condition (id > 4) is supposed to be enforced
CREATE VIEW v_seventh AS SELECT * FROM upd_first WHERE usr = fake_curr_usr() WITH CHECK OPTION;
CREATE VIEW v_eighth AS SELECT * FROM v_seventh WHERE id > 4 WITH LOCAL CHECK OPTION;
ALTER TABLE v_eighth ALTER id
SET DEFAULT nextval(pg_get_serial_sequence('upd_first', 'id'));

-- works: satisfies both conditions
INSERT INTO v_eighth (name, usr) VALUES ('jut', fake_curr_usr());

-- we can't violate v_eighth condition
INSERT INTO v_eighth (id) values (2);
UPDATE v_eighth SET id = 2 WHERE name = 'jut';
-- but we can violate v_seventh's
UPDATE v_eighth SET usr = 'someone' WHERE name = 'jut';
INSERT INTO v_eighth (name, usr) VALUES ('kvu', 'anyone');

-- with CASCADED CHECK OPTION the condition of the underlying view is
-- enforced as well, so violating either condition is expected to fail
CREATE VIEW v_ninth AS SELECT * FROM v_seventh WHERE id > 4 WITH CASCADED CHECK OPTION;
INSERT INTO v_ninth (id) values (2);
INSERT INTO v_ninth (name, usr) VALUES ('archaeopterix', fake_curr_usr());
INSERT INTO v_ninth (name, usr) VALUES ('pteranodon', 'anyone');
UPDATE v_ninth SET id = 2 WHERE name = 'jut';
UPDATE v_ninth SET usr = 'someone' WHERE name = 'jut';

-- DELETE through the views and directly on the base table.  Rows with
-- usr = 'nobody' do not satisfy v_seventh's condition, so the deletes through
-- the views presumably remove nothing -- confirm against the expected output.
DELETE FROM v_ninth WHERE usr = 'nobody';
DELETE FROM v_eighth WHERE usr = 'nobody';
DELETE FROM upd_first WHERE usr = 'anyone';

-- a view with a compound condition and no check option; the first UPDATE
-- sets usr to a value that no longer satisfies the view condition
CREATE VIEW v_tenth AS SELECT * FROM upd_first WHERE id % 2 = 0 AND usr <> fake_curr_usr();
UPDATE v_tenth set name = 'its', usr = fake_curr_usr() WHERE usr = 'someone';
UPDATE v_tenth set name = 'toe' WHERE usr = 'anyone';

-- final state, ordered so that the output is deterministic
SELECT * FROM v_tenth ORDER BY id;
SELECT * FROM upd_first ORDER BY id;

-- A view on a table function can never be updatable; creating it must still
-- succeed.  It needs its own name: v_tenth already exists, so reusing that
-- name would fail on the duplicate relation and never test this case.
CREATE VIEW v_eleventh AS SELECT * FROM generate_series(1, 5);


Home | Main Index | Thread Index

Privacy Policy | PostgreSQL Archives hosted by Command Prompt, Inc. | Designed by tinysofa
Copyright © 1996 – 2008 PostgreSQL Global Development Group