Re: [HACKERS] logical decoding of two-phase transactions

From: Andres Freund <andres(at)anarazel(dot)de>
To: Nikhil Sontakke <nikhils(at)2ndquadrant(dot)com>
Cc: Peter Eisentraut <peter(dot)eisentraut(at)2ndquadrant(dot)com>, Simon Riggs <simon(at)2ndquadrant(dot)com>, Craig Ringer <craig(at)2ndquadrant(dot)com>, Petr Jelinek <petr(dot)jelinek(at)2ndquadrant(dot)com>, Sokolov Yura <y(dot)sokolov(at)postgrespro(dot)ru>, Stas Kelvich <s(dot)kelvich(at)postgrespro(dot)ru>, Dmitry Dolgov <9erthalion6(at)gmail(dot)com>, Tom Lane <tgl(at)sss(dot)pgh(dot)pa(dot)us>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)postgresql(dot)org>, Robert Haas <robertmhaas(at)gmail(dot)com>
Subject: Re: [HACKERS] logical decoding of two-phase transactions
Date: 2018-02-09 21:10:25
Message-ID: 20180209211025.d7jxh43fhqnevhji@alap3.anarazel.de

Hi,

First off: this patch mixes way too many different types of changes in one
huge commit. It needs to be split into several pieces: first the cleanups
(e.g. the fields -> flags change), then the individual infrastructure
pieces (the twophase.c changes, themselves best split into several pieces,
and the locking work), then the main feature, and finally the output
plugin support for it. Each piece should come with its own explanation of
why the change is necessary and not a bad idea.

On 2018-02-06 17:50:40 +0530, Nikhil Sontakke wrote:
> @@ -46,6 +48,9 @@ typedef struct
> bool skip_empty_xacts;
> bool xact_wrote_changes;
> bool only_local;
> + bool twophase_decoding;
> + bool twophase_decode_with_catalog_changes;
> + int decode_delay; /* seconds to sleep after every change record */

This seems too big a crock to add just for testing. It'll also make the
tests timing-dependent...

> } TestDecodingData;

> void
> _PG_init(void)
> @@ -85,9 +106,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
> cb->begin_cb = pg_decode_begin_txn;
> cb->change_cb = pg_decode_change;
> cb->commit_cb = pg_decode_commit_txn;
> + cb->abort_cb = pg_decode_abort_txn;

> cb->filter_by_origin_cb = pg_decode_filter;
> cb->shutdown_cb = pg_decode_shutdown;
> cb->message_cb = pg_decode_message;
> + cb->filter_prepare_cb = pg_filter_prepare;
> + cb->filter_decode_txn_cb = pg_filter_decode_txn;
> + cb->prepare_cb = pg_decode_prepare_txn;
> + cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
> + cb->abort_prepared_cb = pg_decode_abort_prepared_txn;
> }

Why does this introduce both abort_cb and abort_prepared_cb? That seems
to conflate two separate features.

> +/* Filter out unnecessary two-phase transactions */
> +static bool
> +pg_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
> + TransactionId xid, const char *gid)
> +{
> + TestDecodingData *data = ctx->output_plugin_private;
> +
> + /* treat all transactions as one-phase */
> + if (!data->twophase_decoding)
> + return true;
> +
> + if (txn && txn_has_catalog_changes(txn) &&
> + !data->twophase_decode_with_catalog_changes)
> + return true;

What? I'm INCREDIBLY doubtful this is a sane thing to expose to output
plugins. As in, unless I hear a very, very convincing reason, I'm strongly
opposed.

> +/*
> + * Check if we should continue to decode this transaction.
> + *
> + * If it has aborted in the meanwhile, then there's no sense
> + * in decoding and sending the rest of the changes, we might
> + * as well ask the subscribers to abort immediately.
> + *
> + * This should be called if we are streaming a transaction
> + * before it's committed or if we are decoding a 2PC
> + * transaction. Otherwise we always decode committed
> + * transactions
> + *
> + * Additional checks can be added here, as needed
> + */
> +static bool
> +pg_filter_decode_txn(LogicalDecodingContext *ctx,
> + ReorderBufferTXN *txn)
> +{
> + /*
> + * Due to caching, repeated TransactionIdDidAbort calls
> + * shouldn't be that expensive
> + */
> + if (txn != NULL &&
> + TransactionIdIsValid(txn->xid) &&
> + TransactionIdDidAbort(txn->xid))
> + return true;
> +
> + /* if txn is NULL, filter it out */

Why can this be NULL?

> + return (txn != NULL)? false:true;
> +}

This definitely shouldn't be a task for each output plugin. Even if we
want to make this configurable, I'm doubtful that this is the right place
to do so - handling it centrally in the decoding code makes it much less
likely that we hit unhandled edge cases.
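
To be concrete, I'd expect the core decoding code to do a check along
these lines itself before invoking the apply callbacks (a rough sketch
only; the function name is made up and not part of the patch):

/* assumes access/transam.h and replication/reorderbuffer.h */
static bool
DecodingTxnStillValid(ReorderBufferTXN *txn)
{
	/*
	 * Only in-progress / prepared transactions can abort underneath us;
	 * anything without a valid xid can't be checked, so skip it.
	 */
	if (txn == NULL || !TransactionIdIsValid(txn->xid))
		return false;

	/*
	 * Due to clog caching, repeated TransactionIdDidAbort() calls
	 * shouldn't be that expensive.
	 */
	return !TransactionIdDidAbort(txn->xid);
}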

> static bool
> pg_decode_filter(LogicalDecodingContext *ctx,
> RepOriginId origin_id)
> @@ -409,8 +622,18 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
> }
> data->xact_wrote_changes = true;
>
> + if (!LogicalLockTransaction(txn))
> + return;

It really, really can't be right that this is exposed to output plugins.

> + /* if decode_delay is specified, sleep with above lock held */
> + if (data->decode_delay > 0)
> + {
> + elog(LOG, "sleeping for %d seconds", data->decode_delay);
> + pg_usleep(data->decode_delay * 1000000L);
> + }

Really not on board.

> @@ -1075,6 +1077,21 @@ EndPrepare(GlobalTransaction gxact)
> Assert(hdr->magic == TWOPHASE_MAGIC);
> hdr->total_len = records.total_len + sizeof(pg_crc32c);
>
> + replorigin = (replorigin_session_origin != InvalidRepOriginId &&
> + replorigin_session_origin != DoNotReplicateId);
> +
> + if (replorigin)
> + {
> + Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
> + hdr->origin_lsn = replorigin_session_origin_lsn;
> + hdr->origin_timestamp = replorigin_session_origin_timestamp;
> + }
> + else
> + {
> + hdr->origin_lsn = InvalidXLogRecPtr;
> + hdr->origin_timestamp = 0;
> + }
> +
> /*
> * If the data size exceeds MaxAllocSize, we won't be able to read it in
> * ReadTwoPhaseFile. Check for that now, rather than fail in the case
> @@ -1107,7 +1124,16 @@ EndPrepare(GlobalTransaction gxact)
> XLogBeginInsert();
> for (record = records.head; record != NULL; record = record->next)
> XLogRegisterData(record->data, record->len);
> +
> + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
> +

Can we perhaps merge a bit of this code with the plain commit path?
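
E.g. a small shared helper along these lines (just a sketch, the name is
invented) could serve both RecordTransactionCommit() and EndPrepare():

/* assumes replication/origin.h; hypothetical shared helper */
static void
GetReplOriginInfo(bool *replorigin, XLogRecPtr *origin_lsn,
				  TimestampTz *origin_timestamp)
{
	*replorigin = (replorigin_session_origin != InvalidRepOriginId &&
				   replorigin_session_origin != DoNotReplicateId);

	if (*replorigin)
	{
		Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
		*origin_lsn = replorigin_session_origin_lsn;
		*origin_timestamp = replorigin_session_origin_timestamp;
	}
	else
	{
		*origin_lsn = InvalidXLogRecPtr;
		*origin_timestamp = 0;
	}
}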

> gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
> +
> + if (replorigin)
> + /* Move LSNs forward for this replication origin */
> + replorigin_session_advance(replorigin_session_origin_lsn,
> + gxact->prepare_end_lsn);
> +

Why is it ok to do this at PREPARE time? I guess the theory is that the
origin LSN is going to be from the source's PREPARE too? If so, that needs
to be commented on here.
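
Something like the following comment is what I'm after (my guess at the
intended reasoning, which the patch authors would need to confirm):

	if (replorigin)
	{
		/*
		 * Advance the session replication origin at PREPARE time because
		 * replorigin_session_origin_lsn points at the upstream PREPARE
		 * record; it gets advanced again once the corresponding COMMIT
		 * PREPARED / ROLLBACK PREPARED is replayed.
		 */
		replorigin_session_advance(replorigin_session_origin_lsn,
								   gxact->prepare_end_lsn);
	}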

> +/*
> + * ParsePrepareRecord
> + */
> +void
> +ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
> +{
> + TwoPhaseFileHeader *hdr;
> + char *bufptr;
> +
> + hdr = (TwoPhaseFileHeader *) xlrec;
> + bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
> +
> + parsed->origin_lsn = hdr->origin_lsn;
> + parsed->origin_timestamp = hdr->origin_timestamp;
> + parsed->twophase_xid = hdr->xid;
> + parsed->dbId = hdr->database;
> + parsed->nsubxacts = hdr->nsubxacts;
> + parsed->ncommitrels = hdr->ncommitrels;
> + parsed->nabortrels = hdr->nabortrels;
> + parsed->nmsgs = hdr->ninvalmsgs;
> +
> + strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
> + bufptr += MAXALIGN(hdr->gidlen);
> +
> + parsed->subxacts = (TransactionId *) bufptr;
> + bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
> +
> + parsed->commitrels = (RelFileNode *) bufptr;
> + bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
> +
> + parsed->abortrels = (RelFileNode *) bufptr;
> + bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
> +
> + parsed->msgs = (SharedInvalidationMessage *) bufptr;
> + bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
> +}

So this is now basically a commit record. I quite dislike duplicating
things this way. Can't we make commit records versatile enough to
represent this without problems?
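
As a rough illustration of what I mean (struct and field names here are
made up, this is not a proposal of the exact layout), the prepare case
could piggy-back on the existing parsed-commit representation instead of
introducing a parallel one:

/* assumes the definitions in access/xact.h */
typedef struct xl_xact_parsed_prepare_sketch
{
	xl_xact_parsed_commit common;		/* subxacts, rels, inval msgs, ... */
	char		twophase_gid[GIDSIZE];	/* GID of the prepared transaction */
	XLogRecPtr	origin_lsn;
	TimestampTz origin_timestamp;
} xl_xact_parsed_prepare_sketch;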

> /*
> * Reads 2PC data from xlog. During checkpoint this data will be moved to
> @@ -1365,7 +1428,7 @@ StandbyTransactionIdIsPrepared(TransactionId xid)
> * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
> */
> void
> -FinishPreparedTransaction(const char *gid, bool isCommit)
> +FinishPreparedTransaction(const char *gid, bool isCommit, bool missing_ok)
> {
> GlobalTransaction gxact;
> PGPROC *proc;
> @@ -1386,8 +1449,20 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
> /*
> * Validate the GID, and lock the GXACT to ensure that two backends do not
> * try to commit the same GID at once.
> + *
> + * During logical decoding, on the apply side, it's possible that a prepared
> + * transaction got aborted while decoding. In that case, we stop the
> + * decoding and abort the transaction immediately. However the ROLLBACK
> + * prepared processing still reaches the subscriber. In that case it's ok
> + * to have a missing gid
> */
> - gxact = LockGXact(gid, GetUserId());
> + gxact = LockGXact(gid, GetUserId(), missing_ok);
> + if (gxact == NULL)
> + {
> + Assert(missing_ok && !isCommit);
> + return;
> + }

I'm very doubtful it is sane to handle this at such a low level.

> @@ -2358,6 +2443,13 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
> Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
> TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
>
> + if (origin_id != InvalidRepOriginId)
> + {
> + /* recover apply progress */
> + replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
> + false /* backward */ , false /* WAL */ );
> + }
> +

It's unclear to me why this is necessary / a good idea.

> case XLOG_XACT_PREPARE:
> + {
> + xl_xact_parsed_prepare parsed;
>
> - /*
> - * Currently decoding ignores PREPARE TRANSACTION and will just
> - * decode the transaction when the COMMIT PREPARED is sent or
> - * throw away the transaction's contents when a ROLLBACK PREPARED
> - * is received. In the future we could add code to expose prepared
> - * transactions in the changestream allowing for a kind of
> - * distributed 2PC.
> - */
> - ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
> + /* check that output plugin is capable of twophase decoding */
> + if (!ctx->enable_twophase)
> + {
> + ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
> + break;
> + }
> +
> + /* ok, parse it */
> + ParsePrepareRecord(XLogRecGetInfo(buf->record),
> + XLogRecGetData(buf->record), &parsed);
> +
> + /* does output plugin want this particular transaction? */
> + if (ctx->callbacks.filter_prepare_cb &&
> + ReorderBufferPrepareNeedSkip(reorder, parsed.twophase_xid,
> + parsed.twophase_gid))
> + {
> + ReorderBufferProcessXid(reorder, parsed.twophase_xid,
> + buf->origptr);

We're calling ReorderBufferProcessXid() on two different xids in
different branches, is that intentional?

> + if (TransactionIdIsValid(parsed->twophase_xid) &&
> + ReorderBufferTxnIsPrepared(ctx->reorder,
> + parsed->twophase_xid, parsed->twophase_gid))
> + {
> + Assert(xid == parsed->twophase_xid);
> + /* we are processing COMMIT PREPARED */
> + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
> + commit_time, origin_id, origin_lsn, parsed->twophase_gid, true);
> + }
> + else
> + {
> + /* replay actions of all transaction + subtransactions in order */
> + ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
> + commit_time, origin_id, origin_lsn);
> + }
> +}

Why do we want this via the same routine?

> +bool
> +LogicalLockTransaction(ReorderBufferTXN *txn)
> +{
> + bool ok = false;
> +
> + /*
> + * Prepared transactions and uncommitted transactions
> + * that have modified catalogs need to interlock with
> + * concurrent rollback to ensure that there are no
> + * issues while decoding
> + */
> +
> + if (!txn_has_catalog_changes(txn))
> + return true;
> +
> + /*
> + * Is it a prepared txn? Similar checks for uncommitted
> + * transactions when we start supporting them
> + */
> + if (!txn_prepared(txn))
> + return true;
> +
> + /* check cached status */
> + if (txn_commit(txn))
> + return true;
> + if (txn_rollback(txn))
> + return false;
> +
> + /*
> + * Find the PROC that is handling this XID and add ourself as a
> + * decodeGroupMember
> + */
> + if (MyProc->decodeGroupLeader == NULL)
> + {
> + PGPROC *proc = BecomeDecodeGroupLeader(txn->xid, txn_prepared(txn));
> +
> + /*
> + * If decodeGroupLeader is NULL, then the only possibility
> + * is that the transaction completed and went away
> + */
> + if (proc == NULL)
> + {
> + Assert(!TransactionIdIsInProgress(txn->xid));
> + if (TransactionIdDidCommit(txn->xid))
> + {
> + txn->txn_flags |= TXN_COMMIT;
> + return true;
> + }
> + else
> + {
> + txn->txn_flags |= TXN_ROLLBACK;
> + return false;
> + }
> + }
> +
> + /* Add ourself as a decodeGroupMember */
> + if (!BecomeDecodeGroupMember(proc, proc->pid, txn_prepared(txn)))
> + {
> + Assert(!TransactionIdIsInProgress(txn->xid));
> + if (TransactionIdDidCommit(txn->xid))
> + {
> + txn->txn_flags |= TXN_COMMIT;
> + return true;
> + }
> + else
> + {
> + txn->txn_flags |= TXN_ROLLBACK;
> + return false;
> + }
> + }
> + }

Are we ok with this low-level lock / pgproc stuff happening outside of
procarray / lock-related files? Where is the locking scheme documented?

> +/* ReorderBufferTXN flags */
> +#define TXN_HAS_CATALOG_CHANGES 0x0001
> +#define TXN_IS_SUBXACT 0x0002
> +#define TXN_SERIALIZED 0x0004
> +#define TXN_PREPARE 0x0008
> +#define TXN_COMMIT_PREPARED 0x0010
> +#define TXN_ROLLBACK_PREPARED 0x0020
> +#define TXN_COMMIT 0x0040
> +#define TXN_ROLLBACK 0x0080
> +
> +/* does the txn have catalog changes */
> +#define txn_has_catalog_changes(txn) (txn->txn_flags & TXN_HAS_CATALOG_CHANGES)
> +/* is the txn known as a subxact? */
> +#define txn_is_subxact(txn) (txn->txn_flags & TXN_IS_SUBXACT)
> +/*
> + * Has this transaction been spilled to disk? It's not always possible to
> + * deduce that fact by comparing nentries with nentries_mem, because e.g.
> + * subtransactions of a large transaction might get serialized together
> + * with the parent - if they're restored to memory they'd have
> + * nentries_mem == nentries.
> + */
> +#define txn_is_serialized(txn) (txn->txn_flags & TXN_SERIALIZED)
> +/* is this txn prepared? */
> +#define txn_prepared(txn) (txn->txn_flags & TXN_PREPARE)
> +/* was this prepared txn committed in the meanwhile? */
> +#define txn_commit_prepared(txn) (txn->txn_flags & TXN_COMMIT_PREPARED)
> +/* was this prepared txn aborted in the meanwhile? */
> +#define txn_rollback_prepared(txn) (txn->txn_flags & TXN_ROLLBACK_PREPARED)
> +/* was this txn committed in the meanwhile? */
> +#define txn_commit(txn) (txn->txn_flags & TXN_COMMIT)
> +/* was this prepared txn aborted in the meanwhile? */
> +#define txn_rollback(txn) (txn->txn_flags & TXN_ROLLBACK)
> +

These txn_* names seem too generic imo - fairly likely to conflict with
other pieces of code.
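
For illustration only (these names are made up here), something with a
reorderbuffer-specific prefix would be much less likely to clash:

/* hypothetical, namespaced variants of the above flags/macros */
#define RBTXN_HAS_CATALOG_CHANGES	0x0001
#define RBTXN_IS_SUBXACT			0x0002
#define RBTXN_IS_SERIALIZED			0x0004
#define RBTXN_PREPARE				0x0008

#define rbtxn_has_catalog_changes(txn) \
	(((txn)->txn_flags & RBTXN_HAS_CATALOG_CHANGES) != 0)
#define rbtxn_prepared(txn) \
	(((txn)->txn_flags & RBTXN_PREPARE) != 0)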

Greetings,

Andres Freund
