[PATCH 07/16] Log enough data into the wal to reconstruct logical changes from it if wal_level=logical

From: Andres Freund <andres(at)2ndquadrant(dot)com>
To: pgsql-hackers(at)postgresql(dot)org
Subject: [PATCH 07/16] Log enough data into the wal to reconstruct logical changes from it if wal_level=logical
Date: 2012-06-13 11:28:38
Message-ID: 1339586927-13156-7-git-send-email-andres@2ndquadrant.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

From: Andres Freund <andres(at)anarazel(dot)de>

This adds a new wal_level value 'logical'

Missing cases:
- heap_multi_insert
- primary key changes for updates
- no primary key
- LOG_NEWPAGE
---
src/backend/access/heap/heapam.c | 135 ++++++++++++++++++++++++++++---
src/backend/access/transam/xlog.c | 1 +
src/backend/catalog/index.c | 74 +++++++++++++++++
src/bin/pg_controldata/pg_controldata.c | 2 +
src/include/access/xlog.h | 3 +-
src/include/catalog/index.h | 4 +
6 files changed, 207 insertions(+), 12 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 9519e73..9149d53 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -52,6 +52,7 @@
#include "access/xact.h"
#include "access/xlogutils.h"
#include "catalog/catalog.h"
+#include "catalog/index.h"
#include "catalog/namespace.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -1937,10 +1938,19 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
xl_heap_insert xlrec;
xl_heap_header xlhdr;
XLogRecPtr recptr;
- XLogRecData rdata[3];
+ XLogRecData rdata[4];
Page page = BufferGetPage(buffer);
uint8 info = XLOG_HEAP_INSERT;

+ /*
+ * For the logical replication case we need the tuple even if we're
+ * doing a full page write. (We could alternatively store a pointer into
+ * the full-page image.) To make that work, the tuple data is not tied to
+ * the buffer; instead another, data-less rdata entry carries the buffer
+ * reference in that case.
+ */
+ bool need_tuple = wal_level == WAL_LEVEL_LOGICAL;
+
xlrec.all_visible_cleared = all_visible_cleared;
xlrec.target.node = relation->rd_node;
xlrec.target.tid = heaptup->t_self;
@@ -1960,18 +1970,32 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
*/
rdata[1].data = (char *) &xlhdr;
rdata[1].len = SizeOfHeapHeader;
- rdata[1].buffer = buffer;
+ rdata[1].buffer = need_tuple ? InvalidBuffer : buffer;
rdata[1].buffer_std = true;
rdata[1].next = &(rdata[2]);

/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
rdata[2].data = (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits);
rdata[2].len = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits);
- rdata[2].buffer = buffer;
+ rdata[2].buffer = need_tuple ? InvalidBuffer : buffer;
rdata[2].buffer_std = true;
rdata[2].next = NULL;

