diff -Ncr --exclude-from=diff-ignore 00orig/doc/src/sgml/ref/allfiles.sgml 08twophase/doc/src/sgml/ref/allfiles.sgml *** 00orig/doc/src/sgml/ref/allfiles.sgml 2004-08-23 11:14:41.000000000 -0400 --- 08twophase/doc/src/sgml/ref/allfiles.sgml 2005-06-08 19:23:59.000000000 -0400 *************** *** 30,35 **** --- 30,36 ---- + *************** *** 88,98 **** --- 89,101 ---- + + diff -Ncr --exclude-from=diff-ignore 00orig/doc/src/sgml/ref/commit_prepared.sgml 08twophase/doc/src/sgml/ref/commit_prepared.sgml *** 00orig/doc/src/sgml/ref/commit_prepared.sgml 1969-12-31 21:00:00.000000000 -0300 --- 08twophase/doc/src/sgml/ref/commit_prepared.sgml 2005-06-08 19:23:59.000000000 -0400 *************** *** 0 **** --- 1,105 ---- + + + + + COMMIT PREPARED + SQL - Language Statements + + + + COMMIT PREPARED + commit a transaction that was earlier prepared for two-phase commit + + + + COMMIT PREPARED + + + + + COMMIT PREPARED 'global transaction id' + + + + + Description + + + COMMIT PREPARED commits a transaction that is in + prepared state. + + + + You can check all current transactions from the pg_prepared_xacts + system view. + + + + + Parameters + + + + global_transaction_id + + + The global transaction identifier of the transaction that is to be + rolled back. + + + + + + + + Notes + + + This command works outside transaction control. The prepared + transaction is committed immediately. + + + + + Examples + + Commits the transaction identified by the global transaction + identifier 'foobar': + + + COMMIT PREPARED 'foobar' + + + + + + + See Also + + + + + + + + + + diff -Ncr --exclude-from=diff-ignore 00orig/doc/src/sgml/ref/prepare_transaction.sgml 08twophase/doc/src/sgml/ref/prepare_transaction.sgml *** 00orig/doc/src/sgml/ref/prepare_transaction.sgml 1969-12-31 21:00:00.000000000 -0300 --- 08twophase/doc/src/sgml/ref/prepare_transaction.sgml 2005-06-08 19:23:59.000000000 -0400 *************** *** 0 **** --- 1,105 ---- + + + + + PREPARE TRANSACTION + SQL - Language Statements + + + + PREPARE TRANSACTION + prepare the current transaction for two-phase commit + + + + PREPARE TRANSACTION + + + + + PREPARE TRANSACTION 'global transaction id' + + + + + Description + + + PREPARE TRANSACTION prepares the transaction for two-phase + commit. After this command, the current transaction is no longer + associated with its backend. It can later be committed or rolled + back with COMMIT PREPARED or + ROLLBACK PREPARED, respectively. + + + + All prepared transactions are listed in the pg_prepared_xacts system view. + + + + + Parameters + + + + global transaction id + + + An arbitrary identifier that later acts as a handle to this transaction. + + + + + + + + Notes + + + This command must be used inside a transaction. Use + BEGIN to start one. + + + + + Examples + + Prepare the current transaction for two-phase commit, using + 'foobar' as the global transaction identifier: + + + PREPARE TRANSACTION 'foobar' + + + + + + See Also + + + + + + + + + + diff -Ncr --exclude-from=diff-ignore 00orig/doc/src/sgml/ref/rollback_prepared.sgml 08twophase/doc/src/sgml/ref/rollback_prepared.sgml *** 00orig/doc/src/sgml/ref/rollback_prepared.sgml 1969-12-31 21:00:00.000000000 -0300 --- 08twophase/doc/src/sgml/ref/rollback_prepared.sgml 2005-06-08 19:23:59.000000000 -0400 *************** *** 0 **** --- 1,105 ---- + + + + + ROLLBACK PREPARED + SQL - Language Statements + + + + ROLLBACK PREPARED + abort a transaction that was earlier prepared for two-phase commit + + + + ROLLBACK PREPARED + + + + + ROLLBACK PREPARED 'global transaction id' + + + + + Description + + + ROLLBACK PREPARED rolls back a transaction that is in + prepared state. + + + + You can check all current transactions from the pg_prepared_xacts + system view. + + + + + Parameters + + + + global_transaction_id + + + The global transaction identifier of the transaction that is to be + rolled back. + + + + + + + + Notes + + + This command works outside transaction control. The prepared + transaction is rolled back immediately. + + + + + Examples + + Rolls back the transaction identified by the global transaction + identifier 'foobar': + + + ROLLBACK PREPARED 'foobar' + + + + + + + See Also + + + + + + + + + + diff -Ncr --exclude-from=diff-ignore 00orig/doc/src/sgml/reference.sgml 08twophase/doc/src/sgml/reference.sgml *** 00orig/doc/src/sgml/reference.sgml 2004-08-23 11:14:41.000000000 -0400 --- 08twophase/doc/src/sgml/reference.sgml 2005-06-08 19:23:59.000000000 -0400 *************** *** 62,67 **** --- 62,68 ---- &cluster; &commentOn; &commit; + &commitPrepared; ©Table; &createAggregate; &createCast; *************** *** 120,130 **** --- 121,133 ---- &move; ¬ify; &prepare; + &prepareTransaction; &reindex; &releaseSavepoint; &reset; &revoke; &rollback; + &rollbackPrepared; &rollbackTo; &savepoint; &select; diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/access/transam/Makefile 08twophase/src/backend/access/transam/Makefile *** 00orig/src/backend/access/transam/Makefile 2005-04-28 19:39:54.000000000 -0400 --- 08twophase/src/backend/access/transam/Makefile 2005-06-08 19:23:59.000000000 -0400 *************** *** 12,18 **** top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global ! OBJS = clog.o transam.o varsup.o xact.o xlog.o xlogutils.o rmgr.o slru.o subtrans.o multixact.o all: SUBSYS.o --- 12,18 ---- top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global ! OBJS = clog.o transam.o varsup.o xact.o xlog.o xlogutils.o rmgr.o slru.o subtrans.o multixact.o twophase.o twophase_rmgr.o all: SUBSYS.o diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/access/transam/twophase.c 08twophase/src/backend/access/transam/twophase.c *** 00orig/src/backend/access/transam/twophase.c 1969-12-31 21:00:00.000000000 -0300 --- 08twophase/src/backend/access/transam/twophase.c 2005-06-08 19:23:59.000000000 -0400 *************** *** 0 **** --- 1,1114 ---- + /*------------------------------------------------------------------------- + * + * twophase.c + * Two-phase commit support functions. + * + * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * $PostgreSQL$ + * + * NOTES + * Each global transaction is associated with a global transaction + * identifier (GID). The client assigns a GID to a postgres + * transaction with the PREPARE TRANSACTION command. + * + * We keep all active global transactions in a shared memory array. + * When the PREPARE TRANSACTION command is issued, the GID is + * reserved for the transaction in the array. This is done before + * a WAL entry is made, because the reservation checks for duplicate + * GIDs and aborts the transaction if there already is a global + * transaction in prepared state with the same GID. + * + * In order to survive crashes and shutdowns, all prepared + * transactions must be stored in permanent storage. This includes + * locking information, pending notifications etc. All that state + * information is written to the per-transaction state file in + * the pg_twophase directory. + * + *------------------------------------------------------------------------- + */ + #include "postgres.h" + + #include + #include + #include + #include + + #include "access/heapam.h" + #include "access/xact.h" + #include "access/twophase.h" + #include "access/twophase_rmgr.h" + #include "access/subtrans.h" + #include "catalog/pg_type.h" + #include "funcapi.h" + #include "miscadmin.h" + #include "utils/builtins.h" + #include "storage/fd.h" + #include "storage/lock.h" + #include "storage/lwlock.h" + #include "storage/proc.h" + + + /* + * Directory where Two-phase commit files reside within PGDATA + */ + #define TWOPHASE_DIR "pg_twophase" + + /* GUC variable, can't be changed after startup */ + int max_prepared_xacts = 100; + + /* + * This struct describes one global transaction that is in prepared state. + */ + typedef struct GlobalTransactionData + { + char gid[GIDSIZE]; + TransactionId xid; + AclId owner; + + /* + * Indicates that the prepare phase has completely finished. When false, + * the transaction is still under original backends control and must not be + * committed or rolled back yet. + */ + bool fullyPrepared; + } GlobalTransactionData; + /* + * typedef struct GlobalTransactionData *GlobalTransaction appears in + * twophase.h + */ + + /* + * Two Phase Commit shared state. Access to this struct is protected + * by TwoPhaseStateLock. + */ + typedef struct TwoPhaseStateData + { + /* Dummy PGPROC entry which will hold the persisted locks. */ + PGPROC pgproc; + + /* number of used prepXacts items. */ + int numPrepXacts; + /* + * There are max_prepared_xacts items in this array, but C wants a + * fixed-size array. + */ + GlobalTransactionData prepXacts[1]; + } TwoPhaseStateData; /* Actual data follows at end of struct */ + + static TwoPhaseStateData *TwoPhaseState; + + + static void RecordTransactionCommitPrepared(TransactionId xid, int nchildren, + TransactionId *children); + static void RecordTransactionAbortPrepared(TransactionId xid, int nchildren, + TransactionId *children); + static void ProcessRecords(int fd, TransactionId xid, + const TwoPhaseCallback callbacks[]); + + + /* + * MarkAsPreparing + * Reserves the gid for the given transaction. + */ + GlobalTransaction + MarkAsPreparing(TransactionId xid, char *gid, AclId owner) + { + GlobalTransaction gxact; + int i; + + printf("MarkAsPreparing: %d, %s\n", xid, gid); + + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + + if (TwoPhaseState->numPrepXacts >= max_prepared_xacts) + { + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("maximum number of prepared transactions reached"))); + /* TODO: proper error code and hint */ + } + + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + if (strcmp(TwoPhaseState->prepXacts[i].gid, gid) == 0) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("global transaction identifier \"%s\" is already in use", gid))); + } + } + + gxact = &TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts]; + (TwoPhaseState->numPrepXacts)++; + + TransactionIdStore(xid, &gxact->xid); + strncpy(gxact->gid, gid, GIDSIZE); + gxact->owner = owner; + gxact->fullyPrepared = false; + + LWLockRelease(TwoPhaseStateLock); + return gxact; + } + + void + MarkAsPrepared(GlobalTransaction gxact) + { + /* Since the flag is only set, no need for locking. */ + Assert(!gxact->fullyPrepared); + gxact->fullyPrepared = true; + } + + /* + * Removes the prepared transaction from the shared memory array. + */ + TransactionId + MarkAsNoLongerPrepared(char *gid, AclId user) + { + GlobalTransaction gxact; + TransactionId xid; + int i; + + printf("MarkAsNoLongerPrepared: %s\n", gid); + + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + gxact = &TwoPhaseState->prepXacts[i]; + if (strncmp(gxact->gid, gid, GIDSIZE) == 0) + { + if (!gxact->fullyPrepared) + elog(ERROR, "The prepare phase for that transaction isn't finished yet."); + + xid = gxact->xid; + + if (user != gxact->owner && !superuser_arg(user)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied to finish prepared transaction"), + errhint("Must be superuser or the user that prepared the transaction to finish it."))); + + /* Move the last gxact to this slot */ + if (i != (TwoPhaseState->numPrepXacts) - 1) + { + TwoPhaseState->prepXacts[i] = + TwoPhaseState->prepXacts[(TwoPhaseState->numPrepXacts) - 1]; + } + TwoPhaseState->numPrepXacts--; + + LWLockRelease(TwoPhaseStateLock); + return xid; + } + } + + LWLockRelease(TwoPhaseStateLock); + + elog(ERROR, "No prepared transaction with gid %s", gid); + + /* Unreachable code */ + return InvalidTransactionId; + } + + /* + * Returns an array of all prepared transactions for the user-level + * function pg_prepared_xact. + * + * The returned array and all its elements are copies of internal data + * structures, to minimize the time we need to hold the TwoPhaseStateLock. + * + * WARNING -- we return even those transactions that are not fully prepared + * yet. The caller should filter them out if he doesn't want them. + * + * The returned array is palloc'd. + */ + static int + GetPreparedTransactionList(GlobalTransactionData **gxacts) + { + GlobalTransactionData *array; + int num; + + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + + if (TwoPhaseState->numPrepXacts == 0) + { + LWLockRelease(TwoPhaseStateLock); + + *gxacts = NULL; + return 0; + } + + num = TwoPhaseState->numPrepXacts; + array = palloc(sizeof(GlobalTransactionData) * num); + memcpy(array, TwoPhaseState->prepXacts, sizeof(GlobalTransactionData) * num); + *gxacts = array; + + LWLockRelease(TwoPhaseStateLock); + + return num; + } + + + /* Working status for pg_prepared_xact */ + typedef struct + { + GlobalTransactionData *array; + int ngxacts; + int currIdx; + } Working_State; + + /* + * pg_prepared_xact + * Produce a view with one row per prepared transaction. + * + * This function is here so we don't have to export the + * GlobalTransactionData struct definition. + */ + Datum + pg_prepared_xact(PG_FUNCTION_ARGS) + { + FuncCallContext *funcctx; + Working_State *status; + + if (SRF_IS_FIRSTCALL()) + { + TupleDesc tupdesc; + MemoryContext oldcontext; + + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + + /* + * Switch to memory context appropriate for multiple function + * calls + */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + /* build tupdesc for result tuples */ + /* this had better match pg_prepared_xacts view in system_views.sql */ + tupdesc = CreateTemplateTupleDesc(3, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction", + XIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "owner", + NAMEOID, -1, 0); + + funcctx->tuple_desc = BlessTupleDesc(tupdesc); + + /* + * Collect all the 2PC status information that we will format and + * send out as a result set. + */ + status = (Working_State *) palloc(sizeof(Working_State)); + funcctx->user_fctx = (void *) status; + + status->ngxacts = GetPreparedTransactionList(&status->array); + status->currIdx = 0; + + MemoryContextSwitchTo(oldcontext); + } + + funcctx = SRF_PERCALL_SETUP(); + status = (Working_State *) funcctx->user_fctx; + + while (status->array != NULL && status->currIdx < status->ngxacts) + { + Datum values[3]; + bool nulls[3]; + HeapTuple tuple; + Datum result; + GlobalTransactionData gxact = status->array[status->currIdx++]; + + if (!gxact.fullyPrepared) + continue; + + /* + * Form tuple with appropriate data. + */ + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + values[0] = TransactionIdGetDatum(gxact.xid); + values[1] = DirectFunctionCall1(textin, CStringGetDatum(gxact.gid)); + values[2] = DirectFunctionCall1(namein, + CStringGetDatum(GetUserNameFromId(gxact.owner))); + + tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); + result = HeapTupleGetDatum(tuple); + SRF_RETURN_NEXT(funcctx, result); + } + + SRF_RETURN_DONE(funcctx); + } + + /* + * GetPreparedTransactionXidList + * Get a list of Xids used by transactions in prepared state. + */ + int + GetPreparedTransactionsXidList(TransactionId **xids) + { + TransactionId *array; + int num; + int i; + + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + + if (TwoPhaseState->numPrepXacts == 0) + { + LWLockRelease(TwoPhaseStateLock); + + xids = NULL; + return 0; + } + + num = TwoPhaseState->numPrepXacts; + array = palloc(sizeof(TransactionId) * num); + + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + array[i] = TwoPhaseState->prepXacts[i].xid; + + *xids = array; + + LWLockRelease(TwoPhaseStateLock); + + return i; + } + + /* + * TransactionIdIsPrepared + * True iff transaction associated with the identifier is prepared + * for two-phase commit + * + * Note: + * Assumes transaction identifier is valid and belongs to a top + * transaction id (i.e., not a subtransaction.) + */ + bool + TransactionIdIsPrepared(TransactionId xid) + { + int i; + bool result = false; + + /* + * XXX Maybe assert here that we are only asked about a top transaction + * Id? + */ + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + if (TransactionIdEquals(TwoPhaseState->prepXacts[i].xid, xid)) + { + result = true; + break; + } + } + + LWLockRelease(TwoPhaseStateLock); + + return result; + } + + void + BootStrapTwoPhase(void) + { + /* NOP */ + } + + /* + * Initialization of shared memory + */ + int + TwoPhaseShmemSize(void) + { + #define SIZEOF_TWOPHASE_SHMEM \ + sizeof(TwoPhaseStateData) + \ + sizeof(GlobalTransactionData) * max_prepared_xacts + return SIZEOF_TWOPHASE_SHMEM; + } + + + void + TwoPhaseShmemInit(void) + { + bool found; + + TwoPhaseState = ShmemInitStruct("global transaction lookup table", + SIZEOF_TWOPHASE_SHMEM, + &found); + if (!IsUnderPostmaster) + { + Assert(!found); + MemSet(&TwoPhaseState->pgproc, 0, sizeof(PGPROC)); + TwoPhaseState->pgproc.pid = 0; + SHMQueueInit(&TwoPhaseState->pgproc.procLocks); + } + else + Assert(found); + } + + PGPROC * + TwoPhaseGetDummyProc(void) + { + return &TwoPhaseState->pgproc; + } + + /************************************************************************/ + /* State file support */ + /************************************************************************/ + + #define TwoPhaseFilePath(path, xid) \ + snprintf(path, MAXPGPATH, "%s/%s/%08X", DataDir, TWOPHASE_DIR, xid) + + static void register_statefileblock(void *data, uint32 len); + + + /** + * File format + * + * 1. TwoPhaseFileHeader + * 2. TransactionId (subtransactions) + * 3. TwoPhaseRecordOnDisk + * 4. ... + * + * 7. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END) + * 8. CRC32 + */ + + /* + * Header for a 2PC state file + */ + typedef struct TwoPhaseFileHeader + { + TransactionId xid; + AclId owner; + char gid[GIDSIZE]; + int nsubxacts; + } TwoPhaseFileHeader; + + /* + * Header for each record in a state file + * + * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header. + */ + typedef struct TwoPhaseRecordOnDisk + { + uint32 len; /* length of rmgr data */ + TwoPhaseRmgrId rmid; /* resource manager for this record */ + + /* ACTUAL DATA FOLLOWS AT END OF STRUCT */ + char data[0]; + } TwoPhaseRecordOnDisk; + + /* On prepare, the state file is assembled in memory before writing it to + * WAL and the actual state file. + */ + struct xllist + { + XLogRecData *head; /* first data block in the chain */ + XLogRecData *tail; /* last block in chain */ + int bytesFree; /* there is bytesFree bytes left in tail block */ + } records; + + /* + * Appends a block of data to records data structure. + * The data is copied, so the caller is free to modify it + * afterwards. + */ + static void + register_statefileblock(void *data, uint32 len) + { + if (len > records.bytesFree) + { + records.tail->next = palloc0(sizeof(XLogRecData)); + records.tail = records.tail->next; + records.tail->buffer = InvalidBuffer; + records.tail->next = NULL; + + records.bytesFree = len > 512 ? len : 512; + records.tail->data = palloc0(records.bytesFree); + } + else + records.bytesFree -= len; + + memcpy(&((char *)records.tail->data)[records.tail->len], data, len); + records.tail->len += len; + } + + /* + * Starts preparing a state file. + */ + void + StartPrepare(char *gid) + { + TwoPhaseFileHeader hdr; + TransactionId *children; + + /* Initialize linked list */ + records.head = palloc(sizeof(XLogRecData)); + records.head->data = palloc(512); + records.head->len = 0; + records.head->next = NULL; + records.head->buffer = InvalidBuffer; + records.bytesFree = 512; + + records.tail = records.head; + + /* Create header */ + + strncpy(hdr.gid, gid, GIDSIZE); + TransactionIdStore(GetTopTransactionId(), &hdr.xid); + hdr.owner = GetUserId(); + hdr.nsubxacts = xactGetCommittedChildren(&children); + + register_statefileblock(&hdr, sizeof(TwoPhaseFileHeader)); + register_statefileblock(children, hdr.nsubxacts * sizeof(TransactionId)); + } + + /* + * Finishes state file. Calculates CRC and writes state file to WAL and in + * pg_twophase directory. + */ + void + EndPrepare(void) + { + char path[MAXPGPATH]; + TwoPhaseRecordOnDisk end_record; + TransactionId xid = GetTopTransactionId(); + XLogRecData *record; + XLogRecPtr recptr; + static pg_crc32 statefile_crc; + int twophase_fd; + + /* register end sentinel */ + end_record.len = 0; + end_record.rmid = 0; + register_statefileblock(&end_record, sizeof(end_record)); + + /* + * We have to record transaction prepares even if we didn't + * make any updates, because the transaction manager might + * get confused if we lose a global transaction. + */ + + /* Create file and initialize CRC */ + INIT_CRC32(statefile_crc); + + TwoPhaseFilePath(path, xid); + + twophase_fd = BasicOpenFile(path, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY, + S_IRUSR | S_IWUSR); + if (twophase_fd < 0) + { + close(twophase_fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create twophase state file \"%s\": %m", + path))); + } + + /* Write records to file. */ + + record = records.head; + while (record != NULL) + { + if ((write(twophase_fd, record->data, record->len)) != record->len) + { + close(twophase_fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to twophase state file: %m"))); + } + + COMP_CRC32(statefile_crc, record->data, record->len); + + record = record->next; + } + + FIN_CRC32(statefile_crc); + + /* The state file isn't valid yet, because we haven't written the CRC yet. */ + /* But before we do that, insert entry to WAL */ + + START_CRIT_SECTION(); + recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE, records.head); + XLogFlush(recptr); + END_CRIT_SECTION(); + + /* If we crash now, we have already prepared */ + + /* write CRC and close file */ + + if ((write(twophase_fd, &statefile_crc, sizeof(statefile_crc))) != sizeof(statefile_crc)) + { + close(twophase_fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to twophase state file: %m"))); + } + + close(twophase_fd); + + records.tail = records.head = NULL; + } + + /* + * Registers a record to be written to state file. + */ + void + RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, void *data, uint32 len) + { + TwoPhaseRecordOnDisk record; + record.rmid = rmid; + record.len = len; + + register_statefileblock(&record, sizeof(TwoPhaseRecordOnDisk)); + register_statefileblock(data, len); + } + + + /* Buffer size used in ValidateTwoPhaseFile */ + #define BUF_SIZE 512 + + /* + * Validates the state file for xid. Returns true if + * it looks OK. + * + * A state file is valid if it has a valid checksum. + */ + static bool + ValidateTwoPhaseFile(TransactionId xid) + { + char path[MAXPGPATH]; + char buf[BUF_SIZE]; + pg_crc32 calc_crc, file_crc; + struct stat stat; + off_t total_read = 0; + int fd; + int len; + + TwoPhaseFilePath(path, xid); + + elog(DEBUG1, "ValidateTwoPhaseFile(%x)", xid); + + fd = BasicOpenFile(path, PG_BINARY, S_IRUSR | S_IWUSR); + if (fd < 0) + { + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not open twophase state file \"%s\": %m", + path))); + return false; + } + + fstat(fd, &stat); + + /* Read through the file and accumulate the CRC until + * end - sizeof(pg_crc32). */ + + INIT_CRC32(calc_crc); + for (;;) + { + int bytesleft = stat.st_size - total_read - sizeof(pg_crc32); + + len = read(fd, buf, bytesleft < BUF_SIZE ? bytesleft : BUF_SIZE); + total_read += len; + + if (len == -1) + { + close(fd); + return false; + } + + if (len == 0) + break; + + COMP_CRC32(calc_crc, buf, len); + } + FIN_CRC32(calc_crc); + + len = read(fd, &file_crc, sizeof(pg_crc32)); + close(fd); + + if (len != sizeof(pg_crc32)) + return false; + + return EQ_CRC32(calc_crc, file_crc); + } + + static void + read_with_ereport(int fd, void *data, int len) + { + int read_len; + + read_len = read(fd, data, len); + + if (read_len != len) + { + close(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("error reading twophase state file: %m"))); + } + } + + void + FinishPreparedTransaction(char *gid, bool isCommit) + { + TransactionId xid; + TransactionId *children; + char path[MAXPGPATH]; + int len; + int fd; + TwoPhaseFileHeader hdr; + + Assert(PointerIsValid(gid)); + + xid = MarkAsNoLongerPrepared(gid, GetUserId()); + + TwoPhaseFilePath(path, xid); + + elog(DEBUG1, "FinishPreparedTransaction (%x)", xid); + + fd = BasicOpenFile(path, PG_BINARY, S_IRUSR | S_IWUSR); + if (fd < 0) + { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open twophase state file \"%s\": %m", + path))); + } + + read_with_ereport(fd, &hdr, sizeof(TwoPhaseFileHeader)); + + Assert(TransactionIdEquals(hdr.xid, xid)); + + len = hdr.nsubxacts * sizeof(TransactionId); + children = palloc(len); + read_with_ereport(fd, children, len); + + if (isCommit) + { + RecordTransactionCommitPrepared(xid, hdr.nsubxacts, children); + ProcessRecords(fd, xid, postcommit_callbacks); + } + else + { + RecordTransactionAbortPrepared(xid, hdr.nsubxacts, children); + ProcessRecords(fd, xid, postabort_callbacks); + } + LockReleaseAllForPrepared(xid); + + close(fd); + + RemoveTwoPhaseFile(xid); + } + + /* + * Reads the state file for the transaction. Calls the + * callbacks for each record in the file. + */ + static void + ProcessRecords(int fd, TransactionId xid, const TwoPhaseCallback callbacks[]) + { + TwoPhaseRecordOnDisk *record; + + elog(DEBUG1, "ProcessRecords (%x)", xid); + + record = palloc(sizeof(TwoPhaseRecordOnDisk)); + for (;;) + { + /* Read record header */ + read_with_ereport(fd, record, sizeof(TwoPhaseRecordOnDisk)); + + Assert(record->rmid <= TWOPHASE_RM_MAX_ID); + + /* Watch for end sentinel */ + if (record->rmid == TWOPHASE_RM_END_ID) + break; + + /* Read resource manager specific data and call the hook */ + record = repalloc(record, sizeof(TwoPhaseRecordOnDisk) + record->len); + read_with_ereport(fd, record->data, record->len); + + if (callbacks[record->rmid] != NULL) + callbacks[record->rmid](xid, record->data, record->len); + } + pfree(record); + } + + void RemoveTwoPhaseFile(TransactionId xid) + { + char path[MAXPGPATH]; + TwoPhaseFilePath(path, xid); + + if (unlink(path)) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not remove two-phase state file \"%s\": %m", + path))); + } + + /* + * Recreates a state file. This is used in WAL replay. + * + * Note: content and len don't include CRC. + */ + void + RecreateTwoPhaseFile(TransactionId xid, void *content, int len) + { + char path[MAXPGPATH]; + pg_crc32 statefile_crc; + int fd; + + TwoPhaseFilePath(path, xid); + + fd = BasicOpenFile(path, O_CREAT | O_WRONLY | PG_BINARY, S_IRUSR | S_IWUSR); + if (fd < 0) + { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not recreate twophase state file \"%s\": %m", + path))); + } + + /* Write content */ + INIT_CRC32(statefile_crc); + COMP_CRC32(statefile_crc, content, len); + FIN_CRC32(statefile_crc); + + if (write(fd, content, len) != len) + { + close(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to twophase state file: %m"))); + } + + if (write(fd, &statefile_crc, sizeof(statefile_crc)) != sizeof(statefile_crc)) + { + close(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to twophase state file: %m"))); + } + + close(fd); + } + + /* + * Scans the pg_twophase directory and recovers every found + * prepared transaction + */ + void + RecoverPreparedTransactions(void) + { + DIR *cldir; + struct dirent *clde; + char dir[MAXPGPATH]; + + elog(LOG, "Recovering prepared transactions"); + snprintf(dir, MAXPGPATH, "%s/%s", DataDir, TWOPHASE_DIR); + + cldir = AllocateDir(dir); + if (cldir == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open directory \"%s\": %m", dir))); + + errno = 0; + + while ((clde = readdir(cldir)) != NULL) + { + if (strlen(clde->d_name) == 8 && + strspn(clde->d_name, "0123456789ABCDEF") == 8) + { + TransactionId xid; + int i; + sscanf(clde->d_name, "%X", &xid); + + if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) + { + ereport(WARNING, + (errcode_for_file_access(), + errmsg("removing stale twophase state file \"%s\"", + clde->d_name))); + RemoveTwoPhaseFile(xid); + continue; + } + + if (!ValidateTwoPhaseFile(xid)) + { + ereport(WARNING, + (errcode_for_file_access(), + errmsg("removing corrupt twophase state file \"%s\"", + clde->d_name))); + + RemoveTwoPhaseFile(xid); + continue; + } + + /* The transaction looks good. Read state file. */ + { + char path[MAXPGPATH]; + int fd; + TwoPhaseFileHeader hdr; + GlobalTransaction gxact; + + TwoPhaseFilePath(path, xid); + + elog(DEBUG1, "OpenTwoPhaseFile (%x)", xid); + + fd = BasicOpenFile(path, PG_BINARY, S_IRUSR | S_IWUSR); + if (fd < 0) + { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open twophase state file \"%s\": %m", + path))); + } + + read_with_ereport(fd, &hdr, sizeof(TwoPhaseFileHeader)); + + Assert(TransactionIdEquals(hdr.xid, xid)); + + for (i = 0; i < hdr.nsubxacts; i++) + { + TransactionId subxid; + read_with_ereport(fd, &subxid, sizeof(TransactionId)); + + SubTransSetParent(subxid, xid); + } + + gxact = MarkAsPreparing(xid, hdr.gid, hdr.owner); + MarkAsPrepared(gxact); + ProcessRecords(fd, xid, recover_callbacks); + close(fd); + } + } + } + if (errno) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read directory \"%s\": %m", dir))); + + FreeDir(cldir); + } + + /* + * Since we don't fsync when we write a state file, we need to fsync them + * on checkpoint. + */ + void + CheckPointTwoPhase(void) + { + TransactionId *xids; + int nxids; + int i; + + nxids = GetPreparedTransactionsXidList(&xids); + + for (i = 0; i < nxids; i++) + { + char path[MAXPGPATH]; + int fd; + int ret; + + TwoPhaseFilePath(path, xids[i]); + + fd = BasicOpenFile(path, PG_BINARY, S_IRUSR | S_IWUSR); + if (fd < 0) + { + /* + * This can happen, if the transaction is committed/rolled back + * after the call to EndPreparedTransactionListScan above + * and before we get here. + */ + } + else + { + ret = pg_fsync(fd); + if (ret != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not fsync twophase state file \"%s\": %m", + path))); + close(fd); + } + } + } + + + /* + * RecordTransactionCommitPrepared + */ + static void + RecordTransactionCommitPrepared(TransactionId xid, int nchildren, TransactionId *children) + { + XLogRecData rdata[2]; + XLogRecPtr recptr; + + elog(DEBUG1, "Committing prepared transaction %d\n", xid); + + START_CRIT_SECTION(); + + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char *)&nchildren; + rdata[0].len = sizeof(nchildren); + rdata[0].next = &rdata[1]; + + rdata[1].buffer = InvalidBuffer; + rdata[1].data = (char *)children; + rdata[1].len = sizeof(TransactionId) * nchildren; + rdata[1].next = NULL; + + recptr = XLogInsertOnBehalf(RM_XACT_ID, + XLOG_XACT_COMMIT_PREPARED | XLOG_NO_TRAN, + rdata, xid); + XLogFlush(recptr); + + TransactionIdCommit(xid); + TransactionIdCommitTree(nchildren, children); + + END_CRIT_SECTION(); + } + + /* + * RecordTransactionAbortPrepared + */ + static void + RecordTransactionAbortPrepared(TransactionId xid, int nchildren, TransactionId *children) + { + XLogRecData rdata[2]; + XLogRecPtr recptr; + + START_CRIT_SECTION(); + + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char *)&nchildren; + rdata[0].len = sizeof(nchildren); + rdata[0].next = &rdata[1]; + rdata[1].buffer = InvalidBuffer; + rdata[1].data = (char *)children; + rdata[1].len = sizeof(TransactionId) * nchildren; + rdata[1].next = NULL; + + recptr = XLogInsertOnBehalf(RM_XACT_ID, + XLOG_XACT_ABORT_PREPARED | XLOG_NO_TRAN, + rdata, xid); + XLogFlush(recptr); + + /* + * Mark the transaction aborted in clog. This is not absolutely + * necessary but we may as well do it while we are here. + */ + TransactionIdAbort(xid); + TransactionIdAbortTree(nchildren, children); + + END_CRIT_SECTION(); + } + diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/access/transam/twophase_rmgr.c 08twophase/src/backend/access/transam/twophase_rmgr.c *** 00orig/src/backend/access/transam/twophase_rmgr.c 1969-12-31 21:00:00.000000000 -0300 --- 08twophase/src/backend/access/transam/twophase_rmgr.c 2005-06-08 19:23:59.000000000 -0400 *************** *** 0 **** --- 1,47 ---- + /* + * twophase_rmgr.c + * + * Twophase resource managers definition + * + * $PostgreSQL$ + */ + #include "postgres.h" + + #include "access/twophase.h" + #include "access/twophase_rmgr.h" + + #include "storage/lockrmgr.h" + #include "storage/smgr.h" + #include "utils/inval.h" + #include "utils/flatfiles.h" + #include "commands/async.h" + + const TwoPhaseCallback recover_callbacks[TWOPHASE_RM_MAX_ID + 1] = + { + NULL, /* Reserved for twophase.c internal logic */ + lock_recover_record, /* Lock */ + NULL, /* Inval */ + NULL, /* smgr */ + NULL, /* user/group file update */ + NULL /* notify/listen */ + }; + + const TwoPhaseCallback postcommit_callbacks[TWOPHASE_RM_MAX_ID + 1] = + { + NULL, /* Reserved for twophase.c internal logic */ + NULL, /* Lock */ + inval_postcommit_record, /* Inval */ + smgr_postcommit_record, /* smgr */ + flatfile_postcommit_record, /* user/group file update */ + notify_postcommit_record /* notify/listen */ + }; + + const TwoPhaseCallback postabort_callbacks[TWOPHASE_RM_MAX_ID + 1] = + { + NULL, /* Reserved for twophase.c internal logic */ + NULL, /* Lock */ + NULL, /* Inval */ + smgr_postabort_record, /* smgr */ + NULL, /* user/group file update */ + NULL + }; diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/access/transam/xact.c 08twophase/src/backend/access/transam/xact.c *** 00orig/src/backend/access/transam/xact.c 2005-06-07 16:44:05.000000000 -0400 --- 08twophase/src/backend/access/transam/xact.c 2005-06-08 20:45:12.000000000 -0400 *************** *** 23,28 **** --- 23,30 ---- #include "access/multixact.h" #include "access/subtrans.h" #include "access/xact.h" + #include "access/twophase.h" + #include "access/twophase_rmgr.h" #include "catalog/heap.h" #include "catalog/index.h" #include "catalog/namespace.h" *************** *** 68,74 **** TRANS_START, TRANS_INPROGRESS, TRANS_COMMIT, ! TRANS_ABORT } TransState; /* --- 70,77 ---- TRANS_START, TRANS_INPROGRESS, TRANS_COMMIT, ! TRANS_ABORT, ! TRANS_PREPARE } TransState; /* *************** *** 99,105 **** TBLOCK_SUBABORT_END, /* failed subxact, ROLLBACK received */ TBLOCK_SUBABORT_PENDING, /* live subxact, ROLLBACK received */ TBLOCK_SUBRESTART, /* live subxact, ROLLBACK TO received */ ! TBLOCK_SUBABORT_RESTART /* failed subxact, ROLLBACK TO received */ } TBlockState; /* --- 102,111 ---- TBLOCK_SUBABORT_END, /* failed subxact, ROLLBACK received */ TBLOCK_SUBABORT_PENDING, /* live subxact, ROLLBACK received */ TBLOCK_SUBRESTART, /* live subxact, ROLLBACK TO received */ ! TBLOCK_SUBABORT_RESTART, /* failed subxact, ROLLBACK TO received */ ! ! /* two-phase commit states */ ! TBLOCK_PREPARE } TBlockState; /* *************** *** 197,202 **** --- 203,210 ---- static SubXactCallbackItem *SubXact_callbacks = NULL; + /* stash for the gid between calls to PrepareTransaction and PrepareTransactionBlock */ + static char gidStash[GIDSIZE+1]; /* local function prototypes */ static void AssignSubTransactionId(TransactionState s); *************** *** 265,270 **** --- 273,280 ---- return true; case TRANS_COMMIT: return true; + case TRANS_PREPARE: + return true; case TRANS_ABORT: return true; } *************** *** 1609,1614 **** --- 1619,1850 ---- } /* + * AbortPreparedTransaction + */ + void + AbortPreparedTransaction(char *gid) + { + FinishPreparedTransaction(gid, false); + } + + /* + * CommitPreparedTransaction + */ + void + CommitPreparedTransaction(char *gid) + { + FinishPreparedTransaction(gid, true); + } + + /* + * PrepareTransaction + */ + static void + PrepareTransaction(void) + { + TransactionState s = CurrentTransactionState; + TransactionId xid = GetCurrentTransactionId(); + GlobalTransaction gxact; + + Assert(MyProc != NULL); + + ShowTransactionState("PrepareTransaction"); + + /* + * check the current transaction state + */ + Assert(s->state == TRANS_INPROGRESS); + Assert(s->parent == NULL); + + /* + * Do pre-commit processing (most of this stuff requires database + * access, and in fact could still cause an error...) + * + * It is possible for CommitHoldablePortals to invoke functions that + * queue deferred triggers, and it's also possible that triggers create + * holdable cursors. So we have to loop until there's nothing left to + * do. + */ + for (;;) + { + /* + * Fire all currently pending deferred triggers. + */ + AfterTriggerFireDeferred(); + + /* + * Convert any open holdable cursors into static portals. If there + * weren't any, we are done ... otherwise loop back to check if they + * queued deferred triggers. Lather, rinse, repeat. + */ + if (!CommitHoldablePortals()) + break; + } + + /* Now we can shut down the deferred-trigger manager */ + AfterTriggerEndXact(true); + + /* Close any open regular cursors */ + AtPrepare_Portals(); + + /* + * Let ON COMMIT management do its thing (must happen after closing + * cursors, to avoid dangling-reference problems) + */ + PreCommit_on_commit_actions(); + + /* Prevent cancel/die interrupt while cleaning up */ + HOLD_INTERRUPTS(); + + /* + * set the current transaction state information appropriately during + * the processing + */ + s->state = TRANS_PREPARE; + + /* Reserve the gid for this transaction. This could still fail if the + * gid is already in use. */ + + /* TODO: If we fail later, the gid stays reserved until restart */ + gxact = MarkAsPreparing(xid, gidStash, GetUserId()); + StartPrepare(gidStash); + gidStash[0] = '\0'; + + /* + * Do pre-commit processing (most of this stuff requires database + * access, and in fact could still cause an error...) + */ + + /* handle prepare for large objects */ + AtPrepare_LargeObject(); + + /* NOTIFY commit must come before lower-level cleanup + */ + AtPrepare_Notify(); + + /* Update the flat files if we changed pg_database, pg_shadow or pg_group + */ + AtPrepare_UpdateFlatFiles(); + + /* + * Let smgr persist its list of pending deletes. + */ + AtPrepare_smgr(); + + /* + * Need to do locks before inval, because we may need to check + * locks against relations deleted in the transaction. + */ + LockPersistAll(); + + AtPrepare_Inval(); + AtPrepare_GUC(); + + + /* + * We have to record transaction prepares even if we didn't + * make any updates, because the transaction manager might + * get confused if we lose a global transaction. + */ + + /* Tell bufmgr and smgr to prepare for commit */ + BufmgrCommit(); + + /* + * Here is where we really truly prepare. + */ + EndPrepare(); + + /* Break the chain of back-links in the XLOG records I output */ + MyLastRecPtr.xrecoff = 0; + MyXactMadeXLogEntry = false; + MyXactMadeTempRelUpdate = false; + + /* Show myself as out of the transaction in PGPROC array */ + MyProc->logRec.xrecoff = 0; + + /* + * Let others know about no transaction in progress by me. + * + * LWLockAcquire(SInvalLock) is required: UPDATE with xid 0 is blocked by + * xid 1' UPDATE, xid 1 is doing commit while xid 2 gets snapshot - if + * xid 2' GetSnapshotData sees xid 1 as running then it must see xid 0 + * as running as well or it will see two tuple versions - one deleted + * by xid 1 and one inserted by xid 0. See notes in GetSnapshotData. + */ + /* Lock SInvalLock because that's what GetSnapshotData uses. */ + LWLockAcquire(SInvalLock, LW_EXCLUSIVE); + MyProc->xid = InvalidTransactionId; + MyProc->xmin = InvalidTransactionId; + LWLockRelease(SInvalLock); + + /* Mark that the prepared is completely done. Only after this + * can others commit/rollback the transaction. */ + MarkAsPrepared(gxact); + + /* + * This is all post-transaction cleanup. Note that if an error is raised + * here, it's too late to abort the transaction. This should be just + * noncritical resource releasing. + * + * The ordering of operations is not entirely random. The idea is: + * release resources visible to other backends (eg, files, buffer + * pins); then release locks; then release backend-local resources. We + * want to release locks at the point where any backend waiting for us + * will see our transaction as being fully cleaned up. + * + * Resources that can be associated with individual queries are handled + * by the ResourceOwner mechanism. The other calls here are for + * backend-wide state. + */ + + CallXactCallbacks(XACT_EVENT_PREPARE); + + ResourceOwnerRelease(TopTransactionResourceOwner, + RESOURCE_RELEASE_BEFORE_LOCKS, + true, true); + + ResourceOwnerRelease(TopTransactionResourceOwner, + RESOURCE_RELEASE_LOCKS, + true, true); + ResourceOwnerRelease(TopTransactionResourceOwner, + RESOURCE_RELEASE_AFTER_LOCKS, + true, true); + + + AtEOXact_SPI(true); + AtEOXact_on_commit_actions(true); + AtEOXact_Namespace(true); + /* smgrcommit already done */ + AtEOXact_Files(); + /* TODO: */ + /* pgstat_count_xact_commit(); */ + + CurrentResourceOwner = NULL; + + ResourceOwnerDelete(TopTransactionResourceOwner); + s->curTransactionOwner = NULL; + CurTransactionResourceOwner = NULL; + TopTransactionResourceOwner = NULL; + + AtCommit_Memory(); + + s->transactionId = InvalidTransactionId; + s->subTransactionId = InvalidSubTransactionId; + s->nestingLevel = 0; + s->childXids = NIL; + + /* + * done with 1st phase commit processing, set current transaction + * state back to default + */ + s->state = TRANS_DEFAULT; + + RESUME_INTERRUPTS(); + } + + + /* * AbortTransaction */ static void *************** *** 1640,1646 **** /* * check the current transaction state */ ! if (s->state != TRANS_INPROGRESS) elog(WARNING, "AbortTransaction while in %s state", TransStateAsString(s->state)); Assert(s->parent == NULL); --- 1876,1882 ---- /* * check the current transaction state */ ! if (s->state != TRANS_INPROGRESS && s->state != TRANS_PREPARE) elog(WARNING, "AbortTransaction while in %s state", TransStateAsString(s->state)); Assert(s->parent == NULL); *************** *** 1833,1838 **** --- 2069,2075 ---- case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(ERROR, "StartTransactionCommand: unexpected state %s", BlockStateAsString(s->blockState)); break; *************** *** 1964,1969 **** --- 2201,2212 ---- CommitTransaction(); s->blockState = TBLOCK_DEFAULT; } + else if (s->blockState == TBLOCK_PREPARE) + { + Assert(s->parent == NULL); + PrepareTransaction(); + s->blockState = TBLOCK_DEFAULT; + } else { Assert(s->blockState == TBLOCK_INPROGRESS || *************** *** 2047,2052 **** --- 2290,2305 ---- s->blockState = TBLOCK_SUBINPROGRESS; } break; + + /* + * This is the case when we just got the "PREPARE TRANSACTION" + * statement, so we prepare the transaction and go back to the + * default state. + */ + case TBLOCK_PREPARE: + PrepareTransaction(); + s->blockState = TBLOCK_DEFAULT; + break; } } *************** *** 2187,2192 **** --- 2440,2452 ---- CleanupSubTransaction(); AbortCurrentTransaction(); break; + + case TBLOCK_PREPARE: + s->blockState = TBLOCK_DEFAULT; + AbortTransaction(); + CleanupTransaction(); + break; + } } *************** *** 2487,2492 **** --- 2747,2753 ---- case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "BeginTransactionBlock: unexpected state %s", BlockStateAsString(s->blockState)); break; *************** *** 2494,2499 **** --- 2755,2785 ---- } /* + * PrepareTransactionBlock + */ + bool + PrepareTransactionBlock(char *gid) + { + TransactionState s; + bool result; + + result = EndTransactionBlock(); + + if (result) + { + s = CurrentTransactionState; + + while (s->parent != NULL) + s = s->parent; + + s->blockState = TBLOCK_PREPARE; + strncpy(gidStash, gid, GIDSIZE); + } + return result; + } + + + /* * EndTransactionBlock * This executes a COMMIT command. * *************** *** 2603,2608 **** --- 2889,2895 ---- case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "EndTransactionBlock: unexpected state %s", BlockStateAsString(s->blockState)); break; *************** *** 2694,2699 **** --- 2981,2987 ---- case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "UserAbortTransactionBlock: unexpected state %s", BlockStateAsString(s->blockState)); break; *************** *** 2740,2745 **** --- 3028,3034 ---- case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "DefineSavepoint: unexpected state %s", BlockStateAsString(s->blockState)); break; *************** *** 2795,2800 **** --- 3084,3090 ---- case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "ReleaseSavepoint: unexpected state %s", BlockStateAsString(s->blockState)); break; *************** *** 2892,2897 **** --- 3182,3188 ---- case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "RollbackToSavepoint: unexpected state %s", BlockStateAsString(s->blockState)); break; *************** *** 2999,3004 **** --- 3290,3296 ---- case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "BeginInternalSubTransaction: unexpected state %s", BlockStateAsString(s->blockState)); break; *************** *** 3055,3060 **** --- 3347,3353 ---- case TBLOCK_BEGIN: case TBLOCK_SUBBEGIN: case TBLOCK_INPROGRESS: + case TBLOCK_PREPARE: case TBLOCK_END: case TBLOCK_SUBEND: case TBLOCK_ABORT: *************** *** 3111,3116 **** --- 3404,3410 ---- case TBLOCK_INPROGRESS: case TBLOCK_END: case TBLOCK_ABORT_PENDING: + case TBLOCK_PREPARE: /* In a transaction, so clean up */ AbortTransaction(); CleanupTransaction(); *************** *** 3202,3207 **** --- 3496,3502 ---- case TBLOCK_SUBINPROGRESS: case TBLOCK_END: case TBLOCK_SUBEND: + case TBLOCK_PREPARE: return 'T'; /* in transaction */ case TBLOCK_ABORT: case TBLOCK_SUBABORT: *************** *** 3700,3705 **** --- 3995,4002 ---- return "SUB RESTART"; case TBLOCK_SUBABORT_RESTART: return "SUB AB RESTRT"; + case TBLOCK_PREPARE: + return "PREPARE"; } return "UNRECOGNIZED"; } *************** *** 3723,3728 **** --- 4020,4027 ---- return "ABORT"; case TRANS_INPROGRESS: return "INPROGR"; + case TRANS_PREPARE: + return "PREPARE"; } return "UNRECOGNIZED"; } *************** *** 3771,3804 **** xact_redo(XLogRecPtr lsn, XLogRecord *record) { uint8 info = record->xl_info & ~XLR_INFO_MASK; if (info == XLOG_XACT_COMMIT) { xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); - TransactionId *sub_xids; - TransactionId max_xid; - int i; ! TransactionIdCommit(record->xl_xid); /* Mark committed subtransactions as committed */ sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); TransactionIdCommitTree(xlrec->nsubxacts, sub_xids); - /* Make sure nextXid is beyond any XID mentioned in the record */ - max_xid = record->xl_xid; - for (i = 0; i < xlrec->nsubxacts; i++) - { - if (TransactionIdPrecedes(max_xid, sub_xids[i])) - max_xid = sub_xids[i]; - } - if (TransactionIdFollowsOrEquals(max_xid, - ShmemVariableCache->nextXid)) - { - ShmemVariableCache->nextXid = max_xid; - TransactionIdAdvance(ShmemVariableCache->nextXid); - } - /* Make sure files supposed to be dropped are dropped */ for (i = 0; i < xlrec->nrels; i++) { --- 4070,4092 ---- xact_redo(XLogRecPtr lsn, XLogRecord *record) { uint8 info = record->xl_info & ~XLR_INFO_MASK; + TransactionId xid = record->xl_xid; + TransactionId max_xid; + TransactionId *sub_xids = NULL; + int nsubxacts = 0; + int i; if (info == XLOG_XACT_COMMIT) { xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); ! TransactionIdCommit(xid); /* Mark committed subtransactions as committed */ + nsubxacts = xlrec->nsubxacts; sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); TransactionIdCommitTree(xlrec->nsubxacts, sub_xids); /* Make sure files supposed to be dropped are dropped */ for (i = 0; i < xlrec->nrels; i++) { *************** *** 3810,3838 **** { xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record); TransactionId *sub_xids; - TransactionId max_xid; - int i; TransactionIdAbort(record->xl_xid); /* Mark subtransactions as aborted */ sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); TransactionIdAbortTree(xlrec->nsubxacts, sub_xids); - /* Make sure nextXid is beyond any XID mentioned in the record */ - max_xid = record->xl_xid; - for (i = 0; i < xlrec->nsubxacts; i++) - { - if (TransactionIdPrecedes(max_xid, sub_xids[i])) - max_xid = sub_xids[i]; - } - if (TransactionIdFollowsOrEquals(max_xid, - ShmemVariableCache->nextXid)) - { - ShmemVariableCache->nextXid = max_xid; - TransactionIdAdvance(ShmemVariableCache->nextXid); - } - /* Make sure files supposed to be dropped are dropped */ for (i = 0; i < xlrec->nrels; i++) { --- 4098,4111 ---- { xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record); TransactionId *sub_xids; TransactionIdAbort(record->xl_xid); /* Mark subtransactions as aborted */ + nsubxacts = xlrec->nsubxacts; sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); TransactionIdAbortTree(xlrec->nsubxacts, sub_xids); /* Make sure files supposed to be dropped are dropped */ for (i = 0; i < xlrec->nrels; i++) { *************** *** 3840,3847 **** --- 4113,4154 ---- smgrdounlink(smgropen(xlrec->xnodes[i]), false, true); } } + else if (info == XLOG_XACT_PREPARE) + { + RecreateTwoPhaseFile(xid, XLogRecGetData(record), record->xl_len); + } + else if (info == XLOG_XACT_COMMIT_PREPARED) + { + nsubxacts = *((int *)XLogRecGetData(record)); + sub_xids = (TransactionId *)(XLogRecGetData(record) + sizeof(int)); + TransactionIdCommit(xid); + TransactionIdCommitTree(nsubxacts, sub_xids); + RemoveTwoPhaseFile(xid); + } + else if (info == XLOG_XACT_ABORT_PREPARED) + { + nsubxacts = *((int *)XLogRecGetData(record)); + sub_xids = (TransactionId *)(XLogRecGetData(record) + sizeof(int)); + TransactionIdAbort(xid); + TransactionIdAbortTree(nsubxacts, sub_xids); + RemoveTwoPhaseFile(xid); + } else elog(PANIC, "xact_redo: unknown op code %u", info); + + /* Make sure nextXid is beyond any XID mentioned in the record */ + max_xid = xid; + for (i = 0; i < nsubxacts; i++) + { + if (TransactionIdPrecedes(max_xid, sub_xids[i])) + max_xid = sub_xids[i]; + } + if (TransactionIdFollowsOrEquals(max_xid, + ShmemVariableCache->nextXid)) + { + ShmemVariableCache->nextXid = max_xid; + TransactionIdAdvance(ShmemVariableCache->nextXid); + } } void *************** *** 3908,3913 **** --- 4215,4250 ---- sprintf(buf + strlen(buf), " %u", xacts[i]); } } + else if (info == XLOG_XACT_PREPARE) + { + sprintf(buf + strlen(buf), "prepare"); + } + else if (info == XLOG_XACT_COMMIT_PREPARED) + { + int nsubxacts = *((int *)rec); + TransactionId *sub_xids = (TransactionId *)(((char *)rec) + sizeof(int)); + + sprintf(buf + strlen(buf), "commit_prepared: "); + if (nsubxacts > 0) + { + sprintf(buf + strlen(buf), "; subxacts:"); + for (i = 0; i < nsubxacts; i++) + sprintf(buf + strlen(buf), " %u", sub_xids[i]); + } + } + else if (info == XLOG_XACT_ABORT_PREPARED) + { + int nsubxacts = *((int *)rec); + TransactionId *sub_xids = (TransactionId *)(((char *)rec) + sizeof(int)); + + sprintf(buf + strlen(buf), "abort_prepared: "); + if (nsubxacts > 0) + { + sprintf(buf + strlen(buf), "; subxacts:"); + for (i = 0; i < nsubxacts; i++) + sprintf(buf + strlen(buf), " %u", sub_xids[i]); + } + } else strcat(buf, "UNKNOWN"); } diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/access/transam/xlog.c 08twophase/src/backend/access/transam/xlog.c *** 00orig/src/backend/access/transam/xlog.c 2005-06-08 19:16:29.000000000 -0400 --- 08twophase/src/backend/access/transam/xlog.c 2005-06-08 19:23:59.000000000 -0400 *************** *** 29,34 **** --- 29,35 ---- #include "access/xlog.h" #include "access/xlog_internal.h" #include "access/xlogutils.h" + #include "access/twophase.h" #include "catalog/catversion.h" #include "catalog/pg_control.h" #include "miscadmin.h" *************** *** 489,494 **** --- 490,502 ---- XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) { + return XLogInsertOnBehalf(rmid, info, rdata, GetCurrentTransactionIdIfAny()); + } + + + XLogRecPtr + XLogInsertOnBehalf(RmgrId rmid, uint8 info, XLogRecData *rdata, TransactionId xid) + { XLogCtlInsert *Insert = &XLogCtl->Insert; XLogRecord *record; XLogContRecord *contrecord; *************** *** 3767,3775 **** WriteControlFile(); ! /* Bootstrap the commit log, too */ BootStrapCLOG(); BootStrapSUBTRANS(); BootStrapMultiXact(); } --- 3775,3784 ---- WriteControlFile(); ! /* Bootstrap the commit log, subtransaction and twophase subsystems, too */ BootStrapCLOG(); BootStrapSUBTRANS(); + BootStrapTwoPhase(); BootStrapMultiXact(); } *************** *** 4649,4654 **** --- 4658,4666 ---- } #endif + StartupSUBTRANS(); + RecoverPreparedTransactions(); + if (InRecovery) { int rmid; *************** *** 4710,4716 **** /* Start up the commit log and related stuff, too */ StartupCLOG(); - StartupSUBTRANS(); StartupMultiXact(); ereport(LOG, --- 4722,4727 ---- *************** *** 5107,5112 **** --- 5118,5124 ---- CheckPointCLOG(); CheckPointSUBTRANS(); + CheckPointTwoPhase(); CheckPointMultiXact(); FlushBufferPool(); diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/catalog/system_views.sql 08twophase/src/backend/catalog/system_views.sql *** 00orig/src/backend/catalog/system_views.sql 2005-05-18 21:30:56.000000000 -0400 --- 08twophase/src/backend/catalog/system_views.sql 2005-06-08 19:23:59.000000000 -0400 *************** *** 266,271 **** --- 266,275 ---- transaction xid, classid oid, objid oid, objsubid int2, pid int4, mode text, granted boolean); + CREATE VIEW pg_prepared_xacts AS + SELECT * + FROM pg_prepared_xact() AS L(transaction xid, gid text, owner name); + CREATE VIEW pg_settings AS SELECT * FROM pg_show_all_settings() AS A diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/commands/async.c 08twophase/src/backend/commands/async.c *** 00orig/src/backend/commands/async.c 2005-05-11 18:09:28.000000000 -0400 --- 08twophase/src/backend/commands/async.c 2005-06-08 20:25:43.000000000 -0400 *************** *** 78,83 **** --- 78,84 ---- #include #include "access/heapam.h" + #include "access/twophase_rmgr.h" #include "catalog/pg_listener.h" #include "commands/async.h" #include "libpq/libpq.h" *************** *** 407,412 **** --- 408,439 ---- CommitTransactionCommand(); } + + /* + *-------------------------------------------------------------- + * AtPrepare_Notify + * + * This is called at the prepare phase of a two-phase + * transaction. + * + * Not implemented. Throw an error if there was any notify + * requests in this transaction. + * + *-------------------------------------------------------------- + */ + void + AtPrepare_Notify(void) + { + ListCell *p; + + foreach(p, pendingNotifies) + { + char *relname = lfirst(p); + RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, relname, strlen(relname) + 1); + } + ClearPendingNotifies(); + } + /* *-------------------------------------------------------------- * AtCommit_Notify *************** *** 1037,1039 **** --- 1064,1073 ---- */ pendingNotifies = NIL; } + + void + notify_postcommit_record(TransactionId xid, void *recdata, uint32 len) + { + /* Piggyback the current transaction running COMMIT PREPARED */ + Async_Notify((char *) recdata); + } diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/libpq/be-fsstubs.c 08twophase/src/backend/libpq/be-fsstubs.c *** 00orig/src/backend/libpq/be-fsstubs.c 2005-01-10 19:05:18.000000000 -0300 --- 08twophase/src/backend/libpq/be-fsstubs.c 2005-06-08 19:23:59.000000000 -0400 *************** *** 503,508 **** --- 503,515 ---- PG_RETURN_INT32(1); } + void + AtPrepare_LargeObject(void) + { + if (fscxt != NULL) + elog(ERROR, "two-phase commit of large objects is not implemented"); + } + /* * AtEOXact_LargeObject - * prepares large objects for transaction commit diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/parser/gram.y 08twophase/src/backend/parser/gram.y *** 00orig/src/backend/parser/gram.y 2005-05-11 18:09:31.000000000 -0400 --- 08twophase/src/backend/parser/gram.y 2005-06-08 19:23:59.000000000 -0400 *************** *** 387,393 **** ORDER OUT_P OUTER_P OVERLAPS OVERLAY OWNER PARTIAL PASSWORD PLACING POSITION ! PRECISION PRESERVE PREPARE PRIMARY PRIOR PRIVILEGES PROCEDURAL PROCEDURE QUOTE --- 387,393 ---- ORDER OUT_P OUTER_P OVERLAPS OVERLAY OWNER PARTIAL PASSWORD PLACING POSITION ! PRECISION PRESERVE PREPARE PREPARED PRIMARY PRIOR PRIVILEGES PROCEDURAL PROCEDURE QUOTE *************** *** 4059,4064 **** --- 4059,4085 ---- n->options = $3; $$ = (Node *)n; } + | PREPARE TRANSACTION Sconst + { + TransactionStmt *n = makeNode(TransactionStmt); + n->kind = TRANS_STMT_PREPARE; + n->gid = $3; + $$ = (Node *)n; + } + | COMMIT PREPARED Sconst + { + TransactionStmt *n = makeNode(TransactionStmt); + n->kind = TRANS_STMT_COMMIT_PREPARED; + n->gid = $3; + $$ = (Node *)n; + } + | ROLLBACK PREPARED Sconst + { + TransactionStmt *n = makeNode(TransactionStmt); + n->kind = TRANS_STMT_ROLLBACK_PREPARED; + n->gid = $3; + $$ = (Node *)n; + } | COMMIT opt_transaction { TransactionStmt *n = makeNode(TransactionStmt); *************** *** 7842,7847 **** --- 7863,7869 ---- | PARTIAL | PASSWORD | PREPARE + | PREPARED | PRESERVE | PRIOR | PRIVILEGES diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/parser/keywords.c 08twophase/src/backend/parser/keywords.c *** 00orig/src/backend/parser/keywords.c 2005-05-11 18:09:31.000000000 -0400 --- 08twophase/src/backend/parser/keywords.c 2005-06-08 19:23:59.000000000 -0400 *************** *** 242,247 **** --- 242,248 ---- {"position", POSITION}, {"precision", PRECISION}, {"prepare", PREPARE}, + {"prepared", PREPARED}, {"preserve", PRESERVE}, {"primary", PRIMARY}, {"prior", PRIOR}, diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/storage/ipc/ipci.c 08twophase/src/backend/storage/ipc/ipci.c *** 00orig/src/backend/storage/ipc/ipci.c 2005-05-19 18:47:04.000000000 -0400 --- 08twophase/src/backend/storage/ipc/ipci.c 2005-06-08 19:23:59.000000000 -0400 *************** *** 17,22 **** --- 17,23 ---- #include "access/clog.h" #include "access/multixact.h" #include "access/subtrans.h" + #include "access/twophase.h" #include "access/xlog.h" #include "miscadmin.h" #include "postmaster/bgwriter.h" *************** *** 77,82 **** --- 78,84 ---- size += XLOGShmemSize(); size += CLOGShmemSize(); size += SUBTRANSShmemSize(); + size += TwoPhaseShmemSize(); size += MultiXactShmemSize(); size += LWLockShmemSize(); size += ProcArrayShmemSize(maxBackends); *************** *** 144,149 **** --- 146,152 ---- XLOGShmemInit(); CLOGShmemInit(); SUBTRANSShmemInit(); + TwoPhaseShmemInit(); MultiXactShmemInit(); InitBufferPool(); diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/storage/ipc/procarray.c 08twophase/src/backend/storage/ipc/procarray.c *** 00orig/src/backend/storage/ipc/procarray.c 2005-05-19 21:43:03.000000000 -0400 --- 08twophase/src/backend/storage/ipc/procarray.c 2005-06-08 19:23:59.000000000 -0400 *************** *** 25,30 **** --- 25,31 ---- #include "postgres.h" #include "access/subtrans.h" + #include "access/twophase.h" #include "miscadmin.h" #include "storage/proc.h" #include "storage/procarray.h" *************** *** 51,61 **** --- 52,64 ---- /* counters for XidCache measurement */ static long xc_by_recent_xmin = 0; + static long xc_is_prepared = 0; static long xc_by_main_xid = 0; static long xc_by_child_xid = 0; static long xc_slow_answer = 0; #define xc_by_recent_xmin_inc() (xc_by_recent_xmin++) + #define xc_is_prepared() (xc_is_prepared++) #define xc_by_main_xid_inc() (xc_by_main_xid++) #define xc_by_child_xid_inc() (xc_by_child_xid++) #define xc_slow_answer_inc() (xc_slow_answer++) *************** *** 65,70 **** --- 68,74 ---- #else /* !XIDCACHE_DEBUG */ #define xc_by_recent_xmin_inc() ((void) 0) + #define xc_is_prepared() ((void) 0) #define xc_by_main_xid_inc() ((void) 0) #define xc_by_child_xid_inc() ((void) 0) #define xc_slow_answer_inc() ((void) 0) *************** *** 315,325 **** if (TransactionIdEquals(xids[i], topxid)) { result = true; ! break; } } } result_known: if (locked) LWLockRelease(ProcArrayLock); --- 319,339 ---- if (TransactionIdEquals(xids[i], topxid)) { result = true; ! goto result_known; } } } + /* + * Still no luck. Maybe it's a prepared transaction. Note that we only + * need to check for the top transaction id. + */ + if (TransactionIdIsPrepared(topxid)) + { + xc_is_prepared(); + result = true; + } + result_known: if (locked) LWLockRelease(ProcArrayLock); *************** *** 390,395 **** --- 404,424 ---- } } + if (allDbs) + { + TransactionId *xids; + int nxids; + int i; + + nxids = GetPreparedTransactionsXidList(&xids); + + for (i = 0; i < nxids; i++) + { + if (TransactionIdPrecedes(xids[i], result)) + result = xids[i]; + } + } + LWLockRelease(ProcArrayLock); return result; *************** *** 432,437 **** --- 461,468 ---- TransactionId globalxmin; int index; int count = 0; + TransactionId *xids; + int nxids; Assert(snapshot != NULL); *************** *** 456,462 **** * First call for this snapshot */ snapshot->xip = (TransactionId *) ! malloc(MaxBackends * sizeof(TransactionId)); if (snapshot->xip == NULL) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), --- 487,493 ---- * First call for this snapshot */ snapshot->xip = (TransactionId *) ! malloc((MaxBackends + max_prepared_xacts) * sizeof(TransactionId)); if (snapshot->xip == NULL) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), *************** *** 533,538 **** --- 564,582 ---- globalxmin = xid; } + nxids = GetPreparedTransactionsXidList(&xids); + + for (index = 0; index < nxids; index++) + { + if (!TransactionIdIsNormal(xids[index]) || TransactionIdFollowsOrEquals(xids[index], xmax)) + continue; + + if (TransactionIdPrecedes(xids[index], xmin)) + xmin = xids[index]; + + snapshot->xip[count++] = xids[index]; + } + if (serializable) MyProc->xmin = TransactionXmin = xmin; *************** *** 777,784 **** DisplayXidCache(void) { fprintf(stderr, ! "XidCache: xmin: %ld, mainxid: %ld, childxid: %ld, slow: %ld\n", xc_by_recent_xmin, xc_by_main_xid, xc_by_child_xid, xc_slow_answer); --- 821,829 ---- DisplayXidCache(void) { fprintf(stderr, ! "XidCache: xmin: %ld, prepared: %ld, mainxid: %ld, childxid: %ld, slow: %ld\n", xc_by_recent_xmin, + xc_is_prepared, xc_by_main_xid, xc_by_child_xid, xc_slow_answer); diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/storage/lmgr/lock.c 08twophase/src/backend/storage/lmgr/lock.c *** 00orig/src/backend/storage/lmgr/lock.c 2005-06-01 01:04:04.000000000 -0400 --- 08twophase/src/backend/storage/lmgr/lock.c 2005-06-08 19:23:59.000000000 -0400 *************** *** 34,43 **** --- 34,47 ---- #include #include "access/xact.h" + #include "access/twophase.h" + #include "access/twophase_rmgr.h" #include "miscadmin.h" #include "storage/proc.h" + #include "storage/lockrmgr.h" #include "utils/memutils.h" #include "utils/ps_status.h" + #include "utils/relcache.h" #include "utils/resowner.h" *************** *** 78,83 **** --- 82,94 ---- }; + /* Record that's written to xlog when a lock is persisted */ + typedef struct TwoPhaseLockRecord + { + LOCKTAG locktag; + LOCKMASK holdMask; + } TwoPhaseLockRecord; + #ifdef LOCK_DEBUG /*------ *************** *** 157,162 **** --- 168,175 ---- #define PROCLOCK_PRINT(where, proclockP) #endif /* not LOCK_DEBUG */ + static bool LockAcquireForProc(LOCKMETHODID lockmethodid, LOCKTAG *locktag, + TransactionId xid, LOCKMODE lockmode, bool dontWait, PGPROC *proc); static void RemoveLocalLock(LOCALLOCK *locallock); static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner); *************** *** 171,178 **** /* ! * InitLocks -- Init the lock module. Create a private data ! * structure for constructing conflict masks. */ void InitLocks(void) --- 184,190 ---- /* ! * InitLocks -- Init the lock module. Nothing to do here at present. */ void InitLocks(void) *************** *** 423,428 **** --- 435,447 ---- LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, TransactionId xid, LOCKMODE lockmode, bool dontWait) { + return LockAcquireForProc(lockmethodid, locktag, xid, lockmode, dontWait, MyProc); + } + + bool + LockAcquireForProc(LOCKMETHODID lockmethodid, LOCKTAG *locktag, + TransactionId xid, LOCKMODE lockmode, bool dontWait, PGPROC *proc) + { LOCALLOCKTAG localtag; LOCALLOCK *locallock; LOCK *lock; *************** *** 565,571 **** */ MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding */ proclocktag.lock = MAKE_OFFSET(lock); ! proclocktag.proc = MAKE_OFFSET(MyProc); TransactionIdStore(xid, &proclocktag.xid); /* --- 584,590 ---- */ MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding */ proclocktag.lock = MAKE_OFFSET(lock); ! proclocktag.proc = MAKE_OFFSET(proc); TransactionIdStore(xid, &proclocktag.xid); /* *************** *** 607,613 **** proclock->holdMask = 0; /* Add proclock to appropriate lists */ SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink); ! SHMQueueInsertBefore(&MyProc->procLocks, &proclock->procLink); PROCLOCK_PRINT("LockAcquire: new", proclock); } else --- 626,632 ---- proclock->holdMask = 0; /* Add proclock to appropriate lists */ SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink); ! SHMQueueInsertBefore(&proc->procLocks, &proclock->procLink); PROCLOCK_PRINT("LockAcquire: new", proclock); } else *************** *** 661,667 **** * If this process (under any XID) is a holder of the lock, just grant * myself another one without blocking. */ ! LockCountMyLocks(proclock->tag.lock, MyProc, myHolding); if (myHolding[lockmode] > 0) { GrantLock(lock, proclock, lockmode); --- 680,686 ---- * If this process (under any XID) is a holder of the lock, just grant * myself another one without blocking. */ ! LockCountMyLocks(proclock->tag.lock, proc, myHolding); if (myHolding[lockmode] > 0) { GrantLock(lock, proclock, lockmode); *************** *** 681,687 **** else status = LockCheckConflicts(lockMethodTable, lockmode, lock, proclock, ! MyProc, myHolding); if (status == STATUS_OK) { --- 700,706 ---- else status = LockCheckConflicts(lockMethodTable, lockmode, lock, proclock, ! proc, myHolding); if (status == STATUS_OK) { *************** *** 733,739 **** if (myHolding[i] > 0) heldLocks |= LOCKBIT_ON(i); } ! MyProc->heldLocks = heldLocks; } /* --- 752,758 ---- if (myHolding[i] > 0) heldLocks |= LOCKBIT_ON(i); } ! proc->heldLocks = heldLocks; } /* *************** *** 1630,1635 **** --- 1649,1871 ---- } } + /* + * LockPersistAll + * Take care of the locks held by a transaction that is "preparing" for + * two-phase commit. + */ + bool + LockPersistAll(void) + { + HASH_SEQ_STATUS status; + SHM_QUEUE *procLocks = &(MyProc->procLocks); + LWLockId masterLock; + LockMethod lockMethodTable; + int numLockModes; + LOCALLOCK *locallock; + PROCLOCK *oldproclock; + PROCLOCK *newproclock; + LOCK *lock; + LOCKMETHODID lockmethodid = DEFAULT_LOCKMETHOD; + PROCLOCKTAG proclocktag; + PGPROC *dummyProc; + bool found; + + #ifdef LOCK_DEBUG + if (Trace_locks) + elog(LOG, "LockPersistAll: lockmethod=%d", lockmethodid); + #endif + + lockMethodTable = LockMethods[lockmethodid]; + if (!lockMethodTable) + { + elog(WARNING, "bad lock method: %d", lockmethodid); + return FALSE; + } + + numLockModes = lockMethodTable->numLockModes; + masterLock = lockMethodTable->masterLock; + + /* + * First we scan the locallock table and get rid of entries pointing to + * those locks we will persist. Then we scan the transaction's proclock + * table, record each proclock in the twophase state file, and remove it. + * + * We do this in two passes because we may have multiple locallock + * entries pointing to the same proclock, and we daren't end up with + * any dangling pointers. + */ + hash_seq_init(&status, LockMethodLocalHash[lockmethodid]); + + while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) + { + LOCK *lock; + + if (locallock->proclock == NULL || locallock->lock == NULL) + { + /* + * We must've run out of shared memory while trying to set up + * this lock. Just forget the local entry. + */ + Assert(locallock->nLocks == 0); + RemoveLocalLock(locallock); + continue; + } + + /* Ignore items that are not of the lockmethod to be removed */ + if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid) + continue; + + /* Ignore session locks */ + if (!TransactionIdIsValid(locallock->tag.xid)) + continue; + + /* + * Make sure the transaction didn't operate on a temp table. + * We have better do this before grabbing the lmgr's master lock, + * because RelationIdGetRelation would try to grab it again. + */ + lock = locallock->lock; + + if (lock->tag.locktag_type == LOCKTAG_RELATION) + { + Relation rel = RelationIdGetRelation(lock->tag.locktag_field2); + + if (rel != NULL) + { + if (rel->rd_istemp) + ereport(ERROR, + /* FIXME -- get the right errcode and errmsg */ + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("persisting transactions operating on temporal tables is not supported"))); + RelationClose(rel); + } + else + elog(WARNING, "lock acquired on non-existant relation %u", lock->tag.locktag_field2); + } + + RemoveLocalLock(locallock); + } + + LWLockAcquire(masterLock, LW_EXCLUSIVE); + + oldproclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, procLink)); + + dummyProc = TwoPhaseGetDummyProc(); + + while (oldproclock) + { + PROCLOCK *nextHolder; + + /* Get link first, since we may unlink/delete this proclock */ + nextHolder = (PROCLOCK *) SHMQueueNext(procLocks, &oldproclock->procLink, + offsetof(PROCLOCK, procLink)); + + Assert(oldproclock->tag.proc == MAKE_OFFSET(MyProc)); + + lock = (LOCK *) MAKE_PTR(oldproclock->tag.lock); + + /* Ignore items that are not of the lockmethod to be removed */ + if (LOCK_LOCKMETHOD(*lock) != lockmethodid) + goto next_item; + + /* Ignore session locks */ + if (!TransactionIdIsValid(oldproclock->tag.xid)) + goto next_item; + + PROCLOCK_PRINT("LockPersistAll", oldproclock); + LOCK_PRINT("LockPersistAll", lock, 0); + Assert(lock->nRequested >= 0); + Assert(lock->nGranted >= 0); + Assert(lock->nGranted <= lock->nRequested); + Assert((oldproclock->holdMask & ~lock->grantMask) == 0); + + /* + * To reassign the lock to the dummy PGPROC struct, we create a new + * proclock identical to the original, except that it points to the + * dummy PGPROC. We then insert that in the LOCK.lockHolders list, + * and remove the original proclock from the linked lists and the hash + * table. + */ + + /* + * Create the hash key for the proclock table. + */ + MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding, + * needed */ + proclocktag.lock = oldproclock->tag.lock; + proclocktag.proc = MAKE_OFFSET(dummyProc); + TransactionIdStore(oldproclock->tag.xid, &proclocktag.xid); + + newproclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid], + (void *) &proclocktag, + HASH_ENTER, &found); + if (!newproclock) + { + LWLockRelease(masterLock); + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"), + errdetail("Not enough memory for storing the prepared transaction's held locks."))); + return FALSE; + } + + /* + * If new, initialize the new entry + */ + if (!found) + { + newproclock->holdMask = oldproclock->holdMask; + /* Add proclock to appropriate lists */ + SHMQueueInsertBefore(&lock->procLocks, &newproclock->lockLink); + SHMQueueInsertBefore(&dummyProc->procLocks, &newproclock->procLink); + PROCLOCK_PRINT("LockPersistAll: new", newproclock); + } + + /* Make a recovery record that is used to reacquire the + * lock after shutdown or crash. + */ + { + TwoPhaseLockRecord record; + + LOCK *lock = (LOCK *) MAKE_PTR(oldproclock->tag.lock); + memcpy(&(record.locktag), &(lock->tag), sizeof(LOCKTAG)); + record.holdMask = oldproclock->holdMask; + + RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, &record, sizeof(TwoPhaseLockRecord)); + } + + PROCLOCK_PRINT("LockPersistAll: deleting", oldproclock); + + /* remove the old proclock entry from the hashtable and linked lists */ + SHMQueueDelete(&oldproclock->lockLink); + SHMQueueDelete(&oldproclock->procLink); + oldproclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid], + (void *) &(oldproclock->tag), + HASH_REMOVE, + NULL); + if (!oldproclock) + { + LWLockRelease(masterLock); + elog(WARNING, "proclock table corrupted"); + return FALSE; + } + + next_item: + oldproclock = nextHolder; + } + + LWLockRelease(masterLock); + + #ifdef LOCK_DEBUG + if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks) + elog(LOG, "LockPersistAll done"); + #endif + + return TRUE; + } + /* * Estimate shared-memory space used for lock tables *************** *** 1734,1750 **** * * Must have already acquired the masterLock. */ ! void ! DumpLocks(void) { - PGPROC *proc; SHM_QUEUE *procLocks; PROCLOCK *proclock; LOCK *lock; int lockmethodid = DEFAULT_LOCKMETHOD; LockMethod lockMethodTable; - proc = MyProc; if (proc == NULL) return; --- 1970,1984 ---- * * Must have already acquired the masterLock. */ ! static void ! DumpLocks(PGPROC *proc) { SHM_QUEUE *procLocks; PROCLOCK *proclock; LOCK *lock; int lockmethodid = DEFAULT_LOCKMETHOD; LockMethod lockMethodTable; if (proc == NULL) return; *************** *** 1819,1821 **** --- 2053,2164 ---- } #endif /* LOCK_DEBUG */ + + /* + * LOCK resource manager's routines + */ + + /* + * Re-acquire all locks that belong to transactions that were prepared. Some of + * these transactions might have committed/aborted later, but we release locks + * belonging to those transactions later in LockReleaseAllForPrepared, when we + * know their final state. + * + * Because this function is run at db startup, re-acquiring the locks should + * never conflict with running transactions because there is none. All locks + * are acquired for the same dummy PGPROC structure, and therefore they should + * also never conflict with each other. + */ + void + lock_recover_record(TransactionId xid, void *recdata, uint32 len) + { + TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata; + int lockmode; + bool success; + PGPROC *dummyProc = TwoPhaseGetDummyProc(); + + for (lockmode = 0; lockmode < MAX_LOCKMODES; lockmode++) + { + if (rec->holdMask & LOCKBIT_ON(lockmode)) + { + success = LockAcquireForProc(rec->locktag.locktag_lockmethodid, &rec->locktag, + xid, lockmode, true, dummyProc); + if (success) + elog(DEBUG3, "Lock re-acquired"); + else + ereport(WARNING, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("couldn't re-acquire lock"))); + } + } + } + + + /* + * Release locks held by a prepared transaction. + */ + void + LockReleaseAllForPrepared(TransactionId xid) + { + PGPROC *dummyProc = TwoPhaseGetDummyProc(); + SHM_QUEUE *procLocks = &(dummyProc->procLocks); + PROCLOCK *proclock; + PROCLOCK *nextHolder; + LWLockId masterLock; + LockMethod lockMethodTable; + int i, + numLockModes; + LOCK *lock; + + lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD]; + + numLockModes = lockMethodTable->numLockModes; + masterLock = lockMethodTable->masterLock; + + LWLockAcquire(masterLock, LW_EXCLUSIVE); + + proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, procLink)); + + while (proclock) + { + bool wakeupNeeded = false; + + /* Get link first, since we may unlink/delete this proclock */ + nextHolder = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink, + offsetof(PROCLOCK, procLink)); + + lock = (LOCK *) MAKE_PTR(proclock->tag.lock); + + if (!TransactionIdEquals(proclock->tag.xid, xid)) + goto next_item; + + PROCLOCK_PRINT("LockReleaseAllForPrepared", proclock); + LOCK_PRINT("LockReleaseAllForPrepared", lock, 0); + + for (i = 1; i <= numLockModes; i++) + { + if (!(proclock->holdMask & LOCKBIT_ON(i))) + continue; + + wakeupNeeded |= UnGrantLock(lock, i, proclock, lockMethodTable); + } + + LOCK_PRINT("LockReleaseAllForPrepared: updated", lock, 0); + + PROCLOCK_PRINT("LockReleaseAllForPrepared: deleting", proclock); + + CleanUpLock(DEFAULT_LOCKMETHOD, lock, proclock, wakeupNeeded); + next_item: + proclock = nextHolder; + } + + LWLockRelease(masterLock); + + #ifdef LOCK_DEBUG + if (Trace_locks) + elog(LOG, "LockReleaseAllForPrepared done"); + #endif + + return; + } diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/storage/lmgr/Makefile 08twophase/src/backend/storage/lmgr/Makefile *** 00orig/src/backend/storage/lmgr/Makefile 2004-10-15 21:16:22.000000000 -0300 --- 08twophase/src/backend/storage/lmgr/Makefile 2005-06-08 19:23:59.000000000 -0400 *************** *** 16,21 **** --- 16,23 ---- all: SUBSYS.o + CFLAGS = -O0 -Wall -Wmissing-prototypes -Wpointer-arith -Wendif-labels -fno-strict-aliasing -g -I../../../../src/include -I/home/alvherre/CVS/pgsql/source/07twophase/src/include -D_GNU_SOURCE + SUBSYS.o: $(OBJS) $(LD) $(LDREL) $(LDOUT) SUBSYS.o $(OBJS) diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/storage/smgr/smgr.c 08twophase/src/backend/storage/smgr/smgr.c *** 00orig/src/backend/storage/smgr/smgr.c 2005-06-07 16:44:23.000000000 -0400 --- 08twophase/src/backend/storage/smgr/smgr.c 2005-06-08 19:23:59.000000000 -0400 *************** *** 18,23 **** --- 18,24 ---- #include "postgres.h" #include "access/xact.h" + #include "access/twophase_rmgr.h" #include "commands/tablespace.h" #include "storage/bufmgr.h" #include "storage/freespace.h" *************** *** 677,682 **** --- 678,731 ---- reln->smgr_rnode.relNode))); } + void + smgr_postcommit_record(TransactionId xid, void *recdata, uint32 len) + { + PendingRelDelete *pending = (PendingRelDelete *) recdata; + + if (pending->atCommit) + smgr_internal_unlink(pending->relnode, + pending->which, + pending->isTemp, + false); + } + + void + smgr_postabort_record(TransactionId xid, void *recdata, uint32 len) + { + PendingRelDelete *pending = (PendingRelDelete *) recdata; + + if (!pending->atCommit) + smgr_internal_unlink(pending->relnode, + pending->which, + pending->isTemp, + false); + } + + /* + * AtPrepare_smgr -- Does things related to two-phase commit prepare phase + */ + void + AtPrepare_smgr(void) + { + if(pendingDeletes != NULL) { + PendingRelDelete *pending; + PendingRelDelete *next; + + for (pending = pendingDeletes; pending != NULL; pending = next) + { + next = pending->next; + + pendingDeletes = next; + + RegisterTwoPhaseRecord(TWOPHASE_RM_SMGR_ID, pending, sizeof(PendingRelDelete)); + /* must explicitly free the list entry */ + pfree(pending); + } + } + } + + /* * smgrDoPendingDeletes() -- Take care of relation deletes at end of xact. * diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/tcop/postgres.c 08twophase/src/backend/tcop/postgres.c *** 00orig/src/backend/tcop/postgres.c 2005-06-07 16:44:24.000000000 -0400 --- 08twophase/src/backend/tcop/postgres.c 2005-06-08 19:23:59.000000000 -0400 *************** *** 928,933 **** --- 928,934 ---- if (stmt->kind == TRANS_STMT_COMMIT || stmt->kind == TRANS_STMT_ROLLBACK || + stmt->kind == TRANS_STMT_PREPARE || stmt->kind == TRANS_STMT_ROLLBACK_TO) allowit = true; } diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/tcop/utility.c 08twophase/src/backend/tcop/utility.c *** 00orig/src/backend/tcop/utility.c 2005-04-28 19:39:57.000000000 -0400 --- 08twophase/src/backend/tcop/utility.c 2005-06-08 19:23:59.000000000 -0400 *************** *** 383,393 **** if (strcmp(item->defname, "transaction_isolation") == 0) SetPGVariable("transaction_isolation", list_make1(item->arg), ! false); else if (strcmp(item->defname, "transaction_read_only") == 0) SetPGVariable("transaction_read_only", list_make1(item->arg), ! false); } } break; --- 383,393 ---- if (strcmp(item->defname, "transaction_isolation") == 0) SetPGVariable("transaction_isolation", list_make1(item->arg), ! true); else if (strcmp(item->defname, "transaction_read_only") == 0) SetPGVariable("transaction_read_only", list_make1(item->arg), ! true); } } break; *************** *** 401,406 **** --- 401,425 ---- } break; + case TRANS_STMT_COMMIT_PREPARED: + PreventTransactionChain(stmt, "COMMIT PREPARED"); + CommitPreparedTransaction(stmt->gid); + break; + + case TRANS_STMT_ROLLBACK_PREPARED: + PreventTransactionChain(stmt, "ROLLBACK PREPARED"); + AbortPreparedTransaction(stmt->gid); + break; + + case TRANS_STMT_PREPARE: + if (!PrepareTransactionBlock(stmt->gid)) + { + /* report unsuccessful commit in completionTag */ + if (completionTag) + strcpy(completionTag, "ROLLBACK"); + } + break; + case TRANS_STMT_ROLLBACK: UserAbortTransactionBlock(); break; *************** *** 1198,1207 **** --- 1217,1238 ---- tag = "START TRANSACTION"; break; + case TRANS_STMT_PREPARE: + tag = "PREPARE TRANSACTION"; + break; + case TRANS_STMT_COMMIT: tag = "COMMIT"; break; + case TRANS_STMT_COMMIT_PREPARED: + tag = "COMMIT PREPARED"; + break; + + case TRANS_STMT_ROLLBACK_PREPARED: + tag = "ROLLBACK PREPARED"; + break; + case TRANS_STMT_ROLLBACK: case TRANS_STMT_ROLLBACK_TO: tag = "ROLLBACK"; diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/utils/adt/Makefile 08twophase/src/backend/utils/adt/Makefile *** 00orig/src/backend/utils/adt/Makefile 2004-04-14 16:45:56.000000000 -0400 --- 08twophase/src/backend/utils/adt/Makefile 2005-06-08 19:23:59.000000000 -0400 *************** *** 1,7 **** # # Makefile for utils/adt # ! # $PostgreSQL: pgsql-server/src/backend/utils/adt/Makefile,v 1.56 2003/11/29 19:51:57 pgsql Exp $ # subdir = src/backend/utils/adt --- 1,7 ---- # # Makefile for utils/adt # ! # $PostgreSQL: pgsql/src/backend/utils/adt/Makefile,v 1.57 2004/04/01 21:28:45 tgl Exp $ # subdir = src/backend/utils/adt diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/utils/cache/inval.c 08twophase/src/backend/utils/cache/inval.c *** 00orig/src/backend/utils/cache/inval.c 2005-04-20 17:25:43.000000000 -0400 --- 08twophase/src/backend/utils/cache/inval.c 2005-06-08 19:23:59.000000000 -0400 *************** *** 97,102 **** --- 97,103 ---- #include "utils/relcache.h" #include "utils/syscache.h" + #include "access/twophase_rmgr.h" /* * To minimize palloc traffic, we keep pending requests in successively- *************** *** 171,176 **** --- 172,181 ---- static int cache_callback_count = 0; + static void PersistInvalidationMessage(SharedInvalidationMessage *msg); + + static char * + describeInvalidationMessage(SharedInvalidationMessage *msg); /* ---------------------------------------------------------------- * Invalidation list support functions *************** *** 426,431 **** --- 431,440 ---- LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg) { int i; + char *desc = describeInvalidationMessage(msg); + + elog(DEBUG3, "local process: %s", desc); + pfree(desc); if (msg->id >= 0) { *************** *** 629,634 **** --- 638,644 ---- void AtStart_Inval(void) { + elog(DEBUG3, "Inval starting transaction"); Assert(transInvalInfo == NULL); transInvalInfo = (TransInvalidationInfo *) MemoryContextAllocZero(TopTransactionContext, *************** *** 637,642 **** --- 647,678 ---- } /* + * AtPrepare_Inval + * Save the inval lists state at 2PC transaction prepare. + * + * We need to do two things here: first, save the lists so they can be + * recovered after a crash. Second, send the messages to ourselves, because + * we need to see the preparing transaction as if it were in progress in a + * hypotetical remote backend (i.e. we don't see its changes.) + */ + void + AtPrepare_Inval(void) + { + elog(DEBUG3, "Inval preparing transaction"); + AppendInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs, + &transInvalInfo->CurrentCmdInvalidMsgs); + + ProcessInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs, + PersistInvalidationMessage); + + ProcessInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs, + LocalExecuteInvalidationMessage); + + /* Need not free anything explicitly */ + transInvalInfo = NULL; + } + + /* * AtSubStart_Inval * Initialize inval lists at start of a subtransaction. */ *************** *** 644,649 **** --- 680,686 ---- AtSubStart_Inval(void) { TransInvalidationInfo *myInfo; + elog(DEBUG3, "Inval starting subtransaction"); Assert(transInvalInfo != NULL); myInfo = (TransInvalidationInfo *) *************** *** 654,659 **** --- 691,767 ---- transInvalInfo = myInfo; } + static char * + describeInvalidationMessage(SharedInvalidationMessage *msg) + { + char *result = palloc0(1024); + + strcat(result, "message "); + switch (msg->id) + { + case SHAREDINVALRELCACHE_ID: + { + SharedInvalRelcacheMsg *relmsg = (SharedInvalRelcacheMsg *) msg; + char tmp[1024]; + sprintf(tmp, "RelCache: dbId/relId %u/%u", relmsg->dbId, relmsg->relId); + strcat(result, tmp); + break; + }; + case SHAREDINVALSMGR_ID: + { + SharedInvalSmgrMsg *smgrmsg = (SharedInvalSmgrMsg *) msg; + char tmp[1024]; + sprintf(tmp, "Smgr: RelFileNode %u/%u/%u", smgrmsg->rnode.spcNode, smgrmsg->rnode.dbNode, smgrmsg->rnode.relNode); + strcat(result, tmp); + break; + } + default: + { + SharedInvalCatcacheMsg *ccmsg = (SharedInvalCatcacheMsg *) msg; + char tmp[1024]; + sprintf(tmp, "CatCache: dbId: %u cacheId: %d iptr(%u/%d) hash: %x", ccmsg->dbId, ccmsg->id, + ItemPointerGetBlockNumber(&(ccmsg->tuplePtr)), + ItemPointerGetOffsetNumber(&(ccmsg->tuplePtr)), ccmsg->hashValue); + strcat(result, tmp); + break; + } + } + return result; + } + + /* + * PersistInvalidationMessage + * Write an invalidation message to the 2PC state file. + */ + static void + PersistInvalidationMessage(SharedInvalidationMessage *msg) + { + char *desc; + RegisterTwoPhaseRecord(TWOPHASE_RM_INVAL_ID, msg, sizeof(SharedInvalidationMessage)); + desc = describeInvalidationMessage(msg); + elog(DEBUG3, "persisting message: %s", desc); + pfree(desc); + } + + /* + * inval_postcommit_record + * Recover an invalidation message from the 2PC state file. + */ + void + inval_postcommit_record(TransactionId xid, void *recdata, uint32 len) + { + SharedInvalidationMessage *msg = (SharedInvalidationMessage *) recdata; + + SendSharedInvalidMessage(msg); + /* + * We need to do file invalidation only once here, because we are in + * recovery mode so there's no race condition. We use the locking path + * of RelationCacheInitFileInvalidate just for paranoia's sake. + */ + RelationCacheInitFileInvalidate(false); + } + + /* * AtEOXact_Inval * Process queued-up invalidation messages at end of main transaction. *************** *** 681,686 **** --- 789,795 ---- void AtEOXact_Inval(bool isCommit) { + elog(DEBUG3, "Inval finishing transaction (%s)", isCommit ? "commit" : "abort"); if (isCommit) { /* Must be at top of stack */ *************** *** 740,745 **** --- 849,856 ---- int my_level = GetCurrentTransactionNestLevel(); TransInvalidationInfo *myInfo = transInvalInfo; + elog(DEBUG3, "Inval finishing subtransaction (%s)", isCommit ? "commit" : "abort"); + if (isCommit) { /* Must be at non-top of stack */ diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/utils/init/flatfiles.c 08twophase/src/backend/utils/init/flatfiles.c *** 00orig/src/backend/utils/init/flatfiles.c 2005-06-07 16:44:28.000000000 -0400 --- 08twophase/src/backend/utils/init/flatfiles.c 2005-06-08 19:23:59.000000000 -0400 *************** *** 46,51 **** --- 46,53 ---- #include "utils/flatfiles.h" #include "utils/resowner.h" #include "utils/syscache.h" + #include "access/twophase.h" + #include "access/twophase_rmgr.h" #define DATABASE_FLAT_FILE "pg_database" *************** *** 757,762 **** --- 759,798 ---- SendPostmasterSignal(PMSIGNAL_PASSWORD_CHANGE); } + #define BIT_USER 1 + #define BIT_GROUP 2 + #define BIT_DATABASE 4 + + /* + * This routine is called during transaction prepare. + * + * Record which files need to be refreshed if this transaction later + * commits. + */ + void + AtPrepare_UpdateFlatFiles(void) + { + bits8 bits = 0; + + /* We record information on which of the files need to be + * reloaded after commit */ + if(database_file_update_subid != InvalidSubTransactionId) { + database_file_update_subid = InvalidSubTransactionId; + bits |= BIT_DATABASE; + } + if(group_file_update_subid != InvalidSubTransactionId) { + group_file_update_subid = InvalidSubTransactionId; + bits |= BIT_GROUP; + } + if(user_file_update_subid != InvalidSubTransactionId) { + user_file_update_subid = InvalidSubTransactionId; + bits |= BIT_USER; + } + if(bits != 0) + RegisterTwoPhaseRecord(TWOPHASE_RM_FLATFILES_ID, &bits, sizeof(bits)); + } + + /* * AtEOSubXact_UpdateFlatFiles * *************** *** 831,833 **** --- 867,889 ---- return PointerGetDatum(NULL); } + + void + flatfile_postcommit_record(TransactionId xid, void *recdata, uint32 len) + { + bits8 bits; + + Assert(len == 1); + + bits = *((bits8 *) recdata); + + /* Pick up file updates to the current transaction running + * COMMIT PREPARED. + */ + if (bits & BIT_DATABASE) + database_file_update_needed(); + if (bits & BIT_GROUP) + group_file_update_needed(); + if (bits & BIT_USER) + user_file_update_needed(); + } diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/utils/misc/guc.c 08twophase/src/backend/utils/misc/guc.c *** 00orig/src/backend/utils/misc/guc.c 2005-06-07 16:44:32.000000000 -0400 --- 08twophase/src/backend/utils/misc/guc.c 2005-06-08 19:23:59.000000000 -0400 *************** *** 25,30 **** --- 25,31 ---- #include "utils/guc.h" #include "utils/guc_tables.h" + #include "access/twophase.h" #include "catalog/namespace.h" #include "catalog/pg_type.h" #include "commands/async.h" *************** *** 1094,1099 **** --- 1095,1109 ---- }, { + {"max_prepared_transactions", PGC_POSTMASTER, RESOURCES, + gettext_noop("Sets the maximum number of simultaneously prepared transactions."), + NULL + }, + &max_prepared_xacts, + 50, 0, 1000, NULL, NULL + }, + + { {"max_files_per_process", PGC_POSTMASTER, RESOURCES_KERNEL, gettext_noop("Sets the maximum number of simultaneously open files for each server process."), NULL *************** *** 1929,1934 **** --- 1939,1946 ---- static bool guc_dirty; /* TRUE if need to do commit/abort work */ + static bool guc_cannotprepare; /* TRUE if this transaction cannot be prepared for 2PC */ + static bool reporting_enabled; /* TRUE to enable GUC_REPORT */ static char *guc_string_workspace; /* for avoiding memory leaks */ *************** *** 2456,2461 **** --- 2468,2474 ---- } guc_dirty = false; + guc_cannotprepare = false; reporting_enabled = false; *************** *** 2681,2686 **** --- 2694,2700 ---- conf->gen.tentative_source = conf->gen.reset_source; conf->gen.status |= GUC_HAVE_TENTATIVE; guc_dirty = true; + guc_cannotprepare = true; break; } case PGC_INT: *************** *** 2697,2702 **** --- 2711,2717 ---- conf->gen.tentative_source = conf->gen.reset_source; conf->gen.status |= GUC_HAVE_TENTATIVE; guc_dirty = true; + guc_cannotprepare = true; break; } case PGC_REAL: *************** *** 2713,2718 **** --- 2728,2734 ---- conf->gen.tentative_source = conf->gen.reset_source; conf->gen.status |= GUC_HAVE_TENTATIVE; guc_dirty = true; + guc_cannotprepare = true; break; } case PGC_STRING: *************** *** 2753,2758 **** --- 2769,2775 ---- conf->gen.tentative_source = conf->gen.reset_source; conf->gen.status |= GUC_HAVE_TENTATIVE; guc_dirty = true; + guc_cannotprepare = true; break; } } *************** *** 2845,2850 **** --- 2862,2877 ---- } /* + * Do GUC processing at transaction prepare. + */ + void + AtPrepare_GUC(void) + { + if (guc_cannotprepare) + elog(ERROR, "two-phase commit of session variables is not implemented"); + } + + /* * Do GUC processing at transaction or subtransaction commit or abort. */ void *************** *** 3104,3111 **** * know that all outer transaction levels will have stacked values to * deal with.) */ ! if (!isSubXact) guc_dirty = false; } --- 3131,3140 ---- * know that all outer transaction levels will have stacked values to * deal with.) */ ! if (!isSubXact) { guc_dirty = false; + guc_cannotprepare = false; + } } *************** *** 3512,3517 **** --- 3541,3547 ---- conf->gen.tentative_source = source; conf->gen.status |= GUC_HAVE_TENTATIVE; guc_dirty = true; + guc_cannotprepare = true; } } break; *************** *** 3800,3805 **** --- 3830,3836 ---- conf->gen.tentative_source = source; conf->gen.status |= GUC_HAVE_TENTATIVE; guc_dirty = true; + guc_cannotprepare = true; } } else diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/utils/misc/postgresql.conf.sample 08twophase/src/backend/utils/misc/postgresql.conf.sample *** 00orig/src/backend/utils/misc/postgresql.conf.sample 2005-06-07 16:44:32.000000000 -0400 --- 08twophase/src/backend/utils/misc/postgresql.conf.sample 2005-06-08 19:23:59.000000000 -0400 *************** *** 109,114 **** --- 109,116 ---- #bgwriter_all_percent = 0.333 # 0-100% of all buffers scanned in each round #bgwriter_all_maxpages = 5 # 0-1000 buffers max written per round + #max_prepared_transactions = 100 # 0-1000 + #--------------------------------------------------------------------------- # WRITE AHEAD LOG diff -Ncr --exclude-from=diff-ignore 00orig/src/backend/utils/mmgr/portalmem.c 08twophase/src/backend/utils/mmgr/portalmem.c *** 00orig/src/backend/utils/mmgr/portalmem.c 2005-06-01 01:04:11.000000000 -0400 --- 08twophase/src/backend/utils/mmgr/portalmem.c 2005-06-08 19:23:59.000000000 -0400 *************** *** 406,411 **** --- 406,420 ---- } } + /* + * This is called at prepare phase of a two-phase commit. Currently, + * we don't do anything different from a regular commit. + */ + void + AtPrepare_Portals(void) + { + AtCommit_Portals(); + } /* * Pre-commit processing for portals. diff -Ncr --exclude-from=diff-ignore 00orig/src/bin/initdb/initdb.c 08twophase/src/bin/initdb/initdb.c *** 00orig/src/bin/initdb/initdb.c 2005-05-02 17:30:21.000000000 -0400 --- 08twophase/src/bin/initdb/initdb.c 2005-06-08 19:23:59.000000000 -0400 *************** *** 2124,2129 **** --- 2124,2130 ---- "pg_xlog/archive_status", "pg_clog", "pg_subtrans", + "pg_twophase", "pg_multixact/members", "pg_multixact/offsets", "base", diff -Ncr --exclude-from=diff-ignore 00orig/src/include/access/twophase.h 08twophase/src/include/access/twophase.h *** 00orig/src/include/access/twophase.h 1969-12-31 21:00:00.000000000 -0300 --- 08twophase/src/include/access/twophase.h 2005-06-08 19:23:59.000000000 -0400 *************** *** 0 **** --- 1,54 ---- + /* + * twophase.h + * + * Two-phase commit related declarations. + * + * $PostgreSQL$ + */ + #ifndef TWOPHASE_H + #define TWOPHASE_H + + #include "access/xlog.h" + #include "access/twophase_rmgr.h" + #include "storage/lock.h" + + #define GIDSIZE 200 + + + /* + * GlobalTransactionData is defined in twophase.c; other places have no + * business knowing the internal definition. + */ + typedef struct GlobalTransactionData *GlobalTransaction; + + /* GUC variable */ + extern int max_prepared_xacts; + + extern PGPROC *TwoPhaseGetDummyProc(void); + + extern int GetPreparedTransactionsXidList(TransactionId **xids); + + extern int TwoPhaseShmemSize(void); + extern void TwoPhaseShmemInit(void); + extern void BootStrapTwoPhase(void); + + extern GlobalTransaction MarkAsPreparing(TransactionId xid, char *gid, + AclId owner); + extern void MarkAsPrepared(GlobalTransaction); + extern TransactionId MarkAsNoLongerPrepared(char *gid, AclId user); + + extern void StartPrepare(char *gid); + extern void EndPrepare(void); + + extern void RecoverPreparedTransactions(void); + + extern void RecreateTwoPhaseFile(TransactionId xid, void *content, int len); + extern void RemoveTwoPhaseFile(TransactionId xid); + + extern void CheckPointTwoPhase(void); + + extern bool TransactionIdIsPrepared(TransactionId xid); + + extern void FinishPreparedTransaction(char *gid, bool isCommit); + + #endif /* TWOPHASE_H */ diff -Ncr --exclude-from=diff-ignore 00orig/src/include/access/twophase_rmgr.h 08twophase/src/include/access/twophase_rmgr.h *** 00orig/src/include/access/twophase_rmgr.h 1969-12-31 21:00:00.000000000 -0300 --- 08twophase/src/include/access/twophase_rmgr.h 2005-06-08 19:23:59.000000000 -0400 *************** *** 0 **** --- 1,32 ---- + /* + * twophase_rmgr.h + * + * Twophase resource managers definition + * + * $PostgreSQL$ + */ + #ifndef TWOPHASE_RMGR_H + #define TWOPHASE_RMGR_H + + typedef void (*TwoPhaseCallback) (TransactionId xid, void *recdata, uint32 len); + typedef uint8 TwoPhaseRmgrId; + + /* + * Built-in resource managers + */ + #define TWOPHASE_RM_END_ID 0 + #define TWOPHASE_RM_LOCK_ID 1 + #define TWOPHASE_RM_INVAL_ID 2 + #define TWOPHASE_RM_SMGR_ID 3 + #define TWOPHASE_RM_FLATFILES_ID 4 + #define TWOPHASE_RM_NOTIFY_ID 5 + #define TWOPHASE_RM_MAX_ID TWOPHASE_RM_NOTIFY_ID + + extern const TwoPhaseCallback recover_callbacks[]; + extern const TwoPhaseCallback postcommit_callbacks[]; + extern const TwoPhaseCallback postabort_callbacks[]; + + + extern void RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, void *data, uint32 len); + + #endif /* TWOPHASE_RMGR_H */ diff -Ncr --exclude-from=diff-ignore 00orig/src/include/access/xact.h 08twophase/src/include/access/xact.h *** 00orig/src/include/access/xact.h 2005-06-07 16:44:38.000000000 -0400 --- 08twophase/src/include/access/xact.h 2005-06-08 19:23:59.000000000 -0400 *************** *** 47,52 **** --- 47,53 ---- typedef enum { XACT_EVENT_COMMIT, + XACT_EVENT_PREPARE, XACT_EVENT_ABORT } XactEvent; *************** *** 72,79 **** * XLOG allows to store some information in high 4 bits of log * record xl_info field */ ! #define XLOG_XACT_COMMIT 0x00 ! #define XLOG_XACT_ABORT 0x20 typedef struct xl_xact_commit { --- 73,83 ---- * XLOG allows to store some information in high 4 bits of log * record xl_info field */ ! #define XLOG_XACT_COMMIT 0x00 ! #define XLOG_XACT_PREPARE 0x10 ! #define XLOG_XACT_ABORT 0x20 ! #define XLOG_XACT_COMMIT_PREPARED 0x30 ! #define XLOG_XACT_ABORT_PREPARED 0x40 typedef struct xl_xact_commit { *************** *** 87,92 **** --- 91,101 ---- #define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes) + + /* high bits of xl_info field tells us if it's a prepare, + * abort or commit record + */ + typedef struct xl_xact_abort { time_t xtime; *************** *** 143,148 **** --- 152,161 ---- extern void RecordTransactionCommit(void); + extern void CommitPreparedTransaction(char *gid); + extern void AbortPreparedTransaction(char *gid); + extern bool PrepareTransactionBlock(char *gid); + extern int xactGetCommittedChildren(TransactionId **ptr); extern void xact_redo(XLogRecPtr lsn, XLogRecord *record); diff -Ncr --exclude-from=diff-ignore 00orig/src/include/access/xlog.h 08twophase/src/include/access/xlog.h *** 00orig/src/include/access/xlog.h 2005-06-08 19:16:50.000000000 -0400 --- 08twophase/src/include/access/xlog.h 2005-06-08 19:23:59.000000000 -0400 *************** *** 150,155 **** --- 150,156 ---- #endif extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata); + extern XLogRecPtr XLogInsertOnBehalf(RmgrId rmid, uint8 info, XLogRecData *rdata, TransactionId xid); extern void XLogFlush(XLogRecPtr RecPtr); extern void xlog_redo(XLogRecPtr lsn, XLogRecord *record); diff -Ncr --exclude-from=diff-ignore 00orig/src/include/catalog/pg_proc.h 08twophase/src/include/catalog/pg_proc.h *** 00orig/src/include/catalog/pg_proc.h 2005-06-07 16:44:39.000000000 -0400 --- 08twophase/src/include/catalog/pg_proc.h 2005-06-08 19:23:59.000000000 -0400 *************** *** 3001,3006 **** --- 3001,3008 ---- DESCR("SHOW ALL as a function"); DATA(insert OID = 1371 ( pg_lock_status PGNSP PGUID 12 f f t t v 0 2249 "" _null_ _null_ _null_ pg_lock_status - _null_ )); DESCR("view system lock information"); + DATA(insert OID = 1038 ( pg_prepared_xact PGNSP PGUID 12 f f f t v 0 2249 "" _null_ _null_ _null_ pg_prepared_xact - _null_ )); + DESCR("view two-phase transactions"); DATA(insert OID = 2079 ( pg_table_is_visible PGNSP PGUID 12 f f t f s 1 16 "26" _null_ _null_ _null_ pg_table_is_visible - _null_ )); DESCR("is table visible in search path?"); diff -Ncr --exclude-from=diff-ignore 00orig/src/include/commands/async.h 08twophase/src/include/commands/async.h *** 00orig/src/include/commands/async.h 2005-01-10 19:05:41.000000000 -0300 --- 08twophase/src/include/commands/async.h 2005-06-08 19:23:59.000000000 -0400 *************** *** 26,31 **** --- 26,32 ---- extern void AtSubStart_Notify(void); extern void AtSubCommit_Notify(void); extern void AtSubAbort_Notify(void); + extern void AtPrepare_Notify(void); /* signal handler for inbound notifies (SIGUSR2) */ extern void NotifyInterruptHandler(SIGNAL_ARGS); *************** *** 38,41 **** --- 39,44 ---- extern void EnableNotifyInterrupt(void); extern bool DisableNotifyInterrupt(void); + extern void notify_postcommit_record(TransactionId xid, void *recdata, uint32 len); + #endif /* ASYNC_H */ diff -Ncr --exclude-from=diff-ignore 00orig/src/include/libpq/be-fsstubs.h 08twophase/src/include/libpq/be-fsstubs.h *** 00orig/src/include/libpq/be-fsstubs.h 2005-01-10 19:05:41.000000000 -0300 --- 08twophase/src/include/libpq/be-fsstubs.h 2005-06-08 19:23:59.000000000 -0400 *************** *** 48,52 **** --- 48,53 ---- extern void AtEOXact_LargeObject(bool isCommit); extern void AtEOSubXact_LargeObject(bool isCommit, SubTransactionId mySubid, SubTransactionId parentSubid); + extern void AtPrepare_LargeObject(void); #endif /* BE_FSSTUBS_H */ diff -Ncr --exclude-from=diff-ignore 00orig/src/include/nodes/parsenodes.h 08twophase/src/include/nodes/parsenodes.h *** 00orig/src/include/nodes/parsenodes.h 2005-06-07 16:44:40.000000000 -0400 --- 08twophase/src/include/nodes/parsenodes.h 2005-06-08 19:23:59.000000000 -0400 *************** *** 1556,1562 **** TRANS_STMT_ROLLBACK, TRANS_STMT_SAVEPOINT, TRANS_STMT_RELEASE, ! TRANS_STMT_ROLLBACK_TO } TransactionStmtKind; typedef struct TransactionStmt --- 1556,1565 ---- TRANS_STMT_ROLLBACK, TRANS_STMT_SAVEPOINT, TRANS_STMT_RELEASE, ! TRANS_STMT_ROLLBACK_TO, ! TRANS_STMT_PREPARE, ! TRANS_STMT_COMMIT_PREPARED, ! TRANS_STMT_ROLLBACK_PREPARED } TransactionStmtKind; typedef struct TransactionStmt *************** *** 1564,1569 **** --- 1567,1573 ---- NodeTag type; TransactionStmtKind kind; /* see above */ List *options; /* for BEGIN/START and savepoint commands */ + char *gid; /* for two-phase commit related commands*/ } TransactionStmt; /* ---------------------- diff -Ncr --exclude-from=diff-ignore 00orig/src/include/postgres.h 08twophase/src/include/postgres.h *** 00orig/src/include/postgres.h 2005-04-20 17:25:53.000000000 -0400 --- 08twophase/src/include/postgres.h 2005-06-08 19:23:59.000000000 -0400 *************** *** 277,282 **** --- 277,296 ---- #define TransactionIdGetDatum(X) ((Datum) SET_4_BYTES((X))) /* + * DatumGetAclId + * Returns user or group identifier value of a datum. + */ + + #define DatumGetAclId(X) ((AclId) GET_4_BYTES(X)) + + /* + * AclIdGetDatum + * Returns datum representation for a user or group identifier. + */ + + #define AclIdGetDatum(X) ((AclId) SET_4_BYTES((X))) + + /* * DatumGetCommandId * Returns command identifier value of a datum. */ diff -Ncr --exclude-from=diff-ignore 00orig/src/include/storage/lock.h 08twophase/src/include/storage/lock.h *** 00orig/src/include/storage/lock.h 2005-06-01 01:04:21.000000000 -0400 --- 08twophase/src/include/storage/lock.h 2005-06-08 19:23:59.000000000 -0400 *************** *** 371,376 **** --- 371,378 ---- extern bool LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag, TransactionId xid, LOCKMODE lockmode); extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids); + extern bool LockPersistAll(void); + extern void LockReleaseAllForPrepared(TransactionId xid); extern void LockReleaseCurrentOwner(void); extern void LockReassignCurrentOwner(void); extern int LockCheckConflicts(LockMethod lockMethodTable, *************** *** 392,398 **** extern const char *GetLockmodeName(LOCKMODE mode); #ifdef LOCK_DEBUG ! extern void DumpLocks(void); extern void DumpAllLocks(void); #endif --- 394,400 ---- extern const char *GetLockmodeName(LOCKMODE mode); #ifdef LOCK_DEBUG ! extern void DumpLocks(PGPROC *proc); extern void DumpAllLocks(void); #endif diff -Ncr --exclude-from=diff-ignore 00orig/src/include/storage/lockrmgr.h 08twophase/src/include/storage/lockrmgr.h *** 00orig/src/include/storage/lockrmgr.h 1969-12-31 21:00:00.000000000 -0300 --- 08twophase/src/include/storage/lockrmgr.h 2005-06-08 19:23:59.000000000 -0400 *************** *** 0 **** --- 1,18 ---- + /*------------------------------------------------------------------------- + * + * lockrmgr.h + * + * Lock subsystems two-phase state file hooks. + * + * $Id$ + * + *------------------------------------------------------------------------- + */ + #ifndef LOCKRMGR_H_ + #define LOCKRMGR_H_ + + #include "access/xlog.h" + + extern void lock_recover_record(TransactionId xid, void *data, uint32 len); + + #endif /* LOCKRMGR_H */ diff -Ncr --exclude-from=diff-ignore 00orig/src/include/storage/lwlock.h 08twophase/src/include/storage/lwlock.h *** 00orig/src/include/storage/lwlock.h 2005-05-19 18:47:20.000000000 -0400 --- 08twophase/src/include/storage/lwlock.h 2005-06-08 19:23:59.000000000 -0400 *************** *** 46,51 **** --- 46,52 ---- MultiXactMemberControlLock, RelCacheInitLock, BgWriterCommLock, + TwoPhaseStateLock, NumFixedLWLocks, /* must be last except for * MaxDynamicLWLock */ diff -Ncr --exclude-from=diff-ignore 00orig/src/include/storage/procarray.h 08twophase/src/include/storage/procarray.h *** 00orig/src/include/storage/procarray.h 2005-05-19 17:35:47.000000000 -0400 --- 08twophase/src/include/storage/procarray.h 2005-06-08 19:23:59.000000000 -0400 *************** *** 20,25 **** --- 20,26 ---- extern void ProcArrayRemoveMyself(void); extern bool TransactionIdIsInProgress(TransactionId xid); + extern bool TransactionIdIsPrepared(TransactionId transactionId); extern TransactionId GetOldestXmin(bool allDbs); /* Use "struct PGPROC", not PGPROC, to avoid including proc.h here */ diff -Ncr --exclude-from=diff-ignore 00orig/src/include/storage/smgr.h 08twophase/src/include/storage/smgr.h *** 00orig/src/include/storage/smgr.h 2005-06-07 16:44:43.000000000 -0400 --- 08twophase/src/include/storage/smgr.h 2005-06-08 19:23:59.000000000 -0400 *************** *** 83,91 **** --- 83,95 ---- extern void smgrabort(void); extern void smgrsync(void); + extern void AtPrepare_smgr(void); + extern void smgr_redo(XLogRecPtr lsn, XLogRecord *record); extern void smgr_desc(char *buf, uint8 xl_info, char *rec); + extern void smgr_postcommit_record(TransactionId xid, void *recdata, uint32 len); + extern void smgr_postabort_record(TransactionId xid, void *recdata, uint32 len); /* internals: move me elsewhere -- ay 7/94 */ diff -Ncr --exclude-from=diff-ignore 00orig/src/include/utils/builtins.h 08twophase/src/include/utils/builtins.h *** 00orig/src/include/utils/builtins.h 2005-06-01 01:04:22.000000000 -0400 --- 08twophase/src/include/utils/builtins.h 2005-06-08 19:23:59.000000000 -0400 *************** *** 825,830 **** --- 825,833 ---- /* lockfuncs.c */ extern Datum pg_lock_status(PG_FUNCTION_ARGS); + /* pg_prepared_xact.c */ + extern Datum pg_prepared_xact(PG_FUNCTION_ARGS); + /* catalog/pg_conversion.c */ extern Datum pg_convert_using(PG_FUNCTION_ARGS); diff -Ncr --exclude-from=diff-ignore 00orig/src/include/utils/flatfiles.h 08twophase/src/include/utils/flatfiles.h *** 00orig/src/include/utils/flatfiles.h 2005-05-11 18:09:54.000000000 -0400 --- 08twophase/src/include/utils/flatfiles.h 2005-06-08 19:23:59.000000000 -0400 *************** *** 23,28 **** --- 23,29 ---- extern void BuildFlatFiles(bool database_only); + extern void AtPrepare_UpdateFlatFiles(void); extern void AtEOXact_UpdateFlatFiles(bool isCommit); extern void AtEOSubXact_UpdateFlatFiles(bool isCommit, SubTransactionId mySubid, *************** *** 30,33 **** --- 31,36 ---- extern Datum flatfile_update_trigger(PG_FUNCTION_ARGS); + extern void flatfile_postcommit_record(TransactionId xid, void *recdata, uint32 len); + #endif /* FLATFILES_H */ diff -Ncr --exclude-from=diff-ignore 00orig/src/include/utils/guc.h 08twophase/src/include/utils/guc.h *** 00orig/src/include/utils/guc.h 2005-03-25 18:19:35.000000000 -0400 --- 08twophase/src/include/utils/guc.h 2005-06-08 19:23:59.000000000 -0400 *************** *** 187,192 **** --- 187,193 ---- extern bool SelectConfigFiles(const char *userDoption, const char *progname); extern void ResetAllOptions(void); extern void AtEOXact_GUC(bool isCommit, bool isSubXact); + extern void AtPrepare_GUC(void); extern void BeginReportingGUCOptions(void); extern void ParseLongOption(const char *string, char **name, char **value); extern bool set_config_option(const char *name, const char *value, diff -Ncr --exclude-from=diff-ignore 00orig/src/include/utils/inval.h 08twophase/src/include/utils/inval.h *** 00orig/src/include/utils/inval.h 2005-01-10 19:05:46.000000000 -0300 --- 08twophase/src/include/utils/inval.h 2005-06-08 19:23:59.000000000 -0400 *************** *** 30,35 **** --- 30,37 ---- extern void AtEOSubXact_Inval(bool isCommit); + extern void AtPrepare_Inval(void); + extern void CommandEndInvalidationMessages(void); extern void CacheInvalidateHeapTuple(Relation relation, HeapTuple tuple); *************** *** 47,50 **** --- 49,53 ---- extern void CacheRegisterRelcacheCallback(CacheCallbackFunction func, Datum arg); + extern void inval_postcommit_record(TransactionId xid, void *recdata, uint32 len); #endif /* INVAL_H */ diff -Ncr --exclude-from=diff-ignore 00orig/src/include/utils/portal.h 08twophase/src/include/utils/portal.h *** 00orig/src/include/utils/portal.h 2005-04-20 17:25:59.000000000 -0400 --- 08twophase/src/include/utils/portal.h 2005-06-08 19:23:59.000000000 -0400 *************** *** 185,190 **** --- 185,191 ---- extern bool CommitHoldablePortals(void); extern void AtCommit_Portals(void); extern void AtAbort_Portals(void); + extern void AtPrepare_Portals(void); extern void AtCleanup_Portals(void); extern void AtSubCommit_Portals(SubTransactionId mySubid, SubTransactionId parentSubid, diff -Ncr --exclude-from=diff-ignore 00orig/src/test/regress/expected/prepared_xacts.out 08twophase/src/test/regress/expected/prepared_xacts.out *** 00orig/src/test/regress/expected/prepared_xacts.out 1969-12-31 21:00:00.000000000 -0300 --- 08twophase/src/test/regress/expected/prepared_xacts.out 2005-06-08 19:23:59.000000000 -0400 *************** *** 0 **** --- 1,102 ---- + -- + -- PREPARED_XACTS + -- + -- create a simple table that we'll use in the tests + CREATE TABLE p_xacts_test ( + foobar VARCHAR(10) + ); + INSERT INTO p_xacts_test VALUES ('aaa'); + -- Test PREPARE TRANSACTION + BEGIN; + UPDATE p_xacts_test SET foobar = 'bbb' WHERE foobar = 'aaa'; + SELECT * FROM p_xacts_test; + foobar + -------- + bbb + (1 row) + + PREPARE TRANSACTION 'foo1'; + SELECT * FROM p_xacts_test; + foobar + -------- + aaa + (1 row) + + -- Test pg_prepared_xacts system view + SELECT gid FROM pg_prepared_xacts; + gid + ------ + foo1 + (1 row) + + -- Test ROLLBACK PREPARED + ROLLBACK PREPARED 'foo1'; + SELECT * FROM p_xacts_test; + foobar + -------- + aaa + (1 row) + + SELECT gid FROM pg_prepared_xacts; + gid + ----- + (0 rows) + + -- Test COMMIT PREPARED + BEGIN; + INSERT INTO p_xacts_test VALUES ('ddd'); + SELECT * FROM p_xacts_test; + foobar + -------- + aaa + ddd + (2 rows) + + PREPARE TRANSACTION 'foo2'; + SELECT * FROM p_xacts_test; + foobar + -------- + aaa + (1 row) + + COMMIT PREPARED 'foo2'; + SELECT * FROM p_xacts_test; + foobar + -------- + aaa + ddd + (2 rows) + + -- Test duplicate gids + BEGIN; + UPDATE p_xacts_test SET foobar = 'eee' WHERE foobar = 'ddd'; + SELECT * FROM p_xacts_test; + foobar + -------- + aaa + eee + (2 rows) + + PREPARE TRANSACTION 'foo3'; + SELECT gid FROM pg_prepared_xacts; + gid + ------ + foo3 + (1 row) + + BEGIN; + INSERT INTO p_xacts_test VALUES ('fff'); + SELECT * FROM p_xacts_test; + foobar + -------- + aaa + ddd + fff + (3 rows) + + -- This should fail, because the gid foo3 is already in use + PREPARE TRANSACTION 'foo3'; + ERROR: global transaction identifier "foo3" is already in use + -- Clean up + ROLLBACK PREPARED 'foo3'; + DROP TABLE p_xacts_test; diff -Ncr --exclude-from=diff-ignore 00orig/src/test/regress/expected/rules.out 08twophase/src/test/regress/expected/rules.out *** 00orig/src/test/regress/expected/rules.out 2005-05-18 21:31:05.000000000 -0400 --- 08twophase/src/test/regress/expected/rules.out 2005-06-08 19:23:59.000000000 -0400 *************** *** 1316,1322 **** shoelace_obsolete | SELECT shoelace.sl_name, shoelace.sl_avail, shoelace.sl_color, shoelace.sl_len, shoelace.sl_unit, shoelace.sl_len_cm FROM shoelace WHERE (NOT (EXISTS (SELECT shoe.shoename FROM shoe WHERE (shoe.slcolor = shoelace.sl_color)))); street | SELECT r.name, r.thepath, c.cname FROM ONLY road r, real_city c WHERE (c.outline ## r.thepath); toyemp | SELECT emp.name, emp.age, emp."location", (12 * emp.salary) AS annualsal FROM emp; ! (40 rows) SELECT tablename, rulename, definition FROM pg_rules ORDER BY tablename, rulename; --- 1316,1322 ---- shoelace_obsolete | SELECT shoelace.sl_name, shoelace.sl_avail, shoelace.sl_color, shoelace.sl_len, shoelace.sl_unit, shoelace.sl_len_cm FROM shoelace WHERE (NOT (EXISTS (SELECT shoe.shoename FROM shoe WHERE (shoe.slcolor = shoelace.sl_color)))); street | SELECT r.name, r.thepath, c.cname FROM ONLY road r, real_city c WHERE (c.outline ## r.thepath); toyemp | SELECT emp.name, emp.age, emp."location", (12 * emp.salary) AS annualsal FROM emp; ! (41 rows) SELECT tablename, rulename, definition FROM pg_rules ORDER BY tablename, rulename; diff -Ncr --exclude-from=diff-ignore 00orig/src/test/regress/parallel_schedule 08twophase/src/test/regress/parallel_schedule *** 00orig/src/test/regress/parallel_schedule 2004-06-23 15:21:24.000000000 -0400 --- 08twophase/src/test/regress/parallel_schedule 2005-06-08 19:23:59.000000000 -0400 *************** *** 60,66 **** # ---------- # The fourth group of parallel test # ---------- ! test: select_into select_distinct select_distinct_on select_implicit select_having subselect union case join aggregates transactions random portals arrays btree_index hash_index update namespace test: privileges test: misc --- 60,66 ---- # ---------- # The fourth group of parallel test # ---------- ! test: select_into select_distinct select_distinct_on select_implicit select_having subselect union case join aggregates transactions random portals arrays btree_index hash_index update namespace prepared_xacts test: privileges test: misc diff -Ncr --exclude-from=diff-ignore 00orig/src/test/regress/sql/prepared_xacts.sql 08twophase/src/test/regress/sql/prepared_xacts.sql *** 00orig/src/test/regress/sql/prepared_xacts.sql 1969-12-31 21:00:00.000000000 -0300 --- 08twophase/src/test/regress/sql/prepared_xacts.sql 2005-06-08 20:06:25.000000000 -0400 *************** *** 0 **** --- 1,122 ---- + -- + -- PREPARED_XACTS + -- + + -- create a simple table that we'll use in the tests + CREATE TABLE p_xacts_test ( + foobar VARCHAR(10) + ); + + INSERT INTO p_xacts_test VALUES ('aaa'); + + + -- Test PREPARE TRANSACTION + BEGIN; + UPDATE p_xacts_test SET foobar = 'bbb' WHERE foobar = 'aaa'; + SELECT * FROM p_xacts_test; + PREPARE TRANSACTION 'foo1'; + + SELECT * FROM p_xacts_test; + + -- Test pg_prepared_xacts system view + SELECT gid FROM pg_prepared_xacts; + + -- Test ROLLBACK PREPARED + + ROLLBACK PREPARED 'foo1'; + + SELECT * FROM p_xacts_test; + + SELECT gid FROM pg_prepared_xacts; + + + -- Test COMMIT PREPARED + BEGIN; + INSERT INTO p_xacts_test VALUES ('ddd'); + SELECT * FROM p_xacts_test; + PREPARE TRANSACTION 'foo2'; + SELECT * FROM p_xacts_test; + + COMMIT PREPARED 'foo2'; + + SELECT * FROM p_xacts_test; + + -- Test duplicate gids + BEGIN; + UPDATE p_xacts_test SET foobar = 'eee' WHERE foobar = 'ddd'; + SELECT * FROM p_xacts_test; + PREPARE TRANSACTION 'foo3'; + + SELECT gid FROM pg_prepared_xacts; + + BEGIN; + INSERT INTO p_xacts_test VALUES ('fff'); + SELECT * FROM p_xacts_test; + + -- This should fail, because the gid foo3 is already in use + PREPARE TRANSACTION 'foo3'; + + -- Clean up + + ROLLBACK PREPARED 'foo3'; + + DROP TABLE p_xacts_test; + + -- Test subtransactions + BEGIN; + CREATE TABLE foo (a int); + INSERT INTO foo VALUES (1); + SAVEPOINT a; + INSERT INTO foo VALUES (2); + ROLLBACK TO a; + INSERT INTO foo VALUES (3); + PREPARE TRANSACTION 'regress-one'; + + CREATE TABLE bar(); + + -- Test shared invalidation + BEGIN; + -- This doesn't work currently. There's a problem in shared invalidation code. + DROP TABLE bar; + CREATE TABLE baz (a int); + INSERT INTO baz VALUES (1); + INSERT INTO baz VALUES (2); + DECLARE foo CURSOR FOR SELECT * FROM baz; + -- Fetch 1 tuple, keeping the cursor open + FETCH 1 FROM foo; + PREPARE TRANSACTION 'regress-two'; + + -- No such cursor + FETCH 1 FROM foo; + + -- Table doesn't exist, the creation hasn't been committed yet + SELECT * FROM foo; + + -- There should be two prepared transactions + SELECT * FROM pg_prepared_xacts; + + -- Disconnect, we will continue testing in a different backend + \c - + + -- There should be two prepared transactions + SELECT * FROM pg_prepared_xacts; + + -- Commit table creation + COMMIT PREPARED 'regress-one'; + \d foo + SELECT * FROM foo; + + -- There should be one prepared transaction + SELECT * FROM pg_prepared_xacts; + + SELECT * FROM bar; + COMMIT PREPARED 'regress-two'; + -- Error, table was dropped in prepared transaction "regress-two" + /* + * This test fails currently with an error relative to the relcache. Probably + * a problem with the sinval code. Must fix. + */ + SELECT * FROM bar; + + -- There should be no prepared transactions + SELECT * FROM pg_prepared_xacts;