[PATCH 09/16] Decode wal (with wal_level=logical) into changes in an ApplyCache instance

From: Andres Freund <andres(at)2ndquadrant(dot)com>
To: pgsql-hackers(at)postgresql(dot)org
Subject: [PATCH 09/16] Decode wal (with wal_level=logical) into changes in an ApplyCache instance
Date: 2012-06-13 11:28:40
Message-ID: 1339586927-13156-9-git-send-email-andres@2ndquadrant.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

From: Andres Freund <andres(at)anarazel(dot)de>

This requires an up-to-date catalog and thus can only be run on a replica.

Missing:
- HEAP_NEWPAGE support
- HEAP2_MULTI_INSERT support
- DDL integration. *No* DDL, including TRUNCATE, is possible at the moment
---
src/backend/replication/logical/Makefile | 2 +-
src/backend/replication/logical/decode.c | 439 ++++++++++++++++++++++++++++++
src/include/replication/decode.h | 23 ++
3 files changed, 463 insertions(+), 1 deletion(-)
create mode 100644 src/backend/replication/logical/decode.c
create mode 100644 src/include/replication/decode.h

diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 2eadab8..7dd9663 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,6 +14,6 @@ include $(top_builddir)/src/Makefile.global

override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)

-OBJS = applycache.o
+OBJS = applycache.o decode.o

