[PATCH 10/16] Introduce the concept that wal has a 'origin' node

From: Andres Freund <andres(at)2ndquadrant(dot)com>
To: pgsql-hackers(at)postgresql(dot)org
Subject: [PATCH 10/16] Introduce the concept that wal has a 'origin' node
Date: 2012-06-13 11:28:41
Message-ID: 1339586927-13156-10-git-send-email-andres@2ndquadrant.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

From: Andres Freund <andres(at)anarazel(dot)de>

One solution to avoid loops when doing WAL-based logical replication in
topologies more complex than a single unidirectional transport is to
introduce the concept of an 'origin_id' into the WAL stream. Luckily there is
some padding in the XLogRecord struct that allows us to add that field without
further bloating the struct.
This solution was chosen because it allows for just about any topology and is
unobtrusive.

This adds a new configuration parameter, multimaster_node_id, which determines
the node id recorded in WAL originating in this cluster.

When applying changes from another cluster's WAL, code can set the variable
current_replication_origin_id. This is a global variable because passing it
through everything that can generate WAL would be far too intrusive.
---
src/backend/access/transam/xact.c | 48 +++++++++++++++++++------
src/backend/access/transam/xlog.c | 3 +-
src/backend/access/transam/xlogreader.c | 2 ++
src/backend/replication/logical/Makefile | 2 +-
src/backend/replication/logical/logical.c | 19 ++++++++++
src/backend/utils/misc/guc.c | 19 ++++++++++
src/backend/utils/misc/postgresql.conf.sample | 3 ++
src/include/access/xlog.h | 4 +--
src/include/access/xlogdefs.h | 2 ++
src/include/replication/logical.h | 22 ++++++++++++
10 files changed, 110 insertions(+), 14 deletions(-)
create mode 100644 src/backend/replication/logical/logical.c
create mode 100644 src/include/replication/logical.h

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 3cc2bfa..dc30a17 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -36,8 +36,9 @@
#include "libpq/be-fsstubs.h"
#include "miscadmin.h"
#include "pgstat.h"
-#include "replication/walsender.h"
+#include "replication/logical.h"
#include "replication/syncrep.h"
+#include "replication/walsender.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "storage/procarray.h"
@@ -4545,12 +4546,13 @@ xactGetCommittedChildren(TransactionId **ptr)
* actions for which the order of execution is critical.
*/
static void
-xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
- TransactionId *sub_xids, int nsubxacts,
- SharedInvalidationMessage *inval_msgs, int nmsgs,
- RelFileNode *xnodes, int nrels,
- Oid dbId, Oid tsId,
- uint32 xinfo)
+xact_redo_commit_internal(TransactionId xid, RepNodeId originating_node,
+ XLogRecPtr lsn, XLogRecPtr origin_lsn,
+ TransactionId *sub_xids, int nsubxacts,
+ SharedInvalidationMessage *inval_msgs, int nmsgs,
+ RelFileNode *xnodes, int nrels,
+ Oid dbId, Oid tsId,
+ uint32 xinfo)
{
TransactionId max_xid;
int i;
@@ -4659,8 +4661,8 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
* Utility function to call xact_redo_commit_internal after breaking down xlrec
*/
static void
-xact_redo_commit(xl_xact_commit *xlrec,
- TransactionId xid, XLogRecPtr lsn)
+xact_redo_commit(xl_xact_commit *xlrec, RepNodeId originating_node,
+ TransactionId xid, XLogRecPtr lsn)
{
TransactionId *subxacts;
SharedInvalidationMessage *inval_msgs;
@@ -4670,27 +4672,26 @@ xact_redo_commit(xl_xact_commit *xlrec,
/* invalidation messages array follows subxids */
inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);

- xact_redo_commit_internal(xid, lsn, subxacts, xlrec->nsubxacts,
- inval_msgs, xlrec->nmsgs,
- xlrec->xnodes, xlrec->nrels,
- xlrec->dbId,
- xlrec->tsId,
- xlrec->xinfo);
+ xact_redo_commit_internal(xid, originating_node, lsn, xlrec->origin_lsn,
+ subxacts, xlrec->nsubxacts, inval_msgs,
+ xlrec->nmsgs, xlrec->xnodes, xlrec->nrels,
+ xlrec->dbId, xlrec->tsId, xlrec->xinfo);
}

/*
* Utility function to call xact_redo_commit_internal for compact form of message.
*/
static void
-xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
- TransactionId xid, XLogRecPtr lsn)
+xact_redo_commit_compact(xl_xact_commit_compact *xlrec, RepNodeId originating_node,
+ TransactionId xid, XLogRecPtr lsn)
{
- xact_redo_commit_internal(xid, lsn, xlrec->subxacts, xlrec->nsubxacts,
+ xact_redo_commit_internal(xid, originating_node, lsn, zeroRecPtr, xlrec->subxacts,
+ xlrec->nsubxacts,
NULL, 0, /* inval msgs */
NULL, 0, /* relfilenodes */
InvalidOid, /* dbId */
InvalidOid, /* tsId */
0); /* xinfo */
}

/*
@@ -4786,17 +4787,18 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
/* Backup blocks are not used in xact records */
Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));

+ /* FIXME: we probably shouldn't pass xl_origin_id at multiple places, hm */
if (info == XLOG_XACT_COMMIT_COMPACT)
{
xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) XLogRecGetData(record);

- xact_redo_commit_compact(xlrec, record->xl_xid, lsn);
+ xact_redo_commit_compact(xlrec, record->xl_origin_id, record->xl_xid, lsn);
}
else if (info == XLOG_XACT_COMMIT)
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);

- xact_redo_commit(xlrec, record->xl_xid, lsn);
+ xact_redo_commit(xlrec, record->xl_origin_id, record->xl_xid, lsn);
}
else if (info == XLOG_XACT_ABORT)
{
@@ -4814,7 +4816,7 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
{
xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record);

- xact_redo_commit(&xlrec->crec, xlrec->xid, lsn);
+ xact_redo_commit(&xlrec->crec, record->xl_origin_id, xlrec->xid, lsn);
RemoveTwoPhaseFile(xlrec->xid, false);
}
else if (info == XLOG_XACT_ABORT_PREPARED)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index c6feed0..504b4d0 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -42,6 +42,7 @@
#include "postmaster/startup.h"
+#include "replication/logical.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@@ -1032,7 +1033,7 @@ begin:;
record->xl_len = len; /* doesn't include backup blocks */
record->xl_info = info;
record->xl_rmid = rmid;
-
+ record->xl_origin_id = current_replication_origin_id;
/* Now we can finish computing the record's CRC */
COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32),
SizeOfXLogRecord - sizeof(pg_crc32));
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 6f15d66..bacd31e 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -24,6 +24,7 @@
#include "access/xlogreader.h"

/* FIXME */
+#include "replication/logical.h" /* InvalidMultimasterNodeId */
#include "replication/walsender_private.h"
#include "replication/walprotocol.h"

@@ -563,6 +564,7 @@ XLogReaderRead(XLogReaderState* state)
spacer.xl_len = temp_record->xl_tot_len - SizeOfXLogRecord;
spacer.xl_rmid = RM_XLOG_ID;
spacer.xl_info = XLOG_NOOP;
+ spacer.xl_origin_id = InvalidMultimasterNodeId;

state->writeout_data(state,
(char*)&spacer,
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 7dd9663..c2d6d82 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,6 +14,6 @@ include $(top_builddir)/src/Makefile.global

override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)

-OBJS = applycache.o decode.o
+OBJS = applycache.o decode.o logical.o

