Index: access/transam/clog.c =================================================================== RCS file: /projects/cvsroot/pgsql-server/src/backend/access/transam/clog.c,v retrieving revision 1.18 diff -u -r1.18 clog.c --- access/transam/clog.c 4 Aug 2003 02:39:57 -0000 1.18 +++ access/transam/clog.c 10 Oct 2003 07:32:48 -0000 @@ -25,6 +25,8 @@ #include #include +#include "utils/hsearch.h" +#include "access/xact.h" #include "access/clog.h" #include "access/slru.h" #include "storage/lwlock.h" @@ -93,8 +95,10 @@ int byteno = TransactionIdToByte(xid); int bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT; char *byteptr; + XidStatus currentStatus; Assert(status == TRANSACTION_STATUS_COMMITTED || + status == TRANSACTION_STATUS_PREPARED || status == TRANSACTION_STATUS_ABORTED); LWLockAcquire(ClogCtl->locks->ControlLock, LW_EXCLUSIVE); @@ -103,9 +107,14 @@ byteptr += byteno; /* Current state should be 0 or target state */ - Assert(((*byteptr >> bshift) & CLOG_XACT_BITMASK) == 0 || - ((*byteptr >> bshift) & CLOG_XACT_BITMASK) == status); + currentStatus = (*byteptr >> bshift) & CLOG_XACT_BITMASK; + if(!(currentStatus == TRANSACTION_STATUS_IN_PROGRESS || + currentStatus == TRANSACTION_STATUS_PREPARED || + currentStatus == status)) { + elog(DEBUG1, "#### xid: %d, currentStatus = %d, status = %d ####", xid, currentStatus, status); + } + *byteptr &= ~(CLOG_XACT_BITMASK << bshift); *byteptr |= (status << bshift); /* ...->page_status[slotno] = CLOG_PAGE_DIRTY; already done */ @@ -141,6 +150,208 @@ } +XACTGLOBAL *XactGlobal; + +void +InitPreparedTransactions() +{ + bool found = false; + HASHCTL info; + + info.keysize = GIDSIZE; + info.entrysize = sizeof(GXACT); + info.hash = string_hash; + + /* Create or attach to the shared structure */ + XactGlobal = (XACTGLOBAL *) ShmemInitStruct("XactGlobal", sizeof(XACTGLOBAL), &found); + + /* We're the first - initialize. */ + if (!found) + { + XactGlobal->gidHash = ShmemInitHash("global transaction hashtable", 10, 100, &info, + HASH_ELEM | HASH_FUNCTION); + if(!XactGlobal->gidHash) + elog(FATAL, "could not initialize global transaction hash table"); + + SHMQueueInit(&XactGlobal->gxactList); + } +} + +bool +MarkAsPrepared(TransactionId xid, char *gid, XLogRecPtr logRec) +{ + GXACT *gxact; + bool found = false; + + elog(DEBUG1, "MarkAsPrepared, xid = %d logRec = %x %x", xid, logRec.xlogid, logRec.xrecoff); + LWLockAcquire(GlobalXactLock, LW_EXCLUSIVE); + + gxact = (GXACT *) hash_search(XactGlobal->gidHash, + (void *) gid, + HASH_ENTER, &found); + + if(!gxact) { + LWLockRelease(GlobalXactLock); + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + return FALSE; + } + + if(found) { + LWLockRelease(GlobalXactLock); + elog(ERROR, "Global transaction is already prepared"); + } + TransactionIdStore(xid, &gxact->xid); + strncpy(gxact->gid, gid, GIDSIZE); + gxact->logRec = logRec; + SHMQueueInsertBefore(&XactGlobal->gxactList, &gxact->link); + + LWLockRelease(GlobalXactLock); + + return TRUE; +} + +TransactionId +MarkNoLongerPrepared(char *gid) +{ + GXACT *gxact; + TransactionId xid; + + LWLockAcquire(GlobalXactLock, LW_EXCLUSIVE); + + gxact = (GXACT *) hash_search(XactGlobal->gidHash, + (void *) gid, + HASH_FIND_SAVE, NULL); + + if(!gxact) { + elog(WARNING, "Global transaction %s not found", gid); + } else { + SHMQueueDelete(&gxact->link); + TransactionIdStore(gxact->xid, &xid); + gxact = (GXACT *) hash_search(XactGlobal->gidHash, + (void *) gid, + HASH_REMOVE_SAVED, NULL); + if(!gxact) { + elog(WARNING, "Global transaction hash table corrupted"); + return 0; + } + } + + LWLockRelease(GlobalXactLock); + + return xid; +} + +void +ListPrepared() +{ + GXACT *gxact; + LWLockAcquire(GlobalXactLock, LW_EXCLUSIVE); + + elog(DEBUG1, "Scanning..."); + + gxact = (GXACT *) SHMQueueNext(&XactGlobal->gxactList, + &XactGlobal->gxactList, + offsetof(GXACT, link)); + while(gxact) + { + elog(DEBUG1, "Xid %d (%s) is prepared", gxact->xid, gxact->gid); + + gxact = (GXACT *) SHMQueueNext(&XactGlobal->gxactList, &gxact->link, + offsetof(GXACT, link)); + } + elog(DEBUG1, "Scanning ended."); + + LWLockRelease(GlobalXactLock); +} + +XLogRecPtr +GetSmallestRedoPtr() { + GXACT *gxact; + XLogRecPtr logRec = { 0, 0 }; + LWLockAcquire(GlobalXactLock, LW_EXCLUSIVE); + + elog(DEBUG1, "Scanning..."); + + gxact = (GXACT *) SHMQueueNext(&XactGlobal->gxactList, + &XactGlobal->gxactList, + offsetof(GXACT, link)); + while(gxact) + { + if(logRec.xrecoff == 0 || XLByteLT(gxact->logRec, logRec)) + { + logRec = gxact->logRec; + } + gxact = (GXACT *) SHMQueueNext(&XactGlobal->gxactList, &gxact->link, + offsetof(GXACT, link)); + } + elog(DEBUG1, "Scanning ended."); + + LWLockRelease(GlobalXactLock); + return logRec; +} + +#ifdef NOT_USED +/* + * Scans the commit log for all transactions with the specified status. + */ +void +TransactionIdScanForStatus(XidStatus testStatus) +{ + int pageno; + int byteno; + int bshift;; + int oldpageno; + char *byteptr = NULL; + XidStatus status; + TransactionId xid; + int i; + + elog(DEBUG1, "Scanning..."); + + LWLockAcquire(ClogCtl->locks->ControlLock, LW_EXCLUSIVE); + + i = 0; + for(xid= ShmemVariableCache->nextXid; i < 100; xid--) { + i++; + pageno = TransactionIdToPage(xid); + byteno = TransactionIdToByte(xid); + bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT; + if(oldpageno != pageno || byteptr == NULL) + byteptr = SimpleLruReadPage(ClogCtl, pageno, xid, false); + + + + status = (*(byteptr+byteno) >> bshift) & CLOG_XACT_BITMASK; + switch(status) { + case TRANSACTION_STATUS_IN_PROGRESS: + break; + + case TRANSACTION_STATUS_COMMITTED: + elog(DEBUG1, "Xid %d committed", xid); + break; + + case TRANSACTION_STATUS_ABORTED: + elog(DEBUG1, "Xid %d aborted", xid); + break; + + case TRANSACTION_STATUS_PREPARED: + elog(DEBUG1, "Xid %d is prepared", xid); + break; + } + + oldpageno = pageno; + } + + LWLockRelease(ClogCtl->locks->ControlLock); + elog(DEBUG1, "Scanning ended."); + + return status; +} + +#endif + /* * Initialization of shared memory for CLOG */ @@ -148,7 +359,13 @@ int CLOGShmemSize(void) { - return SimpleLruShmemSize(); + int size = 0; + + size += SimpleLruShmemSize(); + size += sizeof(XACTGLOBAL); + size += hash_estimate_size(100, sizeof(GXACT)); + + return size; } void @@ -156,6 +373,8 @@ { SimpleLruInit(ClogCtl, "CLOG Ctl", "pg_clog"); ClogCtl->PagePrecedes = CLOGPagePrecedes; + + InitPreparedTransactions(); } /* Index: access/transam/rmgr.c =================================================================== RCS file: /projects/cvsroot/pgsql-server/src/backend/access/transam/rmgr.c,v retrieving revision 1.11 diff -u -r1.11 rmgr.c --- access/transam/rmgr.c 4 Aug 2003 00:43:15 -0000 1.11 +++ access/transam/rmgr.c 10 Oct 2003 07:32:48 -0000 @@ -16,6 +16,7 @@ #include "access/xact.h" #include "access/xlog.h" #include "storage/smgr.h" +#include "storage/lockrmgr.h" #include "commands/sequence.h" @@ -24,7 +25,7 @@ {"Transaction", xact_redo, xact_undo, xact_desc, NULL, NULL}, {"Storage", smgr_redo, smgr_undo, smgr_desc, NULL, NULL}, {"CLOG", clog_redo, clog_undo, clog_desc, NULL, NULL}, - {"Reserved 4", NULL, NULL, NULL, NULL, NULL}, + {"Lock", lock_redo, lock_undo, lock_desc, NULL, lock_cleanup}, {"Reserved 5", NULL, NULL, NULL, NULL, NULL}, {"Reserved 6", NULL, NULL, NULL, NULL, NULL}, {"Reserved 7", NULL, NULL, NULL, NULL, NULL}, Index: access/transam/slru.c =================================================================== RCS file: /projects/cvsroot/pgsql-server/src/backend/access/transam/slru.c,v retrieving revision 1.7 diff -u -r1.7 slru.c --- access/transam/slru.c 25 Sep 2003 06:57:57 -0000 1.7 +++ access/transam/slru.c 10 Oct 2003 07:32:49 -0000 @@ -737,6 +737,18 @@ } /* + * + */ +int +SimpleLruGetLatestPage(SlruCtl ctl, int pageno) +{ + SlruShared shared = (SlruShared) ctl->shared; + + return shared->latest_page_number; +} + + +/* * This is called during checkpoint and postmaster/standalone-backend shutdown */ void Index: access/transam/transam.c =================================================================== RCS file: /projects/cvsroot/pgsql-server/src/backend/access/transam/transam.c,v retrieving revision 1.55 diff -u -r1.55 transam.c --- access/transam/transam.c 4 Aug 2003 02:39:57 -0000 1.55 +++ access/transam/transam.c 10 Oct 2003 07:32:49 -0000 @@ -94,7 +94,8 @@ /* * DO NOT cache status for unfinished transactions! */ - if (xidstatus != TRANSACTION_STATUS_IN_PROGRESS) + if (xidstatus != TRANSACTION_STATUS_IN_PROGRESS && + xidstatus != TRANSACTION_STATUS_PREPARED) { TransactionIdStore(transactionId, &cachedTestXid); cachedTestXidStatus = xidstatus; @@ -181,6 +182,25 @@ } /* + * TransactionIdIsPrepared + * True iff transaction associated with the identifier is in prepared state. + * + * Note: + * Assumes transaction identifier is valid. + */ +bool /* true if given transaction is in prepared state */ +TransactionIdIsPrepared(TransactionId transactionId) +{ + if (AMI_OVERRIDE) + { + Assert(transactionId == BootstrapTransactionId); + return false; + } + + return TransactionLogTest(transactionId, TRANSACTION_STATUS_PREPARED); +} + +/* * TransactionIdDidAbort * True iff transaction associated with the identifier did abort. * @@ -234,9 +254,42 @@ * Assumes transaction identifier is valid. */ void -TransactionIdCommit(TransactionId transactionId) +TransactionIdCommit(TransactionId xid) +{ + TransactionLogUpdate(xid, TRANSACTION_STATUS_COMMITTED); +} + +/* + * TransactionIdCommitPrepared + * Commits the transaction associated with the globalidentifier. + * + * Note: + * Assumes transaction identifier is valid. + */ +TransactionId +TransactionIdCommitPrepared(char *gid) +{ + TransactionId transactionId; + + transactionId = MarkNoLongerPrepared(gid); + if(transactionId != 0) + TransactionLogUpdate(transactionId, TRANSACTION_STATUS_COMMITTED); + + return transactionId; +} + +/* + * TransactionIdPrepare + * Prepares the transaction associated with the identifier. + * + * Note: + * Assumes transaction identifier is valid. + */ +void +TransactionIdPrepare(TransactionId transactionId, char *gid, XLogRecPtr logRec) { - TransactionLogUpdate(transactionId, TRANSACTION_STATUS_COMMITTED); + TransactionLogUpdate(transactionId, TRANSACTION_STATUS_PREPARED); + MarkAsPrepared(transactionId, gid, logRec); } /* Index: access/transam/xact.c =================================================================== RCS file: /projects/cvsroot/pgsql-server/src/backend/access/transam/xact.c,v retrieving revision 1.155 diff -u -r1.155 xact.c --- access/transam/xact.c 28 Sep 2003 23:26:20 -0000 1.155 +++ access/transam/xact.c 10 Oct 2003 07:32:50 -0000 @@ -188,6 +188,7 @@ static void CommitTransaction(void); static void RecordTransactionAbort(void); static void StartTransaction(void); +static void RecordTransactionPrepare(char *gid); /* * global variables holding the current transaction state. @@ -198,8 +199,9 @@ 0, /* scan command id */ 0x0, /* start time */ TRANS_DEFAULT, /* transaction state */ - TBLOCK_DEFAULT /* transaction block state from the client + TBLOCK_DEFAULT, /* transaction block state from the client * perspective */ + 0 }; TransactionState CurrentTransactionState = &CurrentTransactionStateData; @@ -290,6 +292,8 @@ return true; case TRANS_COMMIT: return true; + case TRANS_PREPARE: + return true; case TRANS_ABORT: return true; } @@ -490,6 +494,175 @@ * ---------------------------------------------------------------- */ + +/* + * RecordTransactionPrepare + */ +void +RecordTransactionPrepare(char *gid) +{ + /* + * If we made neither any XLOG entries nor any temp-rel updates, we + * can omit recording the transaction prepare at all. + */ + if (MyXactMadeXLogEntry || MyXactMadeTempRelUpdate) + { + TransactionId xid = GetCurrentTransactionId(); + XLogRecPtr recptr; + + /* Tell bufmgr and smgr to prepare for commit */ + BufmgrCommit(); + + START_CRIT_SECTION(); + + /* + * We only need to log the commit in xlog if the transaction made + * any transaction-controlled XLOG entries. (Otherwise, its XID + * appears nowhere in permanent storage, so no one else will ever + * care if it committed.) + */ + if (MyLastRecPtr.xrecoff != 0) + { + /* Need to emit a commit record */ + XLogRecData rdata; + xl_xact_prepare xlrec; + + xlrec.xtime = time(NULL); + strncpy(xlrec.gid, gid, GIDSIZE); + xlrec.logRec = MyProc->logRec; + elog(DEBUG1, "writing xl_xact_prepare, logRec = %x %x", xlrec.logRec.xlogid, xlrec.logRec.xrecoff); + rdata.buffer = InvalidBuffer; + rdata.data = (char *) (&xlrec); + rdata.len = SizeOfXactPrepare; + rdata.next = NULL; + + /* + * XXX SHOULD SAVE ARRAY OF RELFILENODE-s TO DROP + */ + recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE, &rdata); + } + else + { + /* Just flush through last record written by me */ + recptr = ProcLastRecEnd; + } + + /* + * We must flush our XLOG entries to disk if we made any XLOG + * entries, whether in or out of transaction control. For + * example, if we reported a nextval() result to the client, this + * ensures that any XLOG record generated by nextval will hit the + * disk before we report the transaction committed. + */ + if (MyXactMadeXLogEntry) + { + /* + * Sleep before flush! So we can flush more than one commit + * records per single fsync. (The idea is some other backend + * may do the XLogFlush while we're sleeping. This needs work + * still, because on most Unixen, the minimum select() delay + * is 10msec or more, which is way too long.) + * + * We do not sleep if enableFsync is not turned on, nor if there + * are fewer than CommitSiblings other backends with active + * transactions. + */ + if (CommitDelay > 0 && enableFsync && + CountActiveBackends() >= CommitSiblings) + { + struct timeval delay; + + delay.tv_sec = 0; + delay.tv_usec = CommitDelay; + (void) select(0, NULL, NULL, NULL, &delay); + } + + XLogFlush(recptr); + } + + /* + * We must mark the transaction prepared in clog if its XID + * appears either in permanent rels or in local temporary rels. We + * test this by seeing if we made transaction-controlled entries + * *OR* local-rel tuple updates. Note that if we made only the + * latter, we have not emitted an XLOG record for our commit, and + * so in the event of a crash the clog update might be lost. This + * is okay because no one else will ever care whether we + * committed. + */ + if (MyLastRecPtr.xrecoff != 0 || MyXactMadeTempRelUpdate) + TransactionIdPrepare(xid, gid, MyProc->logRec); + + END_CRIT_SECTION(); + } + + /* Break the chain of back-links in the XLOG records I output */ + MyLastRecPtr.xrecoff = 0; + MyXactMadeXLogEntry = false; + MyXactMadeTempRelUpdate = false; + + /* Show myself as out of the transaction in PGPROC array */ + MyProc->logRec.xrecoff = 0; +} + + + +/* + * RecordTransactionCommitPrepared + */ +TransactionId +RecordTransactionCommitPrepared(char *gid) +{ + TransactionId xid; + XLogRecPtr recptr; + + elog(DEBUG1, "Committing prepared transaction gid %s\n", gid); + + START_CRIT_SECTION(); + + /* Need to emit a commit record */ + XLogRecData rdata; + xl_xact_commit_prepared xlrec; + + xlrec.xtime = time(NULL); + strncpy(xlrec.gid, gid, GIDSIZE); + rdata.buffer = InvalidBuffer; + rdata.data = (char *) (&xlrec); + rdata.len = SizeOfXactCommitPrepared; + rdata.next = NULL; + + recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED, &rdata); + + /* + * Sleep before flush! So we can flush more than one commit + * records per single fsync. (The idea is some other backend + * may do the XLogFlush while we're sleeping. This needs work + * still, because on most Unixen, the minimum select() delay + * is 10msec or more, which is way too long.) + * + * We do not sleep if enableFsync is not turned on, nor if there + * are fewer than CommitSiblings other backends with active + * transactions. + */ + if (CommitDelay > 0 && enableFsync && + CountActiveBackends() >= CommitSiblings) + { + struct timeval delay; + + delay.tv_sec = 0; + delay.tv_usec = CommitDelay; + (void) select(0, NULL, NULL, NULL, &delay); + } + + XLogFlush(recptr); + + xid = TransactionIdCommitPrepared(gid); + + END_CRIT_SECTION(); + return xid; +} + + /* * RecordTransactionCommit */ @@ -1001,6 +1174,143 @@ } /* + * CommitPreparedTransaction + */ +void +CommitPreparedTransaction(char *gid) +{ + TransactionId xid; + + xid = RecordTransactionCommitPrepared(gid); + LockReleasePersistentAll(DEFAULT_LOCKMETHOD, xid); +} + +/* + * PrepareTransaction + */ +static void +PrepareTransaction(void) +{ + TransactionState s = CurrentTransactionState; + TransactionId xid = GetCurrentTransactionId(); + + /* + * check the current transaction state + */ + if (s->state != TRANS_INPROGRESS) + elog(WARNING, "PrepareTransaction and not in in-progress state"); + + /* + * Tell the trigger manager that this transaction is about to be + * committed. He'll invoke all trigger deferred until XACT before we + * really start on committing the transaction. + */ + DeferredTriggerEndXact(); + + /* + * Similarly, let ON COMMIT management do its thing before we start to + * commit. + */ + PreCommit_on_commit_actions(); + + /* Prevent cancel/die interrupt while cleaning up */ + HOLD_INTERRUPTS(); + + /* + * set the current transaction state information appropriately during + * the processing + */ + s->state = TRANS_PREPARE; + + /* + * Do pre-commit processing (most of this stuff requires database + * access, and in fact could still cause an error...) + */ + + AtCommit_Portals(); + + /* handle commit for large objects [ PA, 7/17/98 ] */ + /* XXX probably this does not belong here */ + lo_commit(true); + + /* NOTIFY commit must come before lower-level cleanup */ + AtCommit_Notify(); + + /* Update the flat password file if we changed pg_shadow or pg_group */ + AtEOXact_UpdatePasswordFile(true); + + LockPersistAll(DEFAULT_LOCKMETHOD, MyProc, xid); + + /* + * Here is where we really truly prepare. + */ + RecordTransactionPrepare(s->gid); + + /* + * Let others know about no transaction in progress by me. Note that + * this must be done _before_ releasing locks we hold and _after_ + * RecordTransactionCommit. + * + * LWLockAcquire(SInvalLock) is required: UPDATE with xid 0 is blocked by + * xid 1' UPDATE, xid 1 is doing commit while xid 2 gets snapshot - if + * xid 2' GetSnapshotData sees xid 1 as running then it must see xid 0 + * as running as well or it will see two tuple versions - one deleted + * by xid 1 and one inserted by xid 0. See notes in GetSnapshotData. + */ + if (MyProc != (PGPROC *) NULL) + { + /* Lock SInvalLock because that's what GetSnapshotData uses. */ + LWLockAcquire(SInvalLock, LW_EXCLUSIVE); + MyProc->xid = InvalidTransactionId; + MyProc->xmin = InvalidTransactionId; + LWLockRelease(SInvalLock); + } + + /* + * This is all post-commit cleanup. Note that if an error is raised + * here, it's too late to abort the transaction. This should be just + * noncritical resource releasing. + * + * The ordering of operations is not entirely random. The idea is: + * release resources visible to other backends (eg, files, buffer + * pins); then release locks; then release backend-local resources. We + * want to release locks at the point where any backend waiting for us + * will see our transaction as being fully cleaned up. + */ + + smgrDoPendingDeletes(true); + AtCommit_Cache(); + AtEOXact_Buffers(true); + /* smgrcommit already done */ + + /* AtCommit_Locks(); */ + + CallEOXactCallbacks(true); + AtEOXact_GUC(true); + AtEOXact_SPI(); + AtEOXact_gist(); + AtEOXact_hash(); + AtEOXact_nbtree(); + AtEOXact_rtree(); + AtEOXact_on_commit_actions(true); + AtEOXact_Namespace(true); + AtEOXact_CatCache(true); + AtEOXact_Files(); + pgstat_count_xact_commit(); + AtCommit_Memory(); + + /* + * done with commit processing, set current transaction state back to + * default + */ + s->state = TRANS_DEFAULT; + + RESUME_INTERRUPTS(); +} + + + +/* * AbortTransaction */ static void @@ -1187,6 +1497,20 @@ break; /* + * As with BEGIN and END, we should never experience this if we do it + * means the PREPARE state was not changed in the previous + * CommitTransactionCommand(). If we get it, we print a + * warning, commit the transaction, start a new transaction + * and change to the default state. + */ + case TBLOCK_PREPARE: + elog(WARNING, "StartTransactionCommand: unexpected TBLOCK_PREAPARE"); + s->blockState = TBLOCK_DEFAULT; + AbortTransaction(); + StartTransaction(); + break; + + /* * Here we are in the middle of a transaction block but one of * the commands caused an abort so we do nothing but remain in * the abort state. Eventually we will get to the "END @@ -1262,6 +1586,17 @@ s->blockState = TBLOCK_DEFAULT; break; + + /* + * This is the case when we just got the "PREPCOMMIT" + * statement, so we commit the transaction and go back to the + * default state. + */ + case TBLOCK_PREPARE: + PrepareTransaction(); + s->blockState = TBLOCK_DEFAULT; + break; + /* * Here we are in the middle of a transaction block but one of * the commands caused an abort so we do nothing but remain in @@ -1331,6 +1666,17 @@ * to end the transaction block so we abort the transaction * and put us back into the default state. */ + case TBLOCK_PREPARE: + s->blockState = TBLOCK_DEFAULT; + AbortTransaction(); + CleanupTransaction(); + break; + + /* + * Here, the system was fouled up just after the user wanted + * to end the transaction block so we abort the transaction + * and put us back into the default state. + */ case TBLOCK_END: s->blockState = TBLOCK_DEFAULT; AbortTransaction(); @@ -1586,6 +1932,59 @@ s->blockState = TBLOCK_ENDABORT; } + +/* + * PrepareTransactionBlock + */ +void +PrepareTransactionBlock(char *gid) +{ + TransactionState s = CurrentTransactionState; + + /* + * check the current transaction state + */ + if (s->blockState == TBLOCK_INPROGRESS) + { + /* + * here we are in a transaction block which should commit when we + * get to the upcoming CommitTransactionCommand() so we set the + * state to "PREPARE". CommitTransactionCommand() will recognize this + * and log the transaction as prepared and return us to the default state + */ + s->blockState = TBLOCK_PREPARE; + s->gid = gid; + return; + } + + if (s->blockState == TBLOCK_ABORT) + { + /* + * here, we are in a transaction block which aborted and since the + * AbortTransaction() was already done, we do whatever is needed + * and change to the special "END ABORT" state. The upcoming + * CommitTransactionCommand() will recognise this and then put us + * back in the default state. + */ + s->blockState = TBLOCK_ENDABORT; + return; + } + + /* + * here, the user issued PREPCOMMIT when not inside a transaction. Issue a + * WARNING and go to abort state. The upcoming call to + * CommitTransactionCommand() will then put us back into the default + * state. + */ + ereport(WARNING, + (errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION), + errmsg("there is no transaction in progress"))); + AbortTransaction(); + s->blockState = TBLOCK_ENDABORT; +} + + + /* * AbortTransactionBlock */ @@ -1692,6 +2091,7 @@ case TRANS_START: case TRANS_INPROGRESS: case TRANS_COMMIT: + case TRANS_PREPARE: /* In a transaction, so clean up */ AbortTransaction(); CleanupTransaction(); @@ -1739,6 +2139,7 @@ return 'I'; /* idle --- not in transaction */ case TBLOCK_BEGIN: case TBLOCK_INPROGRESS: + case TBLOCK_PREPARE: case TBLOCK_END: return 'T'; /* in transaction */ case TBLOCK_ABORT: @@ -1752,7 +2153,6 @@ return 0; /* keep compiler quiet */ } - /* * XLOG support routines */ @@ -1766,6 +2166,19 @@ { TransactionIdCommit(record->xl_xid); /* SHOULD REMOVE FILES OF ALL DROPPED RELATIONS */ + } + else if (info == XLOG_XACT_PREPARE) + { + xl_xact_prepare *recorddata = (xl_xact_prepare *) XLogRecGetData(record); + + TransactionIdPrepare(record->xl_xid, recorddata->gid, recorddata->logRec); + /* SHOULD REMOVE FILES OF ALL FAILED-TO-BE-CREATED RELATIONS */ + } + else if (info == XLOG_XACT_COMMIT_PREPARED) + { + xl_xact_commit_prepared *recorddata = (xl_xact_prepare *)XLogRecGetData(record); + TransactionIdCommitPrepared(recorddata->gid); + /* SHOULD REMOVE FILES OF ALL FAILED-TO-BE-CREATED RELATIONS */ } else if (info == XLOG_XACT_ABORT) { Index: access/transam/xlog.c =================================================================== RCS file: /projects/cvsroot/pgsql-server/src/backend/access/transam/xlog.c,v retrieving revision 1.125 diff -u -r1.125 xlog.c --- access/transam/xlog.c 27 Sep 2003 18:16:35 -0000 1.125 +++ access/transam/xlog.c 10 Oct 2003 07:32:52 -0000 @@ -2690,7 +2690,7 @@ XLByteLT(checkPoint.redo, RecPtr)) { if (wasShutdown) - ereport(PANIC, + ereport(WARNING, (errmsg("invalid redo/undo record in shutdown checkpoint"))); InRecovery = true; } @@ -3238,6 +3238,15 @@ } /* + * Make sure we don't create a checkpoint past a prepared transaction + */ + recptr = GetSmallestRedoPtr(); + if(recptr.xrecoff != 0 && XLByteLT(recptr, checkPoint.redo)) { + checkPoint.redo = recptr; + } + + + /* * Get UNDO record ptr - this is oldest of PGPROC->logRec values. We * do this while holding insert lock to ensure that we won't miss any * about-to-commit transactions (UNDO must include all xacts that have @@ -3313,7 +3322,7 @@ * recptr = end of actual checkpoint record. */ if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr)) - ereport(PANIC, + ereport(WARNING, (errmsg("concurrent transaction log activity while database system is shutting down"))); /* Index: parser/gram.y =================================================================== RCS file: /projects/cvsroot/pgsql-server/src/backend/parser/gram.y,v retrieving revision 2.436 diff -u -r2.436 gram.y --- parser/gram.y 2 Oct 2003 06:34:04 -0000 2.436 +++ parser/gram.y 10 Oct 2003 07:32:56 -0000 @@ -335,7 +335,7 @@ CACHE CALLED CASCADE CASE CAST CHAIN CHAR_P CHARACTER CHARACTERISTICS CHECK CHECKPOINT CLASS CLOSE - CLUSTER COALESCE COLLATE COLUMN COMMENT COMMIT + CLUSTER COALESCE COLLATE COLUMN COMMENT COMMIT COMMITPREPARED COMMITTED CONSTRAINT CONSTRAINTS CONVERSION_P CONVERT COPY CREATE CREATEDB CREATEUSER CROSS CURRENT_DATE CURRENT_TIME CURRENT_TIMESTAMP CURRENT_USER CURSOR CYCLE @@ -364,7 +364,7 @@ KEY LANCOMPILER LANGUAGE LAST_P LEADING LEFT LEVEL LIKE LIMIT - LISTEN LOAD LOCAL LOCALTIME LOCALTIMESTAMP LOCATION + LISTEN LISTPREPARED LOAD LOCAL LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P MATCH MAXVALUE MINUTE_P MINVALUE MODE MONTH_P MOVE @@ -377,7 +377,7 @@ ORDER OUT_P OUTER_P OVERLAPS OVERLAY OWNER PARTIAL PASSWORD PATH_P PENDANT PLACING POSITION - PRECISION PRESERVE PREPARE PRIMARY + PRECISION PRESERVE PREPARE PREPCOMMIT PRIMARY PRIOR PRIVILEGES PROCEDURAL PROCEDURE READ REAL RECHECK REFERENCES REINDEX RELATIVE_P RENAME REPLACE @@ -3617,6 +3617,27 @@ TransactionStmt *n = makeNode(TransactionStmt); n->kind = TRANS_STMT_START; n->options = $3; + $$ = (Node *)n; + } + | PREPCOMMIT Sconst + { + TransactionStmt *n = makeNode(TransactionStmt); + n->kind = TRANS_STMT_PREPARE; + n->options = $2; + $$ = (Node *)n; + } + | COMMITPREPARED Sconst + { + TransactionStmt *n = makeNode(TransactionStmt); + n->kind = TRANS_STMT_COMMIT_PREPARED; + n->options = $2; + $$ = (Node *)n; + } + | LISTPREPARED + { + TransactionStmt *n = makeNode(TransactionStmt); + n->kind = TRANS_STMT_LIST_PREPARED; + n->options = NIL; $$ = (Node *)n; } | COMMIT opt_transaction Index: parser/keywords.c =================================================================== RCS file: /projects/cvsroot/pgsql-server/src/backend/parser/keywords.c,v retrieving revision 1.141 diff -u -r1.141 keywords.c --- parser/keywords.c 4 Aug 2003 02:40:01 -0000 1.141 +++ parser/keywords.c 10 Oct 2003 07:32:56 -0000 @@ -79,6 +79,7 @@ {"column", COLUMN}, {"comment", COMMENT}, {"commit", COMMIT}, + {"commitprepared", COMMITPREPARED}, {"committed", COMMITTED}, {"constraint", CONSTRAINT}, {"constraints", CONSTRAINTS}, @@ -184,6 +185,7 @@ {"like", LIKE}, {"limit", LIMIT}, {"listen", LISTEN}, + {"listprepared", LISTPREPARED}, {"load", LOAD}, {"local", LOCAL}, {"localtime", LOCALTIME}, @@ -238,6 +240,7 @@ {"position", POSITION}, {"precision", PRECISION}, {"prepare", PREPARE}, + {"prepcommit", PREPCOMMIT}, {"preserve", PRESERVE}, {"primary", PRIMARY}, {"prior", PRIOR}, Index: storage/ipc/sinval.c =================================================================== RCS file: /projects/cvsroot/pgsql-server/src/backend/storage/ipc/sinval.c,v retrieving revision 1.61 diff -u -r1.61 sinval.c --- storage/ipc/sinval.c 1 Oct 2003 21:30:52 -0000 1.61 +++ storage/ipc/sinval.c 10 Oct 2003 07:32:57 -0000 @@ -231,7 +231,11 @@ LWLockRelease(SInvalLock); - return result; + if(!result) { + return TransactionIdIsPrepared(xid); + } else { + return result; + } } /* Index: storage/lmgr/lock.c =================================================================== RCS file: /projects/cvsroot/pgsql-server/src/backend/storage/lmgr/lock.c,v retrieving revision 1.127 diff -u -r1.127 lock.c --- storage/lmgr/lock.c 17 Aug 2003 22:41:12 -0000 1.127 +++ storage/lmgr/lock.c 10 Oct 2003 07:32:57 -0000 @@ -33,19 +33,29 @@ #include #include +#include "storage/lock.h" +#include "storage/lockrmgr.h" #include "access/xact.h" +#include "access/xlog.h" #include "miscadmin.h" #include "storage/proc.h" #include "utils/memutils.h" #include "utils/ps_status.h" +/* A dummy PGPROC structure used to hold proclock entries for + locks that belong to transactions that are in prepared state */ +static struct PGPROC *persistedLocksProc; + /* This configuration variable is used to set the lock table size */ int max_locks_per_xact; /* set by guc.c */ #define NLOCKENTS(maxBackends) (max_locks_per_xact * (maxBackends)) +static bool LockAcquireForProc(LOCKMETHOD lockmethod, LOCKTAG *locktag, + TransactionId xid, LOCKMODE lockmode, bool dontWait, PGPROC *proc); + static int WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode, LOCK *lock, PROCLOCK *proclock); static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, @@ -175,6 +185,7 @@ { int i; int bit; + bool found = false; bit = 1; for (i = 0; i < MAX_LOCKMODES; i++, bit <<= 1) @@ -182,6 +193,15 @@ BITS_ON[i] = bit; BITS_OFF[i] = ~bit; } + + /* Initialize the dummy PGPROC structure for locks from prepared + transactions */ + persistedLocksProc = (PGPROC *) ShmemInitStruct("persistedLocksProc", sizeof(PGPROC), &found); + if (!found) { + MemSet(persistedLocksProc, 0, sizeof(PGPROC)); + persistedLocksProc->pid = 0; + SHMQueueInit(&persistedLocksProc->procHolders); + } } @@ -432,6 +452,13 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, TransactionId xid, LOCKMODE lockmode, bool dontWait) { + return LockAcquireForProc(lockmethod, locktag, xid, lockmode, dontWait, MyProc); +} + +bool +LockAcquireForProc(LOCKMETHOD lockmethod, LOCKTAG *locktag, + TransactionId xid, LOCKMODE lockmode, bool dontWait, PGPROC *proc) +{ PROCLOCK *proclock; PROCLOCKTAG proclocktag; HTAB *proclockTable; @@ -509,7 +536,7 @@ MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding, * needed */ proclocktag.lock = MAKE_OFFSET(lock); - proclocktag.proc = MAKE_OFFSET(MyProc); + proclocktag.proc = MAKE_OFFSET(proc); TransactionIdStore(xid, &proclocktag.xid); /* @@ -537,7 +564,7 @@ MemSet((char *) proclock->holding, 0, sizeof(int) * MAX_LOCKMODES); /* Add proclock to appropriate lists */ SHMQueueInsertBefore(&lock->lockHolders, &proclock->lockLink); - SHMQueueInsertBefore(&MyProc->procHolders, &proclock->procLink); + SHMQueueInsertBefore(&proc->procHolders, &proclock->procLink); PROCLOCK_PRINT("LockAcquire: new", proclock); } else @@ -604,7 +631,7 @@ * If this process (under any XID) is a proclock of the lock, also * grant myself another one without blocking. */ - LockCountMyLocks(proclock->tag.lock, MyProc, myHolding); + LockCountMyLocks(proclock->tag.lock, proc, myHolding); if (myHolding[lockmode] > 0) { GrantLock(lock, proclock, lockmode); @@ -623,7 +650,7 @@ else status = LockCheckConflicts(lockMethodTable, lockmode, lock, proclock, - MyProc, myHolding); + proc, myHolding); if (status == STATUS_OK) { @@ -676,7 +703,7 @@ if (myHolding[i] > 0) heldLocks |= tmpMask; } - MyProc->heldLocks = heldLocks; + proc->heldLocks = heldLocks; } /* @@ -1162,6 +1189,16 @@ } /* + * LockReleaseAll -- Releases all persisted locks that belong to + * the given transaction + */ +bool +LockReleasePersistentAll(LOCKMETHOD lockmethod, TransactionId xid) +{ + return LockReleaseAll(lockmethod, persistedLocksProc, false, xid); +} + +/* * LockReleaseAll -- Release all locks in a process's lock list. * * Well, not really *all* locks. @@ -1338,6 +1375,163 @@ return TRUE; } + +/* + * LockPersistAll -- Persists all locks in a process's lock list. + * + * This is called when a transaction is prepared for two-phase commit. + * + * All proclock-structures that belong to the given process are released + * and new corresponding structures are created for persistedLocksProc. + * An xlog record is also emitted for each lock. + */ +bool +LockPersistAll(LOCKMETHOD lockmethod, PGPROC *proc, TransactionId xid) +{ + SHM_QUEUE *procHolders = &(proc->procHolders); + PROCLOCK *oldproclock; + PROCLOCK *newproclock; + PROCLOCKTAG proclocktag; + PROCLOCK *nextHolder; + LWLockId masterLock; + LOCKMETHODTABLE *lockMethodTable; + int numLockModes; + LOCK *lock; + bool found; + +#ifdef LOCK_DEBUG + if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks) + elog(LOG, "LockPersistAll: lockmethod=%d, pid=%d", + lockmethod, proc->pid); +#endif + + Assert(lockmethod < NumLockMethods); + lockMethodTable = LockMethodTable[lockmethod]; + if (!lockMethodTable) + { + elog(WARNING, "bad lock method: %d", lockmethod); + return FALSE; + } + + numLockModes = lockMethodTable->numLockModes; + masterLock = lockMethodTable->masterLock; + + LWLockAcquire(masterLock, LW_EXCLUSIVE); + + oldproclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders, + offsetof(PROCLOCK, procLink)); + + while (oldproclock) + { + /* Get link first, since we may unlink/delete this proclock */ + nextHolder = (PROCLOCK *) SHMQueueNext(procHolders, &oldproclock->procLink, + offsetof(PROCLOCK, procLink)); + + Assert(oldproclock->tag.proc == MAKE_OFFSET(proc)); + + lock = (LOCK *) MAKE_PTR(oldproclock->tag.lock); + + /* Ignore items that are not of the lockmethod to be removed */ + if (LOCK_LOCKMETHOD(*lock) != lockmethod) + goto next_item; + + /* Ignore items that are of the wrong xid */ + if (!TransactionIdEquals(xid, oldproclock->tag.xid)) + goto next_item; + + PROCLOCK_PRINT("LockPersistAll", oldproclock); + LOCK_PRINT("LockPersistAll", lock, 0); + Assert(lock->nRequested >= 0); + Assert(lock->nGranted >= 0); + Assert(lock->nGranted <= lock->nRequested); + Assert(oldproclock->nHolding >= 0); + Assert(oldproclock->nHolding <= lock->nRequested); + + /* + * First, we need to create a new proclock structure that is identical to + * the old one, except that the new one points to persistedLocksProc. + * Then, we insert the new proclock in the LOCK.lockHolders + * list, right next to the old one, and then the old proclock is removed + * from the linked lists and finally the from hashtable. + */ + + /* + * Create the hash key for the proclock table. + */ + MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding, + * needed */ + proclocktag.lock = MAKE_OFFSET(lock); + proclocktag.proc = MAKE_OFFSET(persistedLocksProc); + TransactionIdStore(xid, &proclocktag.xid); + + newproclock = (PROCLOCK *) hash_search(lockMethodTable->proclockHash, + (void *) &proclocktag, + HASH_ENTER, &found); + if (!newproclock) + { + LWLockRelease(masterLock); + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + return FALSE; + } + + /* + * If new, initialize the new entry + */ + if (!found) + { + memcpy(newproclock, oldproclock, sizeof(PROCLOCK)); + newproclock->tag.proc = MAKE_OFFSET(persistedLocksProc); + /* Add proclock to appropriate lists */ + SHMQueueInsertBefore(&persistedLocksProc->procHolders, &newproclock->procLink); + SHMQueueInsertBefore(&oldproclock->lockLink, &newproclock->lockLink); + PROCLOCK_PRINT("LockPersistAll: new", newproclock); + } + + /* Make a xlog record */ + XLogRecData rdata; + xl_lock xldata; + memcpy(&xldata.locktag, &lock->tag, sizeof(LOCKTAG)); + memcpy(&xldata.holding[0], &newproclock->holding[0], sizeof(int)*MAX_LOCKMODES); + + rdata.buffer = InvalidBuffer; + rdata.data = (char *) (&xldata); + rdata.len = sizeof(xl_lock); + rdata.next = NULL; + XLogInsert(RM_LOCK_ID, 0, &rdata); + + /* remove the old proclock entry from the hashtable and linked lists */ + SHMQueueDelete(&oldproclock->lockLink); + SHMQueueDelete(&oldproclock->procLink); + oldproclock = (PROCLOCK *) hash_search(lockMethodTable->proclockHash, + (void *) oldproclock, + HASH_REMOVE, + NULL); + if (!oldproclock) + { + LWLockRelease(masterLock); + elog(WARNING, "proclock table corrupted"); + return FALSE; + } + + +next_item: + oldproclock = nextHolder; + } + + LWLockRelease(masterLock); + +#ifdef LOCK_DEBUG + if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks) + elog(LOG, "LockPersistAll done"); +#endif + + return TRUE; +} + + + int LockShmemSize(int maxBackends) { @@ -1360,6 +1554,9 @@ */ size += size / 10; + /* Count in the dummy PGPROC structure */ + size += MAXALIGN(sizeof(PGPROC)); + return size; } @@ -1440,16 +1637,14 @@ * Must have already acquired the masterLock. */ void -DumpLocks(void) +DumpLocks(PGPROC *proc) { - PGPROC *proc; SHM_QUEUE *procHolders; PROCLOCK *proclock; LOCK *lock; int lockmethod = DEFAULT_LOCKMETHOD; LOCKMETHODTABLE *lockMethodTable; - proc = MyProc; if (proc == NULL) return; @@ -1524,3 +1719,210 @@ } #endif /* LOCK_DEBUG */ + + + +/* + * LOCK resource manager's routines + */ + +/* + * Re-acquire all locks that belong to transactions + * that were prepared. Some of these transactions + * might have committed/aborted later, but we + * release locks belonging to those transactions + * later in lock_cleanup, when we know the final + * state of all transactions. + * + * Because this function is run at db startup, + * re-acquiring the locks should never conflict + * with running transactions because there is none. + * All locks are acquired for the same dummy PGPROC + * structure, and therefore they should also + * never conflict with each other. + */ +void +lock_redo(XLogRecPtr lsn, XLogRecord *record) +{ + xl_lock *rec = (xl_lock *)XLogRecGetData(record); + bool success; + int i; + + for(i = 0; i < MAX_LOCKMODES; i++) { + if(rec->holding[i] > 0) { + success = LockAcquireForProc(rec->locktag.lockmethod, &rec->locktag, + record->xl_xid, i, true, persistedLocksProc); + if(success) { + elog(DEBUG1, "Lock re-acquired"); + } else { + elog(WARNING, "Couldn't re-acquire lock"); + } + } + } +} + +void +lock_undo(XLogRecPtr lsn, XLogRecord *record) +{ +} + +void +lock_desc(char *buf, uint8 xl_info, char *rec) +{ + strcat(buf, "lock record for prepared xact"); +} + +/* + * This releases any locks that were acquired on behalf transactions + * that later aborted or committed. + */ +void +lock_cleanup() { + SHM_QUEUE *procHolders = &(persistedLocksProc->procHolders); + PROCLOCK *proclock; + PROCLOCK *nextHolder; + LWLockId masterLock; + LOCKMETHODTABLE *lockMethodTable; + int i, + numLockModes; + LOCK *lock; + + lockMethodTable = LockMethodTable[DEFAULT_LOCKMETHOD]; + if (!lockMethodTable) + { + elog(WARNING, "bad lock method: %d", DEFAULT_LOCKMETHOD); + return; + } + + numLockModes = lockMethodTable->numLockModes; + masterLock = lockMethodTable->masterLock; + + LWLockAcquire(masterLock, LW_EXCLUSIVE); + + proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders, + offsetof(PROCLOCK, procLink)); + + while (proclock) + { + bool wakeupNeeded = false; + + /* Get link first, since we may unlink/delete this proclock */ + nextHolder = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink, + offsetof(PROCLOCK, procLink)); + + lock = (LOCK *) MAKE_PTR(proclock->tag.lock); + + /* We only remove items of transactions that are not in prepared state */ + if(TransactionIdIsPrepared(proclock->tag.xid)) + goto next_item; + + PROCLOCK_PRINT("lock_cleanup", proclock); + LOCK_PRINT("lock_cleanup", lock, 0); + Assert(lock->nRequested >= 0); + Assert(lock->nGranted >= 0); + Assert(lock->nGranted <= lock->nRequested); + Assert(proclock->nHolding >= 0); + Assert(proclock->nHolding <= lock->nRequested); + + /* + * fix the general lock stats + */ + if (lock->nRequested != proclock->nHolding) + { + for (i = 1; i <= numLockModes; i++) + { + Assert(proclock->holding[i] >= 0); + if (proclock->holding[i] > 0) + { + lock->requested[i] -= proclock->holding[i]; + lock->granted[i] -= proclock->holding[i]; + Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0); + if (lock->granted[i] == 0) + lock->grantMask &= BITS_OFF[i]; + + /* + * Read comments in LockRelease + */ + if (!wakeupNeeded && + lockMethodTable->conflictTab[i] & lock->waitMask) + wakeupNeeded = true; + } + } + lock->nRequested -= proclock->nHolding; + lock->nGranted -= proclock->nHolding; + Assert((lock->nRequested >= 0) && (lock->nGranted >= 0)); + Assert(lock->nGranted <= lock->nRequested); + } + else + { + /* + * This proclock accounts for all the requested locks on the + * object, so we can be lazy and just zero things out. + */ + lock->nRequested = 0; + lock->nGranted = 0; + /* Fix the lock status, just for next LOCK_PRINT message. */ + for (i = 1; i <= numLockModes; i++) + { + Assert(lock->requested[i] == lock->granted[i]); + lock->requested[i] = lock->granted[i] = 0; + } + } + LOCK_PRINT("lock_cleanup: updated", lock, 0); + + PROCLOCK_PRINT("lock_cleanup: deleting", proclock); + + /* + * Remove the proclock entry from the linked lists + */ + SHMQueueDelete(&proclock->lockLink); + SHMQueueDelete(&proclock->procLink); + + /* + * remove the proclock entry from the hashtable + */ + proclock = (PROCLOCK *) hash_search(lockMethodTable->proclockHash, + (void *) proclock, + HASH_REMOVE, + NULL); + if (!proclock) + { + LWLockRelease(masterLock); + elog(WARNING, "proclock table corrupted"); + return; + } + + if (lock->nRequested == 0) + { + /* + * We've just released the last lock, so garbage-collect the + * lock object. + */ + LOCK_PRINT("lock_cleanup: deleting", lock, 0); + Assert(lockMethodTable->lockHash->hash == tag_hash); + lock = (LOCK *) hash_search(lockMethodTable->lockHash, + (void *) &(lock->tag), + HASH_REMOVE, NULL); + if (!lock) + { + LWLockRelease(masterLock); + elog(WARNING, "cannot remove lock from HTAB"); + return; + } + } + else if (wakeupNeeded) + ProcLockWakeup(lockMethodTable, lock); + +next_item: + proclock = nextHolder; + } + + LWLockRelease(masterLock); + +#ifdef LOCK_DEBUG + if (Trace_locks) + elog(LOG, "lock_cleanup done"); +#endif + + return; +} Index: tcop/utility.c =================================================================== RCS file: /projects/cvsroot/pgsql-server/src/backend/tcop/utility.c,v retrieving revision 1.208 diff -u -r1.208 utility.c --- tcop/utility.c 2 Oct 2003 06:34:04 -0000 1.208 +++ tcop/utility.c 10 Oct 2003 07:32:58 -0000 @@ -337,6 +337,18 @@ EndTransactionBlock(); break; + case TRANS_STMT_COMMIT_PREPARED: + CommitPreparedTransaction(stmt->options); + break; + + case TRANS_STMT_LIST_PREPARED: + ListPrepared(); + break; + + case TRANS_STMT_PREPARE: + PrepareTransactionBlock(stmt->options); + break; + case TRANS_STMT_ROLLBACK: UserAbortTransactionBlock(); break; @@ -1199,8 +1211,20 @@ tag = "START TRANSACTION"; break; + case TRANS_STMT_PREPARE: + tag = "PREPCOMMIT"; + break; + case TRANS_STMT_COMMIT: tag = "COMMIT"; + break; + + case TRANS_STMT_COMMIT_PREPARED: + tag = "COMMITPREPARED"; + break; + + case TRANS_STMT_LIST_PREPARED: + tag = "LISTPREPARED"; break; case TRANS_STMT_ROLLBACK: