From 073dfa48f2361b8ee6a656bcbe57d11cad4cc2b3 Mon Sep 17 00:00:00 2001 From: Petr Jelinek Date: Fri, 24 Feb 2017 21:39:03 +0100 Subject: [PATCH 1/5] Reserve global xmin for create slot snasphot export Otherwise the VACUUM or pruning might remove tuples still needed by the exported snapshot. --- src/backend/replication/logical/logical.c | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 5529ac8..58e1c80 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -267,12 +267,18 @@ CreateInitDecodingContext(char *plugin, * the slot machinery about the new limit. Once that's done the * ProcArrayLock can be released as the slot machinery now is * protecting against vacuum. + * + * Note that we only store the global xmin temporarily in the in-memory + * state so that the initial snapshot can be exported. After initial + * snapshot is done global xmin should be reset and not tracked anymore + * so we are fine with losing the global xmin after crash. * ---- */ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId(); slot->data.catalog_xmin = slot->effective_catalog_xmin; + slot->effective_xmin = slot->effective_catalog_xmin; ReplicationSlotsComputeRequiredXmin(true); @@ -282,7 +288,7 @@ CreateInitDecodingContext(char *plugin, * tell the snapshot builder to only assemble snapshot once reaching the * running_xact's record with the respective xmin. */ - xmin_horizon = slot->data.catalog_xmin; + xmin_horizon = slot->effective_xmin; ReplicationSlotMarkDirty(); ReplicationSlotSave(); @@ -456,9 +462,28 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) void FreeDecodingContext(LogicalDecodingContext *ctx) { + ReplicationSlot *slot = MyReplicationSlot; + if (ctx->callbacks.shutdown_cb != NULL) shutdown_cb_wrapper(ctx); + /* + * Cleanup global xmin for the slot that we may have set in + * CreateInitDecodingContext(). It's okay to do this here unconditionally + * because we only care for the global xmin for exported snapshots and if + * we exported one we used the required xmin for the current backend + * proccess in SnapBuildInitialSnapshot(). + * + * We do not take ProcArrayLock or similar since we only reset xmin here + * and there's not much harm done by a concurrent computation missing + * that and ReplicationSlotsComputeRequiredXmin will do locking as + * neccessary. + */ + SpinLockAcquire(&slot->mutex); + slot->effective_xmin = InvalidTransactionId; + SpinLockRelease(&slot->mutex); + ReplicationSlotsComputeRequiredXmin(false); + ReorderBufferFree(ctx->reorder); FreeSnapshotBuilder(ctx->snapshot_builder); XLogReaderFree(ctx->reader); -- 2.7.4