[PATCH 12/16] Add state to keep track of logical replication

From: Andres Freund <andres(at)2ndquadrant(dot)com>
To: pgsql-hackers(at)postgresql(dot)org
Subject: [PATCH 12/16] Add state to keep track of logical replication
Date: 2012-06-13 11:28:43
Message-ID: 1339586927-13156-12-git-send-email-andres@2ndquadrant.com
Lists: pgsql-hackers

From: Andres Freund <andres(at)anarazel(dot)de>

In order to have restartable replication with minimal additional writes it's
very useful to know up to which point we have replayed/received changes from a
foreign node.

One representation of that is the lsn of changes at the originating cluster.

We need to keep track of the point up to which we received data and up to where
we applied data.

For that we added a field 'origin_lsn' to commit records. This allows keeping
track of the apply position across crash recovery with minimal additional I/O.
We only added the field to non-compact commit records to reduce the overhead in
case logical replication is not used.

Checkpoints need to keep track of the apply/receive positions as well because
otherwise it would be hard to determine the lsn from where to restart
receive/apply after a shutdown/crash if no changes happened since the last
shutdown/crash.

While running, the startup process, the walreceiver and a (future) apply
process will need a coherent picture of those two states, so add shared memory
state to keep track of them. Currently this is represented in the walreceiver's
shared memory segment. This will likely need to change.

During crash recovery/physical replication the origin_lsn field of commit
records is used to update the shared memory, and thus the next checkpoint's,
notion of the apply state.
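
The interplay between the checkpoint copy and the commit records can be
sketched roughly as below. This is only an illustration, not code from the
patch: the Sketch* names, the fixed node count and the use of a plain 64-bit
integer in place of XLogRecPtr are assumptions made to keep it self-contained,
and the LogicalWalReceiverActive() check from the patch is left out.

```c
#include <assert.h>
#include <stdint.h>

/* Simplified stand-in for XLogRecPtr (assumption: a plain 64-bit value). */
typedef uint64_t SketchLsn;

/* Hypothetical node count; stands in for the per-node array size. */
#define SKETCH_MAX_NODES 4

/* Per-node positions, as kept in both the checkpoint and shared memory. */
typedef struct SketchReplState
{
	SketchLsn	apply[SKETCH_MAX_NODES];
	SketchLsn	receive[SKETCH_MAX_NODES];
} SketchReplState;

/* At startup, shared memory is seeded from the last checkpoint's copy. */
static void
sketch_restore_from_checkpoint(SketchReplState *shared,
							   const SketchReplState *ckpt)
{
	*shared = *ckpt;
}

/*
 * During redo, a commit record carrying a non-zero origin_lsn overwrites the
 * apply position of its originating node. Records without an origin
 * (origin_lsn == 0) leave the state untouched.
 */
static void
sketch_redo_commit(SketchReplState *shared, int origin_node,
				   SketchLsn origin_lsn)
{
	assert(origin_node >= 0 && origin_node < SKETCH_MAX_NODES);
	if (origin_lsn != 0)
		shared->apply[origin_node] = origin_lsn;
}
```

After redo has finished, the shared state holds the apply position of the last
replayed commit per node, which is what the next checkpoint would copy out.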

Missing:

- For correct crash recovery we need more state than the 'apply lsn' because
transactions on the originating side can overlap. At the lsn we just applied,
many other transactions can be in progress. To handle that correctly we need to
keep track of the oldest start lsn of any transaction currently being
reassembled (cf. ApplyCache). Then we can start to reassemble the ApplyCache
from that point on and throw away any transaction which committed before the
recorded/recovered apply lsn.
It should be sufficient to store that knowledge in shared memory and
checkpoint records.
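
A minimal sketch of the missing restart logic described above, under stated
assumptions: the SketchTxn struct and both functions are hypothetical, LSNs are
simplified to 64-bit integers, and a transaction whose commit lsn equals the
apply lsn is assumed to have been applied already.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-in for XLogRecPtr (assumption: a plain 64-bit value). */
typedef uint64_t SketchLsn;

/* Hypothetical view of one remote transaction known to the apply side. */
typedef struct SketchTxn
{
	SketchLsn	start_lsn;	/* first record of the transaction at the origin */
	SketchLsn	commit_lsn;	/* 0 while the transaction is still in progress */
} SketchTxn;

/*
 * Position from which reassembly has to restart: the oldest start lsn of any
 * transaction that was not yet applied at apply_lsn, or apply_lsn itself if
 * there is none.
 */
static SketchLsn
sketch_restart_lsn(const SketchTxn *txns, size_t ntxns, SketchLsn apply_lsn)
{
	SketchLsn	restart = apply_lsn;
	size_t		i;

	for (i = 0; i < ntxns; i++)
	{
		bool		applied = txns[i].commit_lsn != 0 &&
			txns[i].commit_lsn <= apply_lsn;

		if (!applied && txns[i].start_lsn < restart)
			restart = txns[i].start_lsn;
	}
	return restart;
}

/*
 * After restarting from sketch_restart_lsn(), a reassembled transaction is
 * thrown away if it committed at or before the recovered apply lsn.
 */
static bool
sketch_skip_after_restart(SketchLsn commit_lsn, SketchLsn apply_lsn)
{
	assert(commit_lsn != 0);
	return commit_lsn <= apply_lsn;
}
```

With transactions (start 100, commit 150), (start 120, in progress) and
(start 140, commit 300) and an apply lsn of 200, reassembly would restart at
120, and the first transaction would be discarded when it is seen again.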
---
src/backend/access/transam/xact.c | 22 ++++++++-
src/backend/access/transam/xlog.c | 73 ++++++++++++++++++++++++++++
src/backend/replication/walreceiverfuncs.c | 8 +++
src/include/access/xact.h | 1 +
src/include/catalog/pg_control.h | 13 ++++-
src/include/replication/walreceiver.h | 13 +++++
6 files changed, 128 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index dc30a17..40ac965 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -39,11 +39,13 @@
#include "replication/logical.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
+#include "replication/walreceiver.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/sinvaladt.h"
#include "storage/smgr.h"
+#include "storage/spin.h"
#include "utils/combocid.h"
#include "utils/guc.h"
#include "utils/inval.h"
@@ -1015,7 +1017,8 @@ RecordTransactionCommit(void)
/*
* Do we need the long commit record? If not, use the compact format.
*/
- if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit)
+ if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit ||
+ (wal_level == WAL_LEVEL_LOGICAL && current_replication_origin_id != guc_replication_origin_id))
{
XLogRecData rdata[4];
int lastrdata = 0;
@@ -1037,6 +1040,8 @@ RecordTransactionCommit(void)
xlrec.nrels = nrels;
xlrec.nsubxacts = nchildren;
xlrec.nmsgs = nmsgs;
+ xlrec.origin_lsn = current_replication_origin_lsn;
+
rdata[0].data = (char *) (&xlrec);
rdata[0].len = MinSizeOfXactCommit;
rdata[0].buffer = InvalidBuffer;
@@ -4575,6 +4580,21 @@ xact_redo_commit_internal(TransactionId xid, RepNodeId originating_node,
LWLockRelease(XidGenLock);
}

+ /*
+ * record where we are at wrt recovery. We need that to know from where on
+ * to restart applying logical change records
+ */
+ if(LogicalWalReceiverActive() && !XLByteEQ(origin_lsn, zeroRecPtr))
+ {
+ /*
+ * probably we don't need the locking because no lcr receiver can run
+ * yet.
+ */
+ SpinLockAcquire(&WalRcv->mutex);
+ WalRcv->mm_applyState[originating_node] = origin_lsn;
+ SpinLockRelease(&WalRcv->mutex);
+ }
+
if (standbyState == STANDBY_DISABLED)
{
/*
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0622726..20a4611 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5183,6 +5183,7 @@ BootStrapXLOG(void)
uint64 sysidentifier;
struct timeval tv;
pg_crc32 crc;
+ int i;

/*
* Select a hopefully-unique system identifier code for this installation.
@@ -5229,6 +5230,13 @@ BootStrapXLOG(void)
checkPoint.time = (pg_time_t) time(NULL);
checkPoint.oldestActiveXid = InvalidTransactionId;

+ for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId - 1;
+ i++){
+ checkPoint.logicalReceiveState[i] = zeroRecPtr;
+ checkPoint.logicalApplyState[i] = zeroRecPtr;
+ }
+
+
ShmemVariableCache->nextXid = checkPoint.nextXid;
ShmemVariableCache->nextOid = checkPoint.nextOid;
ShmemVariableCache->oidCount = 0;
@@ -6314,6 +6322,53 @@ StartupXLOG(void)
InRecovery = true;
}

+ /*
+ * setup shared memory state for logical wal receiver
+ *
+ * Do this unconditionally so enabling/disabling/enabling logical replay
+ * doesn't lose information due to rewriting pg_control
+ */
+ {
+ int i;
+
+ Assert(WalRcv);
+ /* locking is not really required here afaics, but ... */
+ SpinLockAcquire(&WalRcv->mutex);
+
+ for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId - 1;
+ i++)
+ {
+ XLogRecPtr* receiveState = &ControlFile->checkPointCopy.logicalReceiveState[i];
+ XLogRecPtr* applyState = &ControlFile->checkPointCopy.logicalApplyState[i];
+ if(i == guc_replication_origin_id && (
+ !XLByteEQ(*receiveState, zeroRecPtr) ||
+ !XLByteEQ(*applyState, zeroRecPtr))
+ )
+ {
+ elog(WARNING, "logical recovery state for own db. apply: %X/%X, receive %X/%X, origin %d",
+ applyState->xlogid, applyState->xrecoff,
+ receiveState->xlogid, receiveState->xrecoff,
+ guc_replication_origin_id);
+ WalRcv->mm_receiveState[i] = zeroRecPtr;
+ WalRcv->mm_applyState[i] = zeroRecPtr;
+ }
+ else{
+ WalRcv->mm_receiveState[i] = *receiveState;
+ WalRcv->mm_applyState[i] = *applyState;
+ }
+ }
+ SpinLockRelease(&WalRcv->mutex);
+
+ /* FIXME: remove at some point */
+ for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId - 1;
+ i++){
+ elog(LOG, "restored apply state for node %d to %X/%X, receive %X/%X",
+ i,
+ WalRcv->mm_applyState[i].xlogid, WalRcv->mm_applyState[i].xrecoff,
+ WalRcv->mm_receiveState[i].xlogid, WalRcv->mm_receiveState[i].xrecoff);
+ }
+ }
+
/* REDO */
if (InRecovery)
{
@@ -7906,6 +7961,24 @@ CreateCheckPoint(int flags)
&checkPoint.nextMultiOffset);

/*
+ * fill out where we are at wrt logical replay. Do this unconditionally so we
+ * don't lose information due to rewriting pg_control when toggling
+ * logical replay
+ */
+ {
+ int i;
+ SpinLockAcquire(&WalRcv->mutex);
+
+ for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId - 1;
+ i++){
+ checkPoint.logicalApplyState[i] = WalRcv->mm_applyState[i];
+ checkPoint.logicalReceiveState[i] = WalRcv->mm_receiveState[i];
+ }
+ SpinLockRelease(&WalRcv->mutex);
+ elog(LOG, "updated logical checkpoint data");
+ }
+
+ /*
* Having constructed the checkpoint record, ensure all shmem disk buffers
* and commit-log buffers are flushed to disk.
*
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 876196f..cb49282 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -64,6 +64,14 @@ WalRcvShmemInit(void)
MemSet(WalRcv, 0, WalRcvShmemSize());
WalRcv->walRcvState = WALRCV_STOPPED;
SpinLockInit(&WalRcv->mutex);
+
+ memset(&WalRcv->mm_receiveState,
+ 0, sizeof(WalRcv->mm_receiveState));
+ memset(&WalRcv->mm_applyState,
+ 0, sizeof(WalRcv->mm_applyState));
+
+ memset(&WalRcv->mm_receiveLatch,
+ 0, sizeof(WalRcv->mm_receiveLatch));
}
}

diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index b12d2a0..2757782 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -137,6 +137,7 @@ typedef struct xl_xact_commit
int nmsgs; /* number of shared inval msgs */
Oid dbId; /* MyDatabaseId */
Oid tsId; /* MyDatabaseTableSpace */
+ XLogRecPtr origin_lsn; /* location of originating commit */
/* Array of RelFileNode(s) to drop at commit */
RelFileNode xnodes[1]; /* VARIABLE LENGTH ARRAY */
/* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 5cff396..bc6316e 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -16,12 +16,13 @@
#define PG_CONTROL_H

#include "access/xlogdefs.h"
+#include "replication/logical.h"
#include "pgtime.h" /* for pg_time_t */
#include "utils/pg_crc.h"


/* Version identifier for this pg_control format */
-#define PG_CONTROL_VERSION 922
+#define PG_CONTROL_VERSION 923

/*
* Body of CheckPoint XLOG records. This is declared here because we keep
@@ -50,6 +51,13 @@ typedef struct CheckPoint
* it's set to InvalidTransactionId.
*/
TransactionId oldestActiveXid;
+
+ /*
+ * The replay state from every other node. This is only needed if wal_level
+ * >= logical and thus is only filled then.
+ */
+ XLogRecPtr logicalApplyState[MaxMultimasterNodeId - 1];
+ XLogRecPtr logicalReceiveState[MaxMultimasterNodeId - 1];
} CheckPoint;

/* XLOG info values for XLOG rmgr */
@@ -85,6 +93,9 @@ typedef enum DBState
* NOTE: try to keep this under 512 bytes so that it will fit on one physical
* sector of typical disk drives. This reduces the odds of corruption due to
* power failure midway through a write.
+ *
+ * FIXME: in order to allow many nodes in mm (which increases checkpoint size)
+ * we should change the writing of this to write(temp_file);fsync();rename();fsync();
*/

typedef struct ControlFileData
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index d21ec94..c9ab1be 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -14,6 +14,8 @@

#include "access/xlog.h"
#include "access/xlogdefs.h"
+#include "replication/logical.h"
+#include "storage/latch.h"
#include "storage/spin.h"
#include "pgtime.h"

@@ -90,6 +92,17 @@ typedef struct
char conninfo[MAXCONNINFO];

slock_t mutex; /* locks shared variables shown above */
+
+ /*
+ * replay point up to which we replayed for every node
+ * XXX: should possibly be dynamically sized?
+ * FIXME: should go to its own shm segment?
+ */
+ XLogRecPtr mm_receiveState[MaxMultimasterNodeId - 1];
+ XLogRecPtr mm_applyState[MaxMultimasterNodeId - 1];
+
+ Latch* mm_receiveLatch[MaxMultimasterNodeId - 1];
+
} WalRcvData;

extern WalRcvData *WalRcv;
--
1.7.10.rc3.3.g19a6c.dirty
