[PATCH 11/16] Add infrastructure for manipulating multiple streams of wal on a segment handling level

From: Andres Freund <andres(at)2ndquadrant(dot)com>
To: pgsql-hackers(at)postgresql(dot)org
Subject: [PATCH 11/16] Add infrastructure for manipulating multiple streams of wal on a segment handling level
Date: 2012-06-13 11:28:42
Message-ID: 1339586927-13156-11-git-send-email-andres@2ndquadrant.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

From: Andres Freund <andres(at)anarazel(dot)de>

For that, add a 'node_id' parameter to most functions dealing with wal
segments. A node_id that is 'InvalidMultimasterNodeId' references local wal;
every other node_id refers to wal in a new pg_lcr directory.

Using duplicated code would reduce the impact of that change, but the long-term
code-maintenance burden outweighs that by a fair bit.

Besides the decision to add a 'node_id' parameter to several functions, the
changes in this patch are fairly mechanical.
---
src/backend/access/transam/xlog.c | 54 ++++++++++++++++-----------
src/backend/replication/basebackup.c | 4 +-
src/backend/replication/walreceiver.c | 2 +-
src/backend/replication/walsender.c | 9 +++--
src/bin/initdb/initdb.c | 1 +
src/bin/pg_resetxlog/pg_resetxlog.c | 2 +-
src/include/access/xlog.h | 2 +-
src/include/access/xlog_internal.h | 13 +++++--
src/include/replication/logical.h | 2 +
src/include/replication/walsender_private.h | 2 +-
10 files changed, 56 insertions(+), 35 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 504b4d0..0622726 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -635,8 +635,8 @@ static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
static bool AdvanceXLInsertBuffer(bool new_segment);
static bool XLogCheckpointNeeded(uint32 logid, uint32 logseg);
static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
-static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
- bool find_free, int *max_advance,
+static bool InstallXLogFileSegment(RepNodeId node_id, uint32 *log, uint32 *seg,
+ char *tmppath, bool find_free, int *max_advance,
bool use_lock);
static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
int source, bool notexistOk);
@@ -1736,8 +1736,8 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)

/* create/use new log file */
use_existent = true;
- openLogFile = XLogFileInit(openLogId, openLogSeg,
- &use_existent, true);
+ openLogFile = XLogFileInit(InvalidMultimasterNodeId, openLogId,
+ openLogSeg, &use_existent, true);
openLogOff = 0;
}

@@ -2376,6 +2376,9 @@ XLogNeedsFlush(XLogRecPtr record)
* place. This should be TRUE except during bootstrap log creation. The
* caller must *not* hold the lock at call.
*
+ * node_id: if != InvalidMultimasterNodeId, this xlog file is actually an LCR
+ * file
+ *
* Returns FD of opened file.
*
* Note: errors here are ERROR not PANIC because we might or might not be
@@ -2384,8 +2387,8 @@ XLogNeedsFlush(XLogRecPtr record)
* in a critical section.
*/
int
-XLogFileInit(uint32 log, uint32 seg,
- bool *use_existent, bool use_lock)
+XLogFileInit(RepNodeId node_id, uint32 log, uint32 seg,
+ bool *use_existent, bool use_lock)
{
char path[MAXPGPATH];
char tmppath[MAXPGPATH];
@@ -2396,7 +2399,7 @@ XLogFileInit(uint32 log, uint32 seg,
int fd;
int nbytes;

- XLogFilePath(path, ThisTimeLineID, log, seg);
+ XLogFilePath(path, ThisTimeLineID, node_id, log, seg);

/*
* Try to use existent file (checkpoint maker may have created it already)
@@ -2425,6 +2428,11 @@ XLogFileInit(uint32 log, uint32 seg,
*/
elog(DEBUG2, "creating and filling new WAL file");

+ /*
+ * FIXME: to be safe we need to create the tempfile in the pg_lcr directory
+ * if it's actually an LCR file, because pg_lcr might be on a different
+ * partition.
+ */
snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());

unlink(tmppath);
@@ -2493,7 +2501,7 @@ XLogFileInit(uint32 log, uint32 seg,
installed_log = log;
installed_seg = seg;
max_advance = XLOGfileslop;
- if (!InstallXLogFileSegment(&installed_log, &installed_seg, tmppath,
+ if (!InstallXLogFileSegment(node_id, &installed_log, &installed_seg, tmppath,
*use_existent, &max_advance,
use_lock))
{
@@ -2548,7 +2556,7 @@ XLogFileCopy(uint32 log, uint32 seg,
/*
* Open the source file
*/
- XLogFilePath(path, srcTLI, srclog, srcseg);
+ XLogFilePath(path, srcTLI, InvalidMultimasterNodeId, srclog, srcseg);
srcfd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (srcfd < 0)
ereport(ERROR,
@@ -2619,7 +2627,8 @@ XLogFileCopy(uint32 log, uint32 seg,
/*
* Now move the segment into place with its final name.
*/
- if (!InstallXLogFileSegment(&log, &seg, tmppath, false, NULL, false))
+ if (!InstallXLogFileSegment(InvalidMultimasterNodeId, &log, &seg, tmppath,
+ false, NULL, false))
elog(ERROR, "InstallXLogFileSegment should not have failed");
}

@@ -2653,14 +2662,14 @@ XLogFileCopy(uint32 log, uint32 seg,
* file into place.
*/
static bool
-InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
+InstallXLogFileSegment(RepNodeId node_id, uint32 *log, uint32 *seg, char *tmppath,
bool find_free, int *max_advance,
bool use_lock)
{
char path[MAXPGPATH];
struct stat stat_buf;

- XLogFilePath(path, ThisTimeLineID, *log, *seg);
+ XLogFilePath(path, ThisTimeLineID, node_id, *log, *seg);

/*
* We want to be sure that only one process does this at a time.
@@ -2687,7 +2696,7 @@ InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
}
NextLogSeg(*log, *seg);
(*max_advance)--;
- XLogFilePath(path, ThisTimeLineID, *log, *seg);
+ XLogFilePath(path, ThisTimeLineID, node_id, *log, *seg);
}
}

@@ -2736,7 +2745,7 @@ XLogFileOpen(uint32 log, uint32 seg)
char path[MAXPGPATH];
int fd;

- XLogFilePath(path, ThisTimeLineID, log, seg);
+ XLogFilePath(path, ThisTimeLineID, InvalidMultimasterNodeId, log, seg);

fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method),
S_IRUSR | S_IWUSR);
@@ -2783,7 +2792,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,

case XLOG_FROM_PG_XLOG:
case XLOG_FROM_STREAM:
- XLogFilePath(path, tli, log, seg);
+ XLogFilePath(path, tli, InvalidMultimasterNodeId, log, seg);
restoredFromArchive = false;
break;

@@ -2804,7 +2813,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
bool reload = false;
struct stat statbuf;

- XLogFilePath(xlogfpath, tli, log, seg);
+ XLogFilePath(xlogfpath, tli, InvalidMultimasterNodeId, log, seg);
if (stat(xlogfpath, &statbuf) == 0)
{
if (unlink(xlogfpath) != 0)
@@ -2922,7 +2931,7 @@ XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, int sources)
}

/* Couldn't find it. For simplicity, complain about front timeline */
- XLogFilePath(path, recoveryTargetTLI, log, seg);
+ XLogFilePath(path, recoveryTargetTLI, InvalidMultimasterNodeId, log, seg);
errno = ENOENT;
ereport(emode,
(errcode_for_file_access(),
@@ -3366,7 +3375,8 @@ PreallocXlogFiles(XLogRecPtr endptr)
{
NextLogSeg(_logId, _logSeg);
use_existent = true;
- lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
+ lf = XLogFileInit(InvalidMultimasterNodeId, _logId, _logSeg,
+ &use_existent, true);
close(lf);
if (!use_existent)
CheckpointStats.ckpt_segs_added++;
@@ -3486,8 +3496,9 @@ RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
* separate archive directory.
*/
if (lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) &&
- InstallXLogFileSegment(&endlogId, &endlogSeg, path,
- true, &max_advance, true))
+ InstallXLogFileSegment(InvalidMultimasterNodeId, &endlogId,
+ &endlogSeg, path, true,
+ &max_advance, true))
{
ereport(DEBUG2,
(errmsg("recycled transaction log file \"%s\"",
@@ -5255,7 +5266,8 @@ BootStrapXLOG(void)

/* Create first XLOG segment file */
use_existent = false;
- openLogFile = XLogFileInit(0, 1, &use_existent, false);
+ openLogFile = XLogFileInit(InvalidMultimasterNodeId, 0, 1,
+ &use_existent, false);

/* Write the first page with the initial record */
errno = 0;
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 0bc88a4..47e4641 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -245,7 +245,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
char fn[MAXPGPATH];
int i;

- XLogFilePath(fn, ThisTimeLineID, logid, logseg);
+ XLogFilePath(fn, ThisTimeLineID, InvalidMultimasterNodeId, logid, logseg);
_tarWriteHeader(fn, NULL, &statbuf);

/* Send the actual WAL file contents, block-by-block */
@@ -264,7 +264,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
* http://lists.apple.com/archives/xcode-users/2003/Dec//msg000
* 51.html
*/
- XLogRead(buf, ptr, TAR_SEND_SIZE);
+ XLogRead(buf, InvalidMultimasterNodeId, ptr, TAR_SEND_SIZE);
if (pq_putmessage('d', buf, TAR_SEND_SIZE))
ereport(ERROR,
(errmsg("base backup could not send data, aborting backup")));
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 650b74f..e97196b 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -509,7 +509,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
/* Create/use new log file */
XLByteToSeg(recptr, recvId, recvSeg);
use_existent = true;
- recvFile = XLogFileInit(recvId, recvSeg, &use_existent, true);
+ recvFile = XLogFileInit(InvalidMultimasterNodeId, recvId, recvSeg, &use_existent, true);
recvOff = 0;
}

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e44c734..8cd3a00 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -977,7 +977,7 @@ WalSndKill(int code, Datum arg)
* more than one.
*/
void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+XLogRead(char *buf, RepNodeId node_id, XLogRecPtr startptr, Size count)
{
char *p;
XLogRecPtr recptr;
@@ -1009,8 +1009,8 @@ retry:
close(sendFile);

XLByteToSeg(recptr, sendId, sendSeg);
- XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
-
+ XLogFilePath(path, ThisTimeLineID, node_id,
+ sendId, sendSeg);
sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (sendFile < 0)
{
@@ -1215,7 +1215,8 @@ XLogSend(char *msgbuf, bool *caughtup)
* Read the log directly into the output buffer to avoid extra memcpy
* calls.
*/
- XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
+ XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), InvalidMultimasterNodeId,
+ startptr, nbytes);

/*
* We fill the message header last so that the send timestamp is taken as
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index 3789948..1f26382 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -2637,6 +2637,7 @@ main(int argc, char *argv[])
"global",
"pg_xlog",
"pg_xlog/archive_status",
+ "pg_lcr",
"pg_clog",
"pg_notify",
"pg_serial",
diff --git a/src/bin/pg_resetxlog/pg_resetxlog.c b/src/bin/pg_resetxlog/pg_resetxlog.c
index 65ba910..7ee3a3a 100644
--- a/src/bin/pg_resetxlog/pg_resetxlog.c
+++ b/src/bin/pg_resetxlog/pg_resetxlog.c
@@ -973,7 +973,7 @@ WriteEmptyXLOG(void)

/* Write the first page */
XLogFilePath(path, ControlFile.checkPointCopy.ThisTimeLineID,
- newXlogId, newXlogSeg);
+ InvalidMultimasterNodeId, newXlogId, newXlogSeg);

unlink(path);

diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index dd89cff..3b02c0b 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -268,7 +268,7 @@ extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
extern void XLogFlush(XLogRecPtr RecPtr);
extern bool XLogBackgroundFlush(void);
extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
-extern int XLogFileInit(uint32 log, uint32 seg,
+extern int XLogFileInit(RepNodeId node_id, uint32 log, uint32 seg,
bool *use_existent, bool use_lock);
extern int XLogFileOpen(uint32 log, uint32 seg);

diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 3328a50..deadddf 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -19,6 +19,7 @@
#include "access/xlog.h"
#include "fmgr.h"
#include "pgtime.h"
+#include "replication/logical.h"
#include "storage/block.h"
#include "storage/relfilenode.h"

@@ -216,14 +217,11 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
#define MAXFNAMELEN 64

#define XLogFileName(fname, tli, log, seg) \
snprintf(fname, MAXFNAMELEN, "%08X%08X%08X", tli, log, seg)

#define XLogFromFileName(fname, tli, log, seg) \
sscanf(fname, "%08X%08X%08X", tli, log, seg)

-#define XLogFilePath(path, tli, log, seg) \
- snprintf(path, MAXPGPATH, XLOGDIR "/%08X%08X%08X", tli, log, seg)
-
#define TLHistoryFileName(fname, tli) \
snprintf(fname, MAXFNAMELEN, "%08X.history", tli)

@@ -239,6 +237,13 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
#define BackupHistoryFilePath(path, tli, log, seg, offset) \
snprintf(path, MAXPGPATH, XLOGDIR "/%08X%08X%08X.%08X.backup", tli, log, seg, offset)

+/* WAL segment path: XLOGDIR if node_id is InvalidMultimasterNodeId, else LCRDIR/<node_id>. FIXME: move to xlogutils.c, needs to fix sharing with receivexlog.c first though */
+static inline int XLogFilePath(char *path, TimeLineID tli, RepNodeId node_id, uint32 log, uint32 seg)
+{
+ if (node_id != InvalidMultimasterNodeId)
+ return snprintf(path, MAXPGPATH, LCRDIR "/%d/%08X%08X%08X", node_id, tli, log, seg);
+ return snprintf(path, MAXPGPATH, XLOGDIR "/%08X%08X%08X", tli, log, seg);
+}

/*
* Method table for resource managers.
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 0698b61..8f44fad 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -19,4 +19,6 @@ extern XLogRecPtr current_replication_origin_lsn;

#define InvalidMultimasterNodeId 0
#define MaxMultimasterNodeId (2<<3)
+
+#define LCRDIR "pg_lcr"
#endif
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 66234cd..bc58ff4 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -95,7 +95,7 @@ extern WalSndCtlData *WalSndCtl;


extern void WalSndSetState(WalSndState state);
-extern void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+extern void XLogRead(char *buf, RepNodeId node_id, XLogRecPtr startptr, Size count);

/*
* Internal functions for parsing the replication grammar, in repl_gram.y and
--
1.7.10.rc3.3.g19a6c.dirty

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Andres Freund 2012-06-13 11:28:43 [PATCH 12/16] Add state to keep track of logical replication
Previous Message Andres Freund 2012-06-13 11:28:41 [PATCH 10/16] Introduce the concept that wal has a 'origin' node