include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
new file mode 100644
index 0000000..7e07d50
--- /dev/null
+++ b/src/backend/replication/logical/decode.c
@@ -0,0 +1,439 @@
+/*-------------------------------------------------------------------------
+ *
+ * decode.c
+ *
+ * Decodes WAL records received through an xlogreader.h callback into an ApplyCache
+ *
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/decode.c
+ *
+ */
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/transam.h"
+#include "access/xlog_internal.h"
+#include "access/xact.h"
+
+#include "replication/applycache.h"
+#include "replication/decode.h"
+
+#include "utils/memutils.h"
+#include "utils/syscache.h"
+#include "utils/lsyscache.h"
+
+/* helper reconstructing a heap tuple from its WAL representation */
+static void DecodeXLogTuple(char *data, Size len,
+				HeapTuple table, ApplyCacheTupleBuf *tuple);
+
+/* per-record-type decoders for heap and heap2 records */
+static void DecodeInsert(ApplyCache *cache, XLogRecordBuffer *buf);
+static void DecodeUpdate(ApplyCache *cache, XLogRecordBuffer *buf);
+static void DecodeDelete(ApplyCache *cache, XLogRecordBuffer *buf);
+static void DecodeNewpage(ApplyCache *cache, XLogRecordBuffer *buf);
+static void DecodeMultiInsert(ApplyCache *cache, XLogRecordBuffer *buf);
+
+/* transaction commit handling, shared by all commit record flavours */
+static void DecodeCommit(ApplyCache *cache, XLogRecordBuffer *buf,
+			 TransactionId xid, TransactionId *sub_xids,
+			 int nsubxacts);
+
+
+/*
+ * DecodeAbort
+ *		Report to the ApplyCache that xid and the listed subtransactions
+ *		aborted at buf->origptr.
+ *
+ * The subtransactions are aborted first, then xid itself.
+ */
+static void
+DecodeAbort(ApplyCache *cache, XLogRecordBuffer *buf, TransactionId xid,
+			TransactionId *sub_xids, int nsubxacts)
+{
+	int			i;
+
+	for (i = 0; i < nsubxacts; i++)
+		ApplyCacheAbort(cache, sub_xids[i], buf->origptr);
+
+	/* TODO: check that this also contains not-yet-aborted subtxns */
+	ApplyCacheAbort(cache, xid, buf->origptr);
+
+	elog(WARNING, "ABORT %u", xid);
+}
+
+/*
+ * DecodeRecordIntoApplyCache
+ *		Feed one WAL record into the ApplyCache.
+ *
+ * Heap insert/update/delete records are decoded into changes; transaction
+ * commit and abort records tell the cache to replay or discard the changes
+ * collected for a transaction.  Records of other resource managers, and
+ * record types not listed below, are ignored.
+ */
+void
+DecodeRecordIntoApplyCache(ApplyCache *cache, XLogRecordBuffer *buf)
+{
+	XLogRecord *r = &buf->record;
+	uint8		info = r->xl_info & ~XLR_INFO_MASK;
+
+	switch (r->xl_rmid)
+	{
+		case RM_HEAP_ID:
+			/* mask off flag bits such as XLOG_HEAP_INIT_PAGE */
+			info &= XLOG_HEAP_OPMASK;
+			switch (info)
+			{
+				case XLOG_HEAP_INSERT:
+					DecodeInsert(cache, buf);
+					break;
+
+					/*
+					 * no guarantee that we get a HOT update again, so handle
+					 * it as a normal update
+					 */
+				case XLOG_HEAP_HOT_UPDATE:
+				case XLOG_HEAP_UPDATE:
+					DecodeUpdate(cache, buf);
+					break;
+
+				case XLOG_HEAP_NEWPAGE:
+					DecodeNewpage(cache, buf);
+					break;
+
+				case XLOG_HEAP_DELETE:
+					DecodeDelete(cache, buf);
+					break;
+
+				default:
+					break;
+			}
+			break;
+
+		case RM_HEAP2_ID:
+			info &= XLOG_HEAP_OPMASK;
+			switch (info)
+			{
+				case XLOG_HEAP2_MULTI_INSERT:
+					/* this also handles the XLOG_HEAP_INIT_PAGE case */
+					DecodeMultiInsert(cache, buf);
+					break;
+
+				default:
+					/* everything else here is just physical stuff we're not interested in */
+					break;
+			}
+			break;
+
+		case RM_XACT_ID:
+			switch (info)
+			{
+				case XLOG_XACT_COMMIT:
+					{
+						xl_xact_commit *xlrec = (xl_xact_commit *) buf->record_data;
+
+						/*
+						 * Subtransaction xids follow the xnodes array.
+						 * FIXME: this is not really allowed if there are no
+						 * subtransactions.
+						 */
+						TransactionId *sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+
+						DecodeCommit(cache, buf, r->xl_xid, sub_xids,
+									 xlrec->nsubxacts);
+						break;
+					}
+
+				case XLOG_XACT_COMMIT_PREPARED:
+					{
+						xl_xact_commit_prepared *prec = (xl_xact_commit_prepared *) buf->record_data;
+						xl_xact_commit *xlrec = &prec->crec;
+						TransactionId *sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+
+						/*
+						 * The transaction being committed is the prepared one
+						 * whose xid is stored in the record; r->xl_xid
+						 * belongs to the session executing COMMIT PREPARED.
+						 */
+						DecodeCommit(cache, buf, prec->xid, sub_xids,
+									 xlrec->nsubxacts);
+						break;
+					}
+
+				case XLOG_XACT_COMMIT_COMPACT:
+					{
+						xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) buf->record_data;
+
+						DecodeCommit(cache, buf, r->xl_xid, xlrec->subxacts,
+									 xlrec->nsubxacts);
+						break;
+					}
+
+				case XLOG_XACT_ABORT:
+					{
+						xl_xact_abort *xlrec = (xl_xact_abort *) buf->record_data;
+
+						/* FIXME: this is not really allowed if there are no subtransactions */
+						TransactionId *sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+
+						DecodeAbort(cache, buf, r->xl_xid, sub_xids,
+									xlrec->nsubxacts);
+						break;
+					}
+
+				case XLOG_XACT_ABORT_PREPARED:
+					{
+						/*
+						 * Unlike a plain abort, the record starts with the
+						 * prepared transaction's xid, followed by the abort
+						 * data; reading it as a plain xl_xact_abort would
+						 * misinterpret nrels/nsubxacts.
+						 */
+						xl_xact_abort_prepared *prec = (xl_xact_abort_prepared *) buf->record_data;
+						xl_xact_abort *xlrec = &prec->arec;
+						TransactionId *sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+
+						DecodeAbort(cache, buf, prec->xid, sub_xids,
+									xlrec->nsubxacts);
+						break;
+					}
+
+				case XLOG_XACT_ASSIGNMENT:
+
+					/*
+					 * XXX: We could reassign transactions to the parent here
+					 * to save space and effort when merging transactions at
+					 * commit.
+					 */
+					break;
+
+				case XLOG_XACT_PREPARE:
+
+					/*
+					 * FIXME: we should replay the transaction and prepare it
+					 * as well.
+					 */
+					break;
+
+				default:
+					break;
+			}
+			break;
+
+		default:
+			break;
+	}
+}
+
+/*
+ * DecodeCommit
+ *		Hand the commit of xid and its nsubxacts subtransactions (listed in
+ *		sub_xids) to the ApplyCache.
+ *
+ * Each subtransaction is reported via ApplyCacheCommitChild as a child of
+ * xid; afterwards ApplyCacheCommit replays the actions of the transaction and
+ * its subtransactions in order.
+ */
+static void
+DecodeCommit(ApplyCache *cache, XLogRecordBuffer *buf, TransactionId xid,
+			 TransactionId *sub_xids, int nsubxacts)
+{
+	XLogRecPtr	commit_lsn = buf->origptr;
+	int			idx;
+
+	for (idx = 0; idx < nsubxacts; idx++)
+		ApplyCacheCommitChild(cache, xid, sub_xids[idx], commit_lsn);
+
+	ApplyCacheCommit(cache, xid, commit_lsn);
+}
+
+/*
+ * DecodeInsert
+ *		Turn an XLOG_HEAP_INSERT record into an APPLY_CACHE_CHANGE_INSERT
+ *		change of the inserting transaction (r->xl_xid).
+ *
+ * Changes are silently dropped for relations the RELFILENODE syscache lookup
+ * cannot find, and for relations whose pg_class oid is below
+ * FirstNormalObjectId (system tables).
+ */
+static void
+DecodeInsert(ApplyCache *cache, XLogRecordBuffer *buf)
+{
+	XLogRecord *r = &buf->record;
+	xl_heap_insert *xlrec = (xl_heap_insert *) buf->record_data;
+	Oid			relfilenode = xlrec->target.node.relNode;
+	ApplyCacheChange *change;
+
+	/*
+	 * When a backup block is attached the tuple data could have been left
+	 * out of the record; with wal_level = logical it is expected to be
+	 * present.  This is an xl_heap_insert record, so its fixed part is
+	 * SizeOfHeapInsert bytes (using SizeOfHeapUpdate here rejected valid
+	 * records carrying short tuples).
+	 */
+	if ((r->xl_info & XLR_BKP_BLOCK_1) &&
+		r->xl_len < (SizeOfHeapInsert + SizeOfHeapHeader))
+	{
+		elog(FATAL, "huh, no tuple data on wal_level = logical?");
+	}
+
+	if (relfilenode == 0)
+	{
+		elog(ERROR, "nailed catalog changed");
+	}
+
+	change = ApplyCacheGetChange(cache);
+	change->action = APPLY_CACHE_CHANGE_INSERT;
+
+	/*
+	 * Lookup the pg_class entry for the relfilenode to get the real oid.  The
+	 * copy is made in TopMemoryContext, presumably so it outlives the current
+	 * memory context while the change sits in the cache.
+	 *
+	 * NOTE(review): only relNode is used as the key, spcNode is ignored;
+	 * relfilenodes are only unique within a tablespace -- confirm the
+	 * RELFILENODE syscache accounts for that.
+	 */
+	{
+		MemoryContext curctx = MemoryContextSwitchTo(TopMemoryContext);
+
+		change->table = SearchSysCacheCopy1(RELFILENODE,
+											ObjectIdGetDatum(relfilenode));
+		MemoryContextSwitchTo(curctx);
+	}
+
+	if (!HeapTupleIsValid(change->table))
+	{
+#ifdef SHOULD_BE_HANDLED_BETTER
+		elog(WARNING, "cache lookup failed for relfilenode %u, systable?",
+			 relfilenode);
+#endif
+		ApplyCacheReturnChange(cache, change);
+		return;
+	}
+
+	if (HeapTupleGetOid(change->table) < FirstNormalObjectId)
+	{
+#ifdef VERBOSE_DEBUG
+		elog(LOG, "skipping change to systable");
+#endif
+		ApplyCacheReturnChange(cache, change);
+		return;
+	}
+
+#ifdef VERBOSE_DEBUG
+	{
+		/* for accessing the cache */
+		Form_pg_class class_form;
+
+		class_form = (Form_pg_class) GETSTRUCT(change->table);
+		elog(WARNING, "INSERT INTO \"%s\"", NameStr(class_form->relname));
+	}
+#endif
+
+	change->newtuple = ApplyCacheGetTupleBuf(cache);
+
+	/* the xl_heap_header + tuple data follow the fixed xl_heap_insert part */
+	DecodeXLogTuple((char *) xlrec + SizeOfHeapInsert,
+					r->xl_len - SizeOfHeapInsert,
+					change->table, change->newtuple);
+
+	ApplyCacheAddChange(cache, r->xl_xid, buf->origptr, change);
+}
+
+/*
+ * DecodeUpdate
+ *		Turn an XLOG_HEAP_UPDATE / XLOG_HEAP_HOT_UPDATE record into an
+ *		APPLY_CACHE_CHANGE_UPDATE change carrying the new tuple version.
+ *
+ * Changes are silently dropped for relations the RELFILENODE syscache lookup
+ * cannot find, and for relations whose pg_class oid is below
+ * FirstNormalObjectId (system tables).
+ */
+static void
+DecodeUpdate(ApplyCache *cache, XLogRecordBuffer *buf)
+{
+	XLogRecord *r = &buf->record;
+	xl_heap_update *xlrec = (xl_heap_update *) buf->record_data;
+	Oid			relfilenode = xlrec->target.node.relNode;
+	ApplyCacheChange *change;
+
+	/*
+	 * When a backup block is attached the new tuple's data could have been
+	 * left out of the record; with wal_level = logical it is expected to be
+	 * present.
+	 */
+	if ((r->xl_info & (XLR_BKP_BLOCK_1 | XLR_BKP_BLOCK_2)) &&
+		r->xl_len < (SizeOfHeapUpdate + SizeOfHeapHeader))
+	{
+		elog(FATAL, "huh, no tuple data on wal_level = logical?");
+	}
+
+	change = ApplyCacheGetChange(cache);
+	change->action = APPLY_CACHE_CHANGE_UPDATE;
+
+	/*
+	 * Lookup the pg_class entry for the relfilenode to get the real oid.  The
+	 * copy is made in TopMemoryContext, presumably so it outlives the current
+	 * memory context while the change sits in the cache.
+	 *
+	 * NOTE(review): spcNode is ignored in the lookup -- confirm relNode alone
+	 * is a sufficient key for the RELFILENODE syscache.
+	 */
+	{
+		MemoryContext curctx = MemoryContextSwitchTo(TopMemoryContext);
+
+		change->table = SearchSysCacheCopy1(RELFILENODE,
+											ObjectIdGetDatum(relfilenode));
+		MemoryContextSwitchTo(curctx);
+	}
+
+	if (!HeapTupleIsValid(change->table))
+	{
+#ifdef SHOULD_BE_HANDLED_BETTER
+		elog(WARNING, "cache lookup failed for relfilenode %u, systable?",
+			 relfilenode);
+#endif
+		ApplyCacheReturnChange(cache, change);
+		return;
+	}
+
+	if (HeapTupleGetOid(change->table) < FirstNormalObjectId)
+	{
+#ifdef VERBOSE_DEBUG
+		elog(LOG, "skipping change to systable");
+#endif
+		ApplyCacheReturnChange(cache, change);
+		return;
+	}
+
+#ifdef VERBOSE_DEBUG
+	{
+		/* for accessing the cache */
+		Form_pg_class class_form;
+
+		class_form = (Form_pg_class) GETSTRUCT(change->table);
+		elog(WARNING, "UPDATE \"%s\"", NameStr(class_form->relname));
+	}
+#endif
+
+	/* FIXME: need to save the old tuple as well if we want primary key updates to work. */
+	change->newtuple = ApplyCacheGetTupleBuf(cache);
+
+	/* the xl_heap_header + new tuple data follow the fixed xl_heap_update part */
+	DecodeXLogTuple((char *) xlrec + SizeOfHeapUpdate,
+					r->xl_len - SizeOfHeapUpdate,
+					change->table, change->newtuple);
+
+	ApplyCacheAddChange(cache, r->xl_xid, buf->origptr, change);
+}
+
+/*
+ * DecodeDelete
+ *		Turn an XLOG_HEAP_DELETE record into an APPLY_CACHE_CHANGE_DELETE
+ *		change; the tuple data logged with the record is stored as the
+ *		change's oldtuple.
+ *
+ * Changes are silently dropped for relations the RELFILENODE syscache lookup
+ * cannot find, and for relations whose pg_class oid is below
+ * FirstNormalObjectId (system tables).
+ */
+static void
+DecodeDelete(ApplyCache *cache, XLogRecordBuffer *buf)
+{
+	XLogRecord *r = &buf->record;
+	xl_heap_delete *xlrec = (xl_heap_delete *) buf->record_data;
+	Oid			relfilenode = xlrec->target.node.relNode;
+	ApplyCacheChange *change;
+
+	/*
+	 * Validate the record before taking a change from the cache.  With
+	 * wal_level = logical a delete is expected to carry (primary key) tuple
+	 * data after the xl_heap_header; a record with no bytes beyond the
+	 * header is rejected.
+	 */
+	if (r->xl_len <= (SizeOfHeapDelete + SizeOfHeapHeader))
+	{
+		elog(FATAL, "huh, no primary key for a delete on wal_level = logical?");
+	}
+
+	change = ApplyCacheGetChange(cache);
+	change->action = APPLY_CACHE_CHANGE_DELETE;
+
+	/*
+	 * Lookup the pg_class entry for the relfilenode to get the real oid.  The
+	 * copy is made in TopMemoryContext, presumably so it outlives the current
+	 * memory context while the change sits in the cache.
+	 *
+	 * NOTE(review): spcNode is ignored in the lookup -- confirm relNode alone
+	 * is a sufficient key for the RELFILENODE syscache.
+	 */
+	{
+		MemoryContext curctx = MemoryContextSwitchTo(TopMemoryContext);
+
+		change->table = SearchSysCacheCopy1(RELFILENODE,
+											ObjectIdGetDatum(relfilenode));
+		MemoryContextSwitchTo(curctx);
+	}
+
+	if (!HeapTupleIsValid(change->table))
+	{
+#ifdef SHOULD_BE_HANDLED_BETTER
+		elog(WARNING, "cache lookup failed for relfilenode %u, systable?",
+			 relfilenode);
+#endif
+		ApplyCacheReturnChange(cache, change);
+		return;
+	}
+
+	if (HeapTupleGetOid(change->table) < FirstNormalObjectId)
+	{
+#ifdef VERBOSE_DEBUG
+		elog(LOG, "skipping change to systable");
+#endif
+		ApplyCacheReturnChange(cache, change);
+		return;
+	}
+
+#ifdef VERBOSE_DEBUG
+	{
+		/* for accessing the cache */
+		Form_pg_class class_form;
+
+		class_form = (Form_pg_class) GETSTRUCT(change->table);
+		elog(WARNING, "DELETE FROM \"%s\"", NameStr(class_form->relname));
+	}
+#endif
+
+	change->oldtuple = ApplyCacheGetTupleBuf(cache);
+
+	/* the xl_heap_header + key tuple data follow the fixed xl_heap_delete part */
+	DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
+					r->xl_len - SizeOfHeapDelete,
+					change->table, change->oldtuple);
+
+	ApplyCacheAddChange(cache, r->xl_xid, buf->origptr, change);
+}
+
+
+/*
+ * DecodeNewpage
+ *		XLOG_HEAP_NEWPAGE records are not decoded yet; only a WARNING is
+ *		emitted and the record is otherwise ignored.
+ */
+static void
+DecodeNewpage(ApplyCache *cache, XLogRecordBuffer *buf)
+{
+	/* arguments are unused until decoding is implemented */
+	(void) cache;
+	(void) buf;
+
+	elog(WARNING, "skipping XLOG_HEAP_NEWPAGE record because we are too dumb");
+}
+
+/*
+ * DecodeMultiInsert
+ *		XLOG_HEAP2_MULTI_INSERT records are not decoded yet; only a WARNING
+ *		is emitted and the record is otherwise ignored.
+ */
+static void
+DecodeMultiInsert(ApplyCache *cache, XLogRecordBuffer *buf)
+{
+	/* arguments are unused until decoding is implemented */
+	(void) cache;
+	(void) buf;
+
+	elog(WARNING, "skipping XLOG_HEAP2_MULTI_INSERT record because we are too dumb");
+}
+
+
+/*
+ * DecodeXLogTuple
+ *		Rebuild a heap tuple from its WAL representation into *tuple.
+ *
+ * data points to an (unaligned) xl_heap_header followed by the tuple
+ * contents starting at t_bits; len is the combined length of both.  table is
+ * the pg_class tuple of the target relation, whose oid becomes t_tableOid.
+ *
+ * Raises ERROR when len is too short to hold the header, or when the tuple
+ * contents exceed MaxHeapTupleSize.
+ */
+static void
+DecodeXLogTuple(char *data, Size len,
+				HeapTuple table, ApplyCacheTupleBuf *tuple)
+{
+	xl_heap_header xlhdr;
+	Size		datalen;
+
+	/*
+	 * These used to be Asserts only.  In non-assert builds a short record
+	 * made the length underflow and the memcpy below overrun the tuple
+	 * buffer, so check explicitly.  Callers pass xl_len minus the fixed
+	 * record size, so a too-short record also shows up here as a huge len.
+	 */
+	if (len < SizeOfHeapHeader)
+		elog(ERROR, "WAL tuple data too short: %lu bytes",
+			 (unsigned long) len);
+
+	datalen = len - SizeOfHeapHeader;
+
+	/*
+	 * NOTE(review): assumes ApplyCacheTupleBuf provides at least
+	 * MaxHeapTupleSize bytes of storage from header.t_bits onwards -- confirm
+	 * against applycache.h.
+	 */
+	if (datalen > MaxHeapTupleSize)
+		elog(ERROR, "WAL tuple data too long: %lu bytes",
+			 (unsigned long) datalen);
+
+	tuple->tuple.t_len = datalen + offsetof(HeapTupleHeaderData, t_bits);
+
+	/* not a disk based tuple */
+	ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+	/* probably not needed, but ... (is it actually valid to set it?) */
+	tuple->tuple.t_tableOid = HeapTupleGetOid(table);
+	tuple->tuple.t_data = &tuple->header;
+
+	/* data is not stored aligned, so copy the header out before reading it */
+	memcpy((char *) &xlhdr, data, SizeOfHeapHeader);
+
+	memset(&tuple->header, 0, sizeof(HeapTupleHeaderData));
+
+	/* WAL omits the header fields before t_bits; copy the rest verbatim */
+	memcpy((char *) &tuple->header + offsetof(HeapTupleHeaderData, t_bits),
+		   data + SizeOfHeapHeader,
+		   datalen);
+
+	tuple->header.t_infomask = xlhdr.t_infomask;
+	tuple->header.t_infomask2 = xlhdr.t_infomask2;
+	tuple->header.t_hoff = xlhdr.t_hoff;
+}
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
new file mode 100644
index 0000000..53088e2
--- /dev/null
+++ b/src/include/replication/decode.h
@@ -0,0 +1,23 @@
+/*-------------------------------------------------------------------------
+ * decode.h
+ * PostgreSQL WAL to logical transformation
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef DECODE_H
+#define DECODE_H
+
+#include "access/xlogreader.h"
+#include "replication/applycache.h"
+
+/*
+ * Decode a single WAL record (wrapped in an XLogRecordBuffer) into changes
+ * in the given ApplyCache.
+ */
+void DecodeRecordIntoApplyCache(ApplyCache *cache, XLogRecordBuffer* buf);
+
+/*
+ * NOTE(review): not referenced by decode.c; presumably the private state
+ * handed to the xlogreader callbacks by the caller -- confirm.
+ */
+typedef struct ReaderApplyState
+{
+ ApplyCache *apply_cache;	/* presumably the cache records are decoded into */
+} ReaderApplyState;
+
+#endif
--
1.7.10.rc3.3.g19a6c.dirty

In response to

Browse pgsql-hackers by date

  From Date Subject
Next Message Andres Freund 2012-06-13 11:28:41 [PATCH 10/16] Introduce the concept that wal has a 'origin' node
Previous Message Andres Freund 2012-06-13 11:28:39 [PATCH 08/16] Introduce the ApplyCache module which can reassemble transactions from a stream of interspersed changes