include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
new file mode 100644
index 0000000..4f34488
--- /dev/null
+++ b/src/backend/replication/logical/logical.c
@@ -0,0 +1,19 @@
+/*-------------------------------------------------------------------------
+ *
+ * logical.c
+ * Support functions for logical/multimaster replication
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/logical.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "replication/logical.h"
+
+int guc_replication_origin_id = InvalidMultimasterNodeId;
+RepNodeId current_replication_origin_id = InvalidMultimasterNodeId;
+XLogRecPtr current_replication_origin_lsn = {0, 0};
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 93c798b..46b0657 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -60,6 +60,7 @@
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
+#include "replication/logical.h"
#include "storage/bufmgr.h"
#include "storage/standby.h"
#include "storage/fd.h"
@@ -198,6 +199,7 @@ static const char *show_tcp_keepalives_interval(void);
static const char *show_tcp_keepalives_count(void);
static bool check_maxconnections(int *newval, void **extra, GucSource source);
static void assign_maxconnections(int newval, void *extra);
+static void assign_replication_node_id(int newval, void *extra);
static bool check_maxworkers(int *newval, void **extra, GucSource source);
static void assign_maxworkers(int newval, void *extra);
static bool check_autovacuum_max_workers(int *newval, void **extra, GucSource source);
@@ -1598,6 +1600,16 @@ static struct config_int ConfigureNamesInt[] =
},

{
+ {"multimaster_node_id", PGC_POSTMASTER, REPLICATION_MASTER,
+ gettext_noop("Sets the multimaster node id used to tag WAL originating on this server."),
+ NULL
+ },
+ &guc_replication_origin_id,
+ InvalidMultimasterNodeId, InvalidMultimasterNodeId, MaxMultimasterNodeId,
+ NULL, assign_replication_node_id, NULL
+ },
+
+ {
{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
gettext_noop("Sets the maximum number of concurrent connections."),
NULL
@@ -8629,6 +8641,13 @@ assign_maxconnections(int newval, void *extra)
MaxBackends = newval + MaxWorkers + autovacuum_max_workers + 1;
}

+static void
+assign_replication_node_id(int newval, void *extra)
+{
+ /* GUC code sets guc_replication_origin_id itself after this hook runs */
+ current_replication_origin_id = newval;
+}
+
static bool
check_maxworkers(int *newval, void **extra, GucSource source)
{
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index ce3fc08..12f8a3f 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -241,6 +241,9 @@
#hot_standby_feedback = off # send info from standby to prevent
# query conflicts

+# - Multi Master Servers -
+
+#multimaster_node_id = 0 # 0 = no node id (change requires restart)

#------------------------------------------------------------------------------
# QUERY TUNING
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 2843aca..dd89cff 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -47,8 +47,8 @@ typedef struct XLogRecord
uint32 xl_len; /* total len of rmgr data */
uint8 xl_info; /* flag bits, see below */
RmgrId xl_rmid; /* resource manager for this record */
-
- /* Depending on MAXALIGN, there are either 2 or 6 wasted bytes here */
+ RepNodeId xl_origin_id; /* node that originally caused this record */
+ /* Depending on MAXALIGN, there are either 0 or 4 wasted bytes here */

/* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */

diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index 2768427..6b6700a 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -84,6 +84,8 @@ extern XLogRecPtr zeroRecPtr;
*/
typedef uint32 TimeLineID;

+typedef uint16 RepNodeId; /* id of the node a WAL record originated on */
+
/*
* Because O_DIRECT bypasses the kernel buffers, and because we never
* read those buffers except during crash recovery or if wal_level != minimal,
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
new file mode 100644
index 0000000..0698b61
--- /dev/null
+++ b/src/include/replication/logical.h
@@ -0,0 +1,22 @@
+/*
+ * logical.h
+ *
+ * PostgreSQL logical replication support
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/replication/logical.h
+ */
+#ifndef LOGICAL_H
+#define LOGICAL_H
+
+#include "access/xlogdefs.h"
+
+extern int guc_replication_origin_id; /* multimaster_node_id GUC */
+extern RepNodeId current_replication_origin_id; /* stamped into xl_origin_id */
+extern XLogRecPtr current_replication_origin_lsn; /* NOTE(review): unused so far */
+
+#define InvalidMultimasterNodeId 0 /* "no node"; multimaster_node_id default */
+#define MaxMultimasterNodeId (2<<3) /* == 16; TODO confirm intended limit */
+#endif /* LOGICAL_H */
--
1.7.10.rc3.3.g19a6c.dirty

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Andres Freund 2012-06-13 11:28:42 [PATCH 11/16] Add infrastructure for manipulating multiple streams of wal on a segment handling level
Previous Message Andres Freund 2012-06-13 11:28:40 [PATCH 09/16] Decode wal (with wal_level=logical) into changes in an ApplyCache instance