From 1f9d3fe6f1fb9a9b39ea6bd9e1776a769fac8ea9 Mon Sep 17 00:00:00 2001 From: Petr Jelinek Date: Wed, 22 Feb 2017 00:57:33 +0100 Subject: [PATCH 4/5] Fix xl_running_xacts usage in snapshot builder Due to race condition, the xl_running_xacts might contain no longer running transactions. Previous coding tried to get around this by additional locking but that did not work correctly for committs. Instead try combining decoded commits and multiple xl_running_xacts to get the consistent snapshot. This also reverts changes made to GetRunningTransactionData() and LogStandbySnapshot() by b89e151 as the additional locking does not help. --- src/backend/replication/logical/snapbuild.c | 195 ++++++++++++++++++---------- src/backend/storage/ipc/procarray.c | 5 +- src/backend/storage/ipc/standby.c | 19 --- 3 files changed, 130 insertions(+), 89 deletions(-) diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 3e34f75..d989576 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -1220,6 +1220,82 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact builder->last_serialized_snapshot); } +/* + * Start tracking transactions based on the info we get from xl_running_xacts. + */ +static void +SnapBuildStartXactTracking(SnapBuild *builder, xl_running_xacts *running) +{ + int off; + + /* + * We only care about toplevel xids as those are the ones we + * definitely see in the wal stream. As snapbuild.c tracks committed + * instead of running transactions we don't need to know anything + * about uncommitted subtransactions. + */ + + /* + * Start with an xmin/xmax that's correct for future, when all the + * currently running transactions have finished. We'll update both + * while waiting for the pending transactions to finish. 
+ */ + builder->xmin = running->nextXid; /* < are finished */ + builder->xmax = running->nextXid; /* >= are running */ + + /* so we can safely use the faster comparisons */ + Assert(TransactionIdIsNormal(builder->xmin)); + Assert(TransactionIdIsNormal(builder->xmax)); + + builder->running.xcnt = running->xcnt; + builder->running.xcnt_space = running->xcnt; + builder->running.xip = + MemoryContextAlloc(builder->context, + builder->running.xcnt * sizeof(TransactionId)); + memcpy(builder->running.xip, running->xids, + builder->running.xcnt * sizeof(TransactionId)); + + /* sort so we can do a binary search */ + qsort(builder->running.xip, builder->running.xcnt, + sizeof(TransactionId), xidComparator); + + builder->running.xmin = builder->running.xip[0]; + builder->running.xmax = builder->running.xip[running->xcnt - 1]; + + + /* makes comparisons cheaper later */ + TransactionIdRetreat(builder->running.xmin); + TransactionIdAdvance(builder->running.xmax); + + builder->state = SNAPBUILD_FULL_SNAPSHOT; + + /* + * Iterate through all xids, wait for them to finish. + * + * This isn't required for the correctness of decoding, but to allow + * isolationtester to notice that we're currently waiting for + * something. + */ + for (off = 0; off < builder->running.xcnt; off++) + { + TransactionId xid = builder->running.xip[off]; + + /* + * Upper layers should prevent that we ever need to wait on + * ourselves. Check anyway, since failing to do so would either + * result in an endless wait or an Assert() failure. + */ + if (TransactionIdIsCurrentTransactionId(xid)) + elog(ERROR, "waiting for ourselves"); + + /* + * This isn't required for the correctness of decoding, but to allow + * isolationtester to notice that we're currently waiting for + * something. + */ + XactLockTableWait(xid, NULL, NULL, XLTW_None); + } +} /* * Build the start of a snapshot that's capable of decoding the catalog. 
@@ -1241,7 +1317,12 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * was inserted, jump to CONSISTENT immediately. We might find such a * state we were waiting for b) or c). * - * b) Wait for all toplevel transactions that were running to end. We + * b) This (in a previous run) or another decoding slot serialized a + * snapshot to disk that we can use. We can't use this method for the + * initial snapshot when slot is being created and needs full snapshot + * for export or direct use. + + * c) Wait for all toplevel transactions that were running to end. We * simply track the number of in-progress toplevel transactions and * lower it whenever one commits or aborts. When that number * (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT @@ -1252,11 +1333,6 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * Interestingly, in contrast to HS, this allows us not to care about * subtransactions - and by extension suboverflowed xl_running_xacts - * at all. - * - * c) This (in a previous run) or another decoding slot serialized a - * snapshot to disk that we can use. We can't use this method for the - * initial snapshot when slot is being created and needs full snapshot - * for export or direct use. * --- */ @@ -1311,7 +1387,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn return false; } - /* c) valid on disk state and not full snapshot */ + /* b) valid on disk state and not full snapshot */ else if (!builder->building_full_snapshot && SnapBuildRestore(builder, lsn)) { @@ -1319,54 +1395,14 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn return false; } /* - * b) first encounter of a useable xl_running_xacts record. If we had + * c) first encounter of a useable xl_running_xacts record. If we had * found one earlier we would either track running transactions (i.e. 
* builder->running.xcnt != 0) or be consistent (this function wouldn't * get called). */ else if (!builder->running.xcnt) { - int off; - - /* - * We only care about toplevel xids as those are the ones we - * definitely see in the wal stream. As snapbuild.c tracks committed - * instead of running transactions we don't need to know anything - * about uncommitted subtransactions. - */ - - /* - * Start with an xmin/xmax that's correct for future, when all the - * currently running transactions have finished. We'll update both - * while waiting for the pending transactions to finish. - */ - builder->xmin = running->nextXid; /* < are finished */ - builder->xmax = running->nextXid; /* >= are running */ - - /* so we can safely use the faster comparisons */ - Assert(TransactionIdIsNormal(builder->xmin)); - Assert(TransactionIdIsNormal(builder->xmax)); - - builder->running.xcnt = running->xcnt; - builder->running.xcnt_space = running->xcnt; - builder->running.xip = - MemoryContextAlloc(builder->context, - builder->running.xcnt * sizeof(TransactionId)); - memcpy(builder->running.xip, running->xids, - builder->running.xcnt * sizeof(TransactionId)); - - /* sort so we can do a binary search */ - qsort(builder->running.xip, builder->running.xcnt, - sizeof(TransactionId), xidComparator); - - builder->running.xmin = builder->running.xip[0]; - builder->running.xmax = builder->running.xip[running->xcnt - 1]; - - /* makes comparisons cheaper later */ - TransactionIdRetreat(builder->running.xmin); - TransactionIdAdvance(builder->running.xmax); - - builder->state = SNAPBUILD_FULL_SNAPSHOT; + SnapBuildStartXactTracking(builder, running); ereport(LOG, (errmsg("logical decoding found initial starting point at %X/%X", @@ -1376,30 +1412,53 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn builder->running.xcnt, (uint32) builder->running.xcnt))); + /* nothing could have built up so far, so don't perform cleanup */ + return false; + } + /* + * c) we already 
saw the xl_running_xacts and tried to do the above.
+	 * However, because of a race condition in LogStandbySnapshot() there might
+	 * have been a transaction reported as running that in reality had already
+	 * written its commit record before the xl_running_xacts so decoding has
+	 * missed it. We now see xl_running_xacts that suggests all transactions
+	 * from the original one were closed but the consistent state wasn't
+	 * reached which means the race condition has indeed happened.
+	 *
+	 * Start tracking again as if this was the first xl_running_xacts we've
+	 * seen, with the advantage that because decoding was already running,
+	 * any transactions committed before the xl_running_xacts record will be
+	 * known to us so we won't hit the same issue again.
+	 */
+	else if (TransactionIdFollows(running->oldestRunningXid,
+								  builder->running.xmax))
+	{
+		int off;
+
+		SnapBuildStartXactTracking(builder, running);
+
 		/*
-		 * Iterate through all xids, wait for them to finish.
+		 * Mark any transactions that are known to have committed before the
+		 * xl_running_xacts as finished to avoid the race condition in
+		 * LogStandbySnapshot().
 		 *
-		 * This isn't required for the correctness of decoding, but to allow
-		 * isolationtester to notice that we're currently waiting for
-		 * something.
+		 * We can use SnapBuildEndTxn directly as it only does the
+		 * transaction running check and handling without any additional
+		 * side effects.
 		 */
-		for (off = 0; off < builder->running.xcnt; off++)
-		{
-			TransactionId xid = builder->running.xip[off];
-
-			/*
-			 * Upper layers should prevent that we ever need to wait on
-			 * ourselves. Check anyway, since failing to do so would either
-			 * result in an endless wait or an Assert() failure. 
- */ - if (TransactionIdIsCurrentTransactionId(xid)) - elog(ERROR, "waiting for ourselves"); + for (off = 0; off < builder->committed.xcnt; off++) + SnapBuildEndTxn(builder, lsn, builder->committed.xip[off]); - XactLockTableWait(xid, NULL, NULL, XLTW_None); - } + /* We might have reached consistent point now. */ + if (builder->state == SNAPBUILD_CONSISTENT) + return false; - /* nothing could have built up so far, so don't perform cleanup */ - return false; + ereport(LOG, + (errmsg("logical decoding moved initial starting point to %X/%X", + (uint32) (lsn >> 32), (uint32) lsn), + errdetail_plural("%u transaction needs to finish.", + "%u transactions need to finish.", + builder->running.xcnt, + (uint32) builder->running.xcnt))); } /* diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index ebf6a92..b3d6829 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -2060,12 +2060,13 @@ GetRunningTransactionData(void) CurrentRunningXacts->oldestRunningXid = oldestRunningXid; CurrentRunningXacts->latestCompletedXid = latestCompletedXid; + /* We don't release XidGenLock here, the caller is responsible for that */ + LWLockRelease(ProcArrayLock); + Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid)); Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid)); Assert(TransactionIdIsNormal(CurrentRunningXacts->latestCompletedXid)); - /* We don't release the locks here, the caller is responsible for that */ - return CurrentRunningXacts; } diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 8e57f93..ddb279e 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -929,27 +929,8 @@ LogStandbySnapshot(void) */ running = GetRunningTransactionData(); - /* - * GetRunningTransactionData() acquired ProcArrayLock, we must release it. 
- * For Hot Standby this can be done before inserting the WAL record - * because ProcArrayApplyRecoveryInfo() rechecks the commit status using - * the clog. For logical decoding, though, the lock can't be released - * early because the clog might be "in the future" from the POV of the - * historic snapshot. This would allow for situations where we're waiting - * for the end of a transaction listed in the xl_running_xacts record - * which, according to the WAL, has committed before the xl_running_xacts - * record. Fortunately this routine isn't executed frequently, and it's - * only a shared lock. - */ - if (wal_level < WAL_LEVEL_LOGICAL) - LWLockRelease(ProcArrayLock); - recptr = LogCurrentRunningXacts(running); - /* Release lock if we kept it longer ... */ - if (wal_level >= WAL_LEVEL_LOGICAL) - LWLockRelease(ProcArrayLock); - /* GetRunningTransactionData() acquired XidGenLock, we must release it */ LWLockRelease(XidGenLock); -- 2.7.4