/*
+ * Add an rdata entry that references the buffer but carries no data; it
+ * is the part that gets dropped if a full page write is done for the buffer.
+ */
+ if(need_tuple){
+ rdata[2].next = &(rdata[3]);
+
+ rdata[3].data = NULL;
+ rdata[3].len = 0;
+ rdata[3].buffer = buffer;
+ rdata[3].buffer_std = true;
+ rdata[3].next = NULL;
+ }
+
+ /*
* If this is the single and first tuple on page, we can reinit the
* page instead of restoring the whole thing. Set flag, and hide
* buffer references from XLogInsert.
@@ -1980,7 +2004,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
{
info |= XLOG_HEAP_INIT_PAGE;
- rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
+ rdata[1].buffer = rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
}

recptr = XLogInsert(RM_HEAP_ID, info, rdata);
@@ -2568,7 +2592,9 @@ l1:
{
xl_heap_delete xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[2];
+ XLogRecData rdata[4];
+
+ bool need_tuple = wal_level == WAL_LEVEL_LOGICAL && relation->rd_id >= FirstNormalObjectId;

xlrec.all_visible_cleared = all_visible_cleared;
xlrec.target.node = relation->rd_node;
@@ -2584,6 +2610,73 @@ l1:
rdata[1].buffer_std = true;
rdata[1].next = NULL;

+ /*
+ * XXX: We could decide not to log changes when the origin is not the
+ * local node; that should reduce redundant logging.
+ */
+ if(need_tuple){
+ xl_heap_header xlhdr;
+
+ Oid indexoid = InvalidOid;
+ int16 pknratts;
+ int16 pkattnum[INDEX_MAX_KEYS];
+ Oid pktypoid[INDEX_MAX_KEYS];
+ Oid pkopclass[INDEX_MAX_KEYS];
+ TupleDesc desc = RelationGetDescr(relation);
+ Relation index_rel;
+ TupleDesc indexdesc;
+ int natt;
+
+ Datum idxvals[INDEX_MAX_KEYS];
+ bool idxisnull[INDEX_MAX_KEYS];
+ HeapTuple idxtuple;
+
+ MemSet(pkattnum, 0, sizeof(pkattnum));
+ MemSet(pktypoid, 0, sizeof(pktypoid));
+ MemSet(pkopclass, 0, sizeof(pkopclass));
+ MemSet(idxvals, 0, sizeof(idxvals));
+ MemSet(idxisnull, 0, sizeof(idxisnull));
+ relationFindPrimaryKey(relation, &indexoid, &pknratts, pkattnum, pktypoid, pkopclass);
+
+ if(!indexoid){
+ elog(WARNING, "Could not find primary key for table with oid %u",
+ relation->rd_id);
+ goto no_index_found;
+ }
+
+ index_rel = index_open(indexoid, AccessShareLock);
+
+ indexdesc = RelationGetDescr(index_rel);
+
+ for(natt = 0; natt < indexdesc->natts; natt++){
+ idxvals[natt] =
+ fastgetattr(&tp, pkattnum[natt], desc, &idxisnull[natt]);
+ Assert(!idxisnull[natt]);
+ }
+
+ idxtuple = heap_form_tuple(indexdesc, idxvals, idxisnull);
+
+ xlhdr.t_infomask2 = idxtuple->t_data->t_infomask2;
+ xlhdr.t_infomask = idxtuple->t_data->t_infomask;
+ xlhdr.t_hoff = idxtuple->t_data->t_hoff;
+
+ rdata[1].next = &(rdata[2]);
+ rdata[2].data = (char*)&xlhdr;
+ rdata[2].len = SizeOfHeapHeader;
+ rdata[2].buffer = InvalidBuffer;
+ rdata[2].next = NULL;
+
+ rdata[2].next = &(rdata[3]);
+ rdata[3].data = (char *) idxtuple->t_data + offsetof(HeapTupleHeaderData, t_bits);
+ rdata[3].len = idxtuple->t_len - offsetof(HeapTupleHeaderData, t_bits);
+ rdata[3].buffer = InvalidBuffer;
+ rdata[3].next = NULL;
+
+ heap_close(index_rel, NoLock);
+ no_index_found:
+ ;
+ }
+
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata);

PageSetLSN(page, recptr);
@@ -4413,9 +4506,14 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
xl_heap_header xlhdr;
uint8 info;
XLogRecPtr recptr;
- XLogRecData rdata[4];
+ XLogRecData rdata[5];
Page page = BufferGetPage(newbuf);

+ /*
+ * As for XLOG_HEAP_INSERT, wal_level=logical needs the tuple even with a full page write.
+ */
+ bool need_tuple = wal_level == WAL_LEVEL_LOGICAL;
+
/* Caller should not call me on a non-WAL-logged relation */
Assert(RelationNeedsWAL(reln));

@@ -4446,28 +4544,43 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
xlhdr.t_hoff = newtup->t_data->t_hoff;

/*
- * As with insert records, we need not store the rdata[2] segment if we
- * decide to store the whole buffer instead.
+ * As with insert records, we need not store the rdata[2] and rdata[3]
+ * segments if we decide to store the whole buffer instead -- unless
+ * wal_level=logical, in which case the new tuple is always logged.
*/
rdata[2].data = (char *) &xlhdr;
rdata[2].len = SizeOfHeapHeader;
- rdata[2].buffer = newbuf;
+ rdata[2].buffer = need_tuple ? InvalidBuffer : newbuf;
rdata[2].buffer_std = true;
rdata[2].next = &(rdata[3]);

/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits);
rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
- rdata[3].buffer = newbuf;
+ rdata[3].buffer = need_tuple ? InvalidBuffer : newbuf;
rdata[3].buffer_std = true;
rdata[3].next = NULL;

+ /*
+ * In the wal_level=logical case, a separate data-less rdata entry holds
+ * the reference to the new page's buffer.
+ */
+ if(need_tuple){
+ rdata[3].next = &(rdata[4]);
+
+ rdata[4].data = NULL,
+ rdata[4].len = 0;
+ rdata[4].buffer = newbuf;
+ rdata[4].buffer_std = true;
+ rdata[4].next = NULL;
+ }
+
/* If new tuple is the single and first tuple on page... */
if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber &&
PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
{
info |= XLOG_HEAP_INIT_PAGE;
- rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
+ rdata[2].buffer = rdata[3].buffer = rdata[4].buffer = InvalidBuffer;
}

recptr = XLogInsert(RM_HEAP_ID, info, rdata);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 166efb0..c6feed0 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -105,6 +105,7 @@ const struct config_enum_entry wal_level_options[] = {
{"minimal", WAL_LEVEL_MINIMAL, false},
{"archive", WAL_LEVEL_ARCHIVE, false},
{"hot_standby", WAL_LEVEL_HOT_STANDBY, false},
+ {"logical", WAL_LEVEL_LOGICAL, false},
{NULL, 0, false}
};

diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 9e8b1cc..4cddcac 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -49,6 +49,7 @@
#include "nodes/nodeFuncs.h"
#include "optimizer/clauses.h"
#include "parser/parser.h"
+#include "parser/parse_relation.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
@@ -3311,3 +3312,76 @@ ResetReindexPending(void)
{
pendingReindexedIndexes = NIL;
}
+
+/*
+ * relationFindPrimaryKey
+ *		Find a relation's primary key index, ignoring deferrable ones.
+ *
+ * If there is none, *indexOid is set to InvalidOid and *nratts to 0.
+ * Otherwise *indexOid is the PK index's OID, *nratts its column count, and
+ * attnums/atttypids/opclasses (room for INDEX_MAX_KEYS entries each) get
+ * the heap attnum, type OID and opclass of every key column.  This is quite
+ * similar to tablecmds.c's transformFkeyGetPrimaryKey.
+ * XXX: storing the PK index's OID in pg_class might make this cheaper.
+ */
+void
+relationFindPrimaryKey(Relation pkrel, Oid *indexOid,
+					   int16 *nratts, int16 *attnums, Oid *atttypids,
+					   Oid *opclasses)
+{
+	List	   *indexoidlist;
+	ListCell   *indexoidscan;
+	HeapTuple	indexTuple = NULL;
+	Form_pg_index indexStruct = NULL;
+	Datum		indclassDatum;
+	bool		isnull;
+	oidvector  *indclass;
+	int			i;
+
+	*indexOid = InvalidOid;
+	*nratts = 0;
+
+	indexoidlist = RelationGetIndexList(pkrel);
+	foreach(indexoidscan, indexoidlist)
+	{
+		Oid			indexoid = lfirst_oid(indexoidscan);
+
+		indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
+		if (!HeapTupleIsValid(indexTuple))
+			elog(ERROR, "cache lookup failed for index %u", indexoid);
+
+		indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
+		if (indexStruct->indisprimary && indexStruct->indimmediate)
+		{
+			*indexOid = indexoid;	/* indexTuple is released below */
+			break;
+		}
+		ReleaseSysCache(indexTuple);
+	}
+	list_free(indexoidlist);
+
+	if (!OidIsValid(*indexOid))
+		return;
+
+	/* Must get indclass the hard way */
+	indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
+									Anum_pg_index_indclass, &isnull);
+	Assert(!isnull);
+	indclass = (oidvector *) DatumGetPointer(indclassDatum);
+
+	/*
+	 * Build the list of PK attributes from the indkey definition (we assume
+	 * a primary key cannot have expressional elements).
+	 */
+	*nratts = indexStruct->indnatts;
+	for (i = 0; i < indexStruct->indnatts; i++)
+	{
+		int			pkattno = indexStruct->indkey.values[i];
+
+		attnums[i] = pkattno;
+		atttypids[i] = attnumTypeId(pkrel, pkattno);
+		opclasses[i] = indclass->values[i];
+	}
+
+	ReleaseSysCache(indexTuple);
+}
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index c00183a..47715c9 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -82,6 +82,8 @@ wal_level_str(WalLevel wal_level)
return "archive";
case WAL_LEVEL_HOT_STANDBY:
return "hot_standby";
+ case WAL_LEVEL_LOGICAL:
+ return "logical";
}
return _("unrecognized wal_level");
}
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index df5f232..2843aca 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -199,7 +199,8 @@ typedef enum WalLevel
{
WAL_LEVEL_MINIMAL = 0,
WAL_LEVEL_ARCHIVE,
- WAL_LEVEL_HOT_STANDBY
+ WAL_LEVEL_HOT_STANDBY,
+ WAL_LEVEL_LOGICAL
} WalLevel;
extern int wal_level;

diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h
index 7c8198f..2ba0ac3 100644
--- a/src/include/catalog/index.h
+++ b/src/include/catalog/index.h
@@ -101,4 +101,8 @@ extern bool ReindexIsProcessingHeap(Oid heapOid);
extern bool ReindexIsProcessingIndex(Oid indexOid);
extern Oid IndexGetRelation(Oid indexId, bool missing_ok);

+extern void relationFindPrimaryKey(Relation pkrel, Oid *indexOid,
+					   int16 *nratts, int16 *attnums,
+					   Oid *atttypids, Oid *opclasses);
+
#endif /* INDEX_H */
--
1.7.10.rc3.3.g19a6c.dirty

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Andres Freund 2012-06-13 11:28:39 [PATCH 08/16] Introduce the ApplyCache module which can reassemble transactions from a stream of interspersed changes
Previous Message Andres Freund 2012-06-13 11:28:37 [PATCH 06/16] Add support for a generic wal reading facility dubbed XLogReader