patch: fix race conditions in Statement.cancel()
- From: Oliver Jowett <oliver(at)opencloud(dot)com>
- To: pgsql-jdbc(at)postgresql(dot)org
- Subject: patch: fix race conditions in Statement.cancel()
- Date: Tue, 16 Sep 2003 19:53:00 +1200
- Message-id: <20030916075258.GA11240@opencloud.com> <text/plain>
This patch (vs. CVS HEAD) fixes the race condition I reported earlier where
Statement.cancel() could cancel the wrong query, and some related cases
which would bite us if there were multiple Statements being executed
simultaneously and one was cancelled.
I've also moved the cancel() implementation from AbstractJdbc2Statement to
AbstractJdbc1Statement .. I couldn't see an obvious reason for the JDBC1
version to throw not-implemented while we had an implementation in JDBC2.
I needed to add a reasonable amount of infrastructure to do this correctly
while preserving the ability to have different Statements from the same
Connection executing simultaneously from separate threads. I've tried to
keep the overhead of this down; in the common case I think we've gone from
one uncontended monitor entry to two plus a couple of assignments. When
there's contention for the shared Connection, there's a now a wait-notify
loop and some Vector manipulation involved.
The approach I took was to move the mutual exclusion on the backend
connection from a synchronized(pgStream) {} block in QueryExecutor into an
explicit lock structure managed by AbstractJdbc1Connection itself. Queries
are explicitly queued when there is >1 query involved (this is where the
wait-notify loop is). There is a fastpath that gets used when there is no
outstanding query or cancellation request.
Cancellation requests now provide a statement context to the connection,
which handles working out whether the cancel is a no-op, a local-only cancel
(for queries that are queued waiting for the connection), or a real backend
cancel.
As suggested by Tom Lane, in the case of a real backend cancel being needed
we now wait for an EOF from the postmaster on the cancellation connection
before allowing further queries to start.
I've also added a couple of test cases that try to exercise this area ..
although it's hard to properly test for thread race conditions with a
black-box test.
Note that the changes to QueryExecutor are not as bad as it looks, most of
it is indentation changes caused by removing two synchronized blocks.
-O
Index: src/interfaces/jdbc/org/postgresql/errors.properties
===================================================================
RCS file: /projects/cvsroot/pgsql-server/src/interfaces/jdbc/org/postgresql/errors.properties,v
retrieving revision 1.24
diff -u -c -r1.24 errors.properties
*** src/interfaces/jdbc/org/postgresql/errors.properties 8 Sep 2003 17:30:22 -0000 1.24
--- src/interfaces/jdbc/org/postgresql/errors.properties 16 Sep 2003 07:45:41 -0000
***************
*** 110,112 ****
--- 110,114 ----
postgresql.format.badtime:The time given: {0} does not match the format required: {1}.
postgresql.format.badtimestamp:The timestamp given {0} does not match the format required: {1}.
postgresql.input.field.gt0:The maximum field size must be a value greater than or equal to 0.
+ postgresql.interrupted:The calling thread was interrupted while waiting for an operation to complete.
+ postgresql.cancelled:The query was cancelled while waiting for the connection to become available.
Index: src/interfaces/jdbc/org/postgresql/core/BaseConnection.java
===================================================================
RCS file: /projects/cvsroot/pgsql-server/src/interfaces/jdbc/org/postgresql/core/BaseConnection.java,v
retrieving revision 1.3
diff -u -c -r1.3 BaseConnection.java
*** src/interfaces/jdbc/org/postgresql/core/BaseConnection.java 29 May 2003 03:21:32 -0000 1.3
--- src/interfaces/jdbc/org/postgresql/core/BaseConnection.java 16 Sep 2003 07:45:42 -0000
***************
*** 23,29 ****
public void addNotification(PGNotification p_notification);
public void addWarning(String msg);
! public void cancelQuery() throws SQLException;
public Statement createStatement() throws SQLException;
public BaseResultSet execSQL(String s) throws SQLException;
public boolean getAutoCommit() throws SQLException;
--- 23,49 ----
public void addNotification(PGNotification p_notification);
public void addWarning(String msg);
!
! /**
! * Cancels a query in progress. The query is identified by a
! * <code>statementKey</code> that corresponds to the key passed
! * to {(at)link #beforeQuery(java.lang.Object)}.
! *<p>
! * If the query is not currently executing or waiting for execution,
! * this method is a no-op. If the query is waiting for the connection
! * to become free and has not started execution, it is immediately
! * cancelled. If the query is currently executing, the backend is asked
! * to cancel the query; this may or may not cause the query to terminate
! * with an error, depending on exact timing.
! *<p>
! * If the backend is asked to cancel a query, further query execution is
! * blocked until the cancellation completes to avoid a race condition
! * where the wrong query could be cancelled.
! *
! * @throws SQLException if the backend needed to be contacted to do a
! * query cancellation, but something went wrong during that process.
! */
! public void cancelQuery(Object statementKey) throws SQLException;
public Statement createStatement() throws SQLException;
public BaseResultSet execSQL(String s) throws SQLException;
public boolean getAutoCommit() throws SQLException;
***************
*** 43,47 ****
public void setAutoCommit(boolean autoCommit) throws SQLException;
public void setCursorName(String cursor) throws SQLException;
! }
--- 63,86 ----
public void setAutoCommit(boolean autoCommit) throws SQLException;
public void setCursorName(String cursor) throws SQLException;
! /**
! * Called by QueryExecutor before a query is sent to the backend.
! * Performs query ordering, cancelling, & mutual exclusion; only one
! * thread is allowed within a beforeQuery .. afterQuery block at once.
! *
! * @param statementKey the statement key to pass to cancelQuery() to cancel this query.
! * @throws SQLException if the query is cancelled or the calling thread is interrupted
! * while waiting for other queries on this conneciton to complete.
! */
! public void beforeQuery(Object statementKey) throws SQLException;
+ /**
+ * Called by QueryExecutor after a query has completed. Every successful execution
+ * of beforeQuery() should be followed by an execution of afterQuery().
+ *
+ * @param statementKey the statement key passed to the corresponding beforeQuery() call.
+ * @throws IllegalStateException if <code>statementKey</code> does not match the currently
+ * executing statement key.
+ */
+ public void afterQuery(Object statementKey) throws IllegalStateException;
+ }
Index: src/interfaces/jdbc/org/postgresql/core/PGStream.java
===================================================================
RCS file: /projects/cvsroot/pgsql-server/src/interfaces/jdbc/org/postgresql/core/PGStream.java,v
retrieving revision 1.3
diff -u -c -r1.3 PGStream.java
*** src/interfaces/jdbc/org/postgresql/core/PGStream.java 8 Sep 2003 17:30:22 -0000 1.3
--- src/interfaces/jdbc/org/postgresql/core/PGStream.java 16 Sep 2003 07:45:42 -0000
***************
*** 152,157 ****
--- 152,173 ----
}
/*
+ * Consume an expected EOF from the backend
+ * @exception SQLException if we get something other than an EOF
+ */
+ public void ReceiveEOF() throws SQLException
+ {
+ try {
+ int c = pg_input.read();
+ if (c < 0)
+ return;
+ throw new PSQLException("postgresql.stream.toomuch", PSQLState.COMMUNICATION_ERROR);
+ } catch (IOException e) {
+ throw new PSQLException("postgresql.stream.ioerror", PSQLState.COMMUNICATION_ERROR);
+ }
+ }
+
+ /*
* Receives a single character from the backend
*
* @return the character received
Index: src/interfaces/jdbc/org/postgresql/core/QueryExecutor.java
===================================================================
RCS file: /projects/cvsroot/pgsql-server/src/interfaces/jdbc/org/postgresql/core/QueryExecutor.java,v
retrieving revision 1.26
diff -u -c -r1.26 QueryExecutor.java
*** src/interfaces/jdbc/org/postgresql/core/QueryExecutor.java 13 Sep 2003 04:02:13 -0000 1.26
--- src/interfaces/jdbc/org/postgresql/core/QueryExecutor.java 16 Sep 2003 07:45:42 -0000
***************
*** 95,108 ****
*/
private BaseResultSet execute() throws SQLException
{
! if (connection.getPGProtocolVersionMajor() == 3) {
! if (Driver.logDebug)
! Driver.debug("Using Protocol Version3 to send query");
! return executeV3();
! } else {
! if (Driver.logDebug)
! Driver.debug("Using Protocol Version2 to send query");
! return executeV2();
}
}
--- 95,119 ----
*/
private BaseResultSet execute() throws SQLException
{
! if (pgStream == null)
! {
! throw new PSQLException("postgresql.con.closed", PSQLState.CONNECTION_DOES_NOT_EXIST);
! }
!
! connection.beforeQuery(statement); // Might throw.
!
! try {
! if (connection.getPGProtocolVersionMajor() == 3) {
! if (Driver.logDebug)
! Driver.debug("Using Protocol Version3 to send query");
! return executeV3();
! } else {
! if (Driver.logDebug)
! Driver.debug("Using Protocol Version2 to send query");
! return executeV2();
! }
! } finally {
! connection.afterQuery(statement);
}
}
***************
*** 111,216 ****
PSQLException error = null;
! if (pgStream == null)
! {
! throw new PSQLException("postgresql.con.closed", PSQLState.CONNECTION_DOES_NOT_EXIST);
! }
! synchronized (pgStream)
{
!
! sendQueryV3();
!
! int c;
! boolean l_endQuery = false;
! while (!l_endQuery)
{
! c = pgStream.ReceiveChar();
! switch (c)
! {
! case 'A': // Asynchronous Notify
! int pid = pgStream.ReceiveInteger(4);
! String msg = pgStream.ReceiveString(connection.getEncoding());
! connection.addNotification(new org.postgresql.core.Notification(msg, pid));
! break;
! case 'B': // Binary Data Transfer
! receiveTupleV3(true);
! break;
! case 'C': // Command Status
! receiveCommandStatusV3();
! break;
! case 'D': // Text Data Transfer
! receiveTupleV3(false);
! break;
! case 'E': // Error Message
!
! // it's possible to get more than one error message for a query
! // see libpq comments wrt backend closing a connection
! // so, append messages to a string buffer and keep processing
! // check at the bottom to see if we need to throw an exception
!
! int l_elen = pgStream.ReceiveIntegerR(4);
! String totalMessage = connection.getEncoding().decode(pgStream.Receive(l_elen-4));
! PSQLException l_error = PSQLException.parseServerError(totalMessage);
!
! if (error != null) {
! error.setNextException(l_error);
! } else {
! error = l_error;
! }
!
! // keep processing
! break;
! case 'I': // Empty Query
! int t = pgStream.ReceiveChar();
! break;
! case 'N': // Error Notification
! int l_nlen = pgStream.ReceiveIntegerR(4);
! statement.addWarning(connection.getEncoding().decode(pgStream.Receive(l_nlen-4)));
! break;
! case 'P': // Portal Name
! String pname = pgStream.ReceiveString(connection.getEncoding());
! break;
! case 'S':
! //TODO: handle parameter status messages
! int l_len = pgStream.ReceiveIntegerR(4);
! String l_pStatus = connection.getEncoding().decode(pgStream.Receive(l_len-4));
! if (Driver.logDebug)
! Driver.debug("ParameterStatus="+ l_pStatus);
! break;
! case 'T': // MetaData Field Description
! receiveFieldsV3();
! break;
! case 'Z':
! // read ReadyForQuery
! //TODO: use size better
! if (pgStream.ReceiveIntegerR(4) != 5) throw new PSQLException("postgresql.con.setup", PSQLState.CONNECTION_UNABLE_TO_CONNECT);
! //TODO: handle transaction status
! char l_tStatus = (char)pgStream.ReceiveChar();
! l_endQuery = true;
! break;
! default:
! throw new PSQLException("postgresql.con.type", PSQLState.CONNECTION_FAILURE, new Character((char) c));
}
!
! }
!
! // did we get an error during this query?
! if ( error != null )
! throw error;
!
! //if an existing result set was passed in reuse it, else
! //create a new one
! if (rs != null)
! {
! rs.reInit(fields, tuples, status, update_count, insert_oid, binaryCursor);
}
! else
{
rs = statement.createResultSet(fields, tuples, status, update_count, insert_oid, binaryCursor);
}
! return rs;
! }
}
private BaseResultSet executeV2() throws SQLException
--- 122,218 ----
PSQLException error = null;
! sendQueryV3();
! int c;
! boolean l_endQuery = false;
! while (!l_endQuery)
{
! c = pgStream.ReceiveChar();
! switch (c)
{
! case 'A': // Asynchronous Notify
! int pid = pgStream.ReceiveInteger(4);
! String msg = pgStream.ReceiveString(connection.getEncoding());
! connection.addNotification(new org.postgresql.core.Notification(msg, pid));
! break;
! case 'B': // Binary Data Transfer
! receiveTupleV3(true);
! break;
! case 'C': // Command Status
! receiveCommandStatusV3();
! break;
! case 'D': // Text Data Transfer
! receiveTupleV3(false);
! break;
! case 'E': // Error Message
!
! // it's possible to get more than one error message for a query
! // see libpq comments wrt backend closing a connection
! // so, append messages to a string buffer and keep processing
! // check at the bottom to see if we need to throw an exception
!
! int l_elen = pgStream.ReceiveIntegerR(4);
! String totalMessage = connection.getEncoding().decode(pgStream.Receive(l_elen-4));
! PSQLException l_error = PSQLException.parseServerError(totalMessage);
!
! if (error != null) {
! error.setNextException(l_error);
! } else {
! error = l_error;
}
!
! // keep processing
! break;
! case 'I': // Empty Query
! int t = pgStream.ReceiveChar();
! break;
! case 'N': // Error Notification
! int l_nlen = pgStream.ReceiveIntegerR(4);
! statement.addWarning(connection.getEncoding().decode(pgStream.Receive(l_nlen-4)));
! break;
! case 'P': // Portal Name
! String pname = pgStream.ReceiveString(connection.getEncoding());
! break;
! case 'S':
! //TODO: handle parameter status messages
! int l_len = pgStream.ReceiveIntegerR(4);
! String l_pStatus = connection.getEncoding().decode(pgStream.Receive(l_len-4));
! if (Driver.logDebug)
! Driver.debug("ParameterStatus="+ l_pStatus);
! break;
! case 'T': // MetaData Field Description
! receiveFieldsV3();
! break;
! case 'Z':
! // read ReadyForQuery
! //TODO: use size better
! if (pgStream.ReceiveIntegerR(4) != 5) throw new PSQLException("postgresql.con.setup", PSQLState.CONNECTION_UNABLE_TO_CONNECT);
! //TODO: handle transaction status
! char l_tStatus = (char)pgStream.ReceiveChar();
! l_endQuery = true;
! break;
! default:
! throw new PSQLException("postgresql.con.type", PSQLState.CONNECTION_FAILURE, new Character((char) c));
}
!
! }
!
! // did we get an error during this query?
! if ( error != null )
! throw error;
!
! //if an existing result set was passed in reuse it, else
! //create a new one
! if (rs != null)
! {
! rs.reInit(fields, tuples, status, update_count, insert_oid, binaryCursor);
! }
! else
{
rs = statement.createResultSet(fields, tuples, status, update_count, insert_oid, binaryCursor);
}
! return rs;
}
private BaseResultSet executeV2() throws SQLException
***************
*** 218,306 ****
StringBuffer errorMessage = null;
! if (pgStream == null)
! {
! throw new PSQLException("postgresql.con.closed", PSQLState.CONNECTION_DOES_NOT_EXIST);
! }
! synchronized (pgStream)
{
!
! sendQueryV2();
!
! int c;
! boolean l_endQuery = false;
! while (!l_endQuery)
! {
! c = pgStream.ReceiveChar();
!
! switch (c)
! {
! case 'A': // Asynchronous Notify
! int pid = pgStream.ReceiveInteger(4);
! String msg = pgStream.ReceiveString(connection.getEncoding());
! connection.addNotification(new org.postgresql.core.Notification(msg, pid));
! break;
! case 'B': // Binary Data Transfer
! receiveTupleV2(true);
! break;
! case 'C': // Command Status
! receiveCommandStatusV2();
! break;
! case 'D': // Text Data Transfer
! receiveTupleV2(false);
! break;
! case 'E': // Error Message
!
! // it's possible to get more than one error message for a query
! // see libpq comments wrt backend closing a connection
! // so, append messages to a string buffer and keep processing
! // check at the bottom to see if we need to throw an exception
!
! if ( errorMessage == null )
! errorMessage = new StringBuffer();
!
! errorMessage.append(pgStream.ReceiveString(connection.getEncoding()));
! // keep processing
! break;
! case 'I': // Empty Query
! int t = pgStream.ReceiveChar();
! break;
! case 'N': // Error Notification
! statement.addWarning(pgStream.ReceiveString(connection.getEncoding()));
! break;
! case 'P': // Portal Name
! String pname = pgStream.ReceiveString(connection.getEncoding());
! break;
! case 'T': // MetaData Field Description
! receiveFieldsV2();
! break;
! case 'Z':
! l_endQuery = true;
! break;
! default:
! throw new PSQLException("postgresql.con.type", PSQLState.CONNECTION_FAILURE, new Character((char) c));
! }
!
}
!
! // did we get an error during this query?
! if ( errorMessage != null )
! throw new SQLException( errorMessage.toString().trim() );
!
!
! //if an existing result set was passed in reuse it, else
! //create a new one
! if (rs != null)
! {
! rs.reInit(fields, tuples, status, update_count, insert_oid, binaryCursor);
! }
! else
! {
! rs = statement.createResultSet(fields, tuples, status, update_count, insert_oid, binaryCursor);
! }
! return rs;
}
}
/*
--- 220,299 ----
StringBuffer errorMessage = null;
! sendQueryV2();
! int c;
! boolean l_endQuery = false;
! while (!l_endQuery)
{
! c = pgStream.ReceiveChar();
!
! switch (c)
! {
! case 'A': // Asynchronous Notify
! int pid = pgStream.ReceiveInteger(4);
! String msg = pgStream.ReceiveString(connection.getEncoding());
! connection.addNotification(new org.postgresql.core.Notification(msg, pid));
! break;
! case 'B': // Binary Data Transfer
! receiveTupleV2(true);
! break;
! case 'C': // Command Status
! receiveCommandStatusV2();
! break;
! case 'D': // Text Data Transfer
! receiveTupleV2(false);
! break;
! case 'E': // Error Message
!
! // it's possible to get more than one error message for a query
! // see libpq comments wrt backend closing a connection
! // so, append messages to a string buffer and keep processing
! // check at the bottom to see if we need to throw an exception
!
! if ( errorMessage == null )
! errorMessage = new StringBuffer();
!
! errorMessage.append(pgStream.ReceiveString(connection.getEncoding()));
! // keep processing
! break;
! case 'I': // Empty Query
! int t = pgStream.ReceiveChar();
! break;
! case 'N': // Error Notification
! statement.addWarning(pgStream.ReceiveString(connection.getEncoding()));
! break;
! case 'P': // Portal Name
! String pname = pgStream.ReceiveString(connection.getEncoding());
! break;
! case 'T': // MetaData Field Description
! receiveFieldsV2();
! break;
! case 'Z':
! l_endQuery = true;
! break;
! default:
! throw new PSQLException("postgresql.con.type", PSQLState.CONNECTION_FAILURE, new Character((char) c));
}
!
! }
!
! // did we get an error during this query?
! if ( errorMessage != null )
! throw new SQLException( errorMessage.toString().trim() );
!
!
! //if an existing result set was passed in reuse it, else
! //create a new one
! if (rs != null)
! {
! rs.reInit(fields, tuples, status, update_count, insert_oid, binaryCursor);
! }
! else
! {
! rs = statement.createResultSet(fields, tuples, status, update_count, insert_oid, binaryCursor);
}
+ return rs;
}
/*
Index: src/interfaces/jdbc/org/postgresql/jdbc1/AbstractJdbc1Connection.java
===================================================================
RCS file: /projects/cvsroot/pgsql-server/src/interfaces/jdbc/org/postgresql/jdbc1/AbstractJdbc1Connection.java,v
retrieving revision 1.26
diff -u -c -r1.26 AbstractJdbc1Connection.java
*** src/interfaces/jdbc/org/postgresql/jdbc1/AbstractJdbc1Connection.java 13 Sep 2003 04:02:15 -0000 1.26
--- src/interfaces/jdbc/org/postgresql/jdbc1/AbstractJdbc1Connection.java 16 Sep 2003 07:45:43 -0000
***************
*** 104,109 ****
--- 104,113 ----
*/
private int isolationLevel = Connection.TRANSACTION_READ_COMMITTED;
+ // Query queue and cancellation infrastructure.
+ private Object currentQuery;
+ private final Vector queryQueue = new Vector(); // Better as a LinkedList, but that's not in 1.1.
+ private boolean cancelPending;
public abstract Statement createStatement() throws SQLException;
public abstract DatabaseMetaData getMetaData() throws SQLException;
***************
*** 1775,1786 ****
Types.TIMESTAMP, Types.TIMESTAMP, Types.TIMESTAMP
};
! public void cancelQuery() throws SQLException
{
org.postgresql.core.PGStream cancelStream = null;
try
{
cancelStream = new org.postgresql.core.PGStream(PG_HOST, PG_PORT);
}
catch (ConnectException cex)
{
--- 1779,1824 ----
Types.TIMESTAMP, Types.TIMESTAMP, Types.TIMESTAMP
};
! public void cancelQuery(Object statement) throws SQLException
{
+ synchronized (queryQueue) {
+ if (queryQueue.removeElement(statement)) {
+ // Query hadn't started executing yet. Remove it and wake up
+ // the thread that's waiting to start so it notices the
+ // cancellation.
+ queryQueue.notifyAll();
+ return;
+ }
+
+ if (currentQuery != statement) {
+ // Not executing this statement at all. Don't do anything.
+ return;
+ }
+
+ if (cancelPending) {
+ // We are already doing a cancel for the currently running statement.
+ // Don't do it twice.
+ return;
+ }
+
+ // Need to do a real cancel. Make sure new queries don't start while we do this.
+ cancelPending = true;
+ }
+
org.postgresql.core.PGStream cancelStream = null;
try
{
cancelStream = new org.postgresql.core.PGStream(PG_HOST, PG_PORT);
+
+ // Now we need to construct and send a cancel packet
+ cancelStream.SendInteger(16, 4);
+ cancelStream.SendInteger(80877102, 4);
+ cancelStream.SendInteger(pid, 4);
+ cancelStream.SendInteger(ckey, 4);
+ cancelStream.flush();
+
+ // Wait for the backend to close the connection to avoid races.
+ cancelStream.ReceiveEOF();
}
catch (ConnectException cex)
{
***************
*** 1793,1814 ****
{
throw new PSQLException ("postgresql.con.failed", PSQLState.CONNECTION_UNABLE_TO_CONNECT, e);
}
-
- // Now we need to construct and send a cancel packet
- try
- {
- cancelStream.SendInteger(16, 4);
- cancelStream.SendInteger(80877102, 4);
- cancelStream.SendInteger(pid, 4);
- cancelStream.SendInteger(ckey, 4);
- cancelStream.flush();
- }
- catch (IOException e)
- {
- throw new PSQLException("postgresql.con.failed", PSQLState.CONNECTION_UNABLE_TO_CONNECT, e);
- }
finally
{
try
{
if (cancelStream != null)
--- 1831,1844 ----
{
throw new PSQLException ("postgresql.con.failed", PSQLState.CONNECTION_UNABLE_TO_CONNECT, e);
}
finally
{
+ synchronized (queryQueue) {
+ // Cancellation is done, it's safe to start new queries again.
+ cancelPending = false;
+ queryQueue.notifyAll();
+ }
+
try
{
if (cancelStream != null)
***************
*** 1819,1824 ****
--- 1849,1913 ----
}
}
+ public void beforeQuery(Object statement) throws SQLException {
+ synchronized (queryQueue) {
+ // Fastpath for the common case where we have no contention.
+ if (!cancelPending && currentQuery == null) {
+ currentQuery = statement;
+ return;
+ }
+
+ // Ok, we have multiple queries and/or cancels involved, do the full logic.
+ queryQueue.addElement(statement);
+
+ // Wait for no currently running query and no currently running cancellation.
+ // Once both of these are true, we can start executing.
+ // If this statement is cancelled while still on the query queue,
+ // this is signalled by removing the statement from the queue.
+ while (true) {
+ if (!queryQueue.contains(statement)) {
+ // We were cancelled.
+ throw new PSQLException("postgresql.cancelled", PSQLState.SYSTEM_ERROR);
+ }
+
+ if (!cancelPending && currentQuery == null) {
+ // We can start.
+ queryQueue.removeElement(statement);
+ currentQuery = statement;
+ break;
+ }
+
+ // invariants here:
+ // queryQueue.contains(statement)
+ // currentQuery != statement
+ // currentQuery != null || cancelPending
+
+ try {
+ queryQueue.wait();
+ } catch (InterruptedException ie) {
+ // As it's hard for a caller to detect this case via
+ // the thrown exception, reset interruption state too.
+ Thread.currentThread().interrupt();
+ throw new PSQLException("postgresql.interrupted", PSQLState.SYSTEM_ERROR);
+ }
+ }
+
+ // invariants here:
+ // currentQuery == statement
+ // !cancelPending
+ // !queryQueue.contains(statement)
+ }
+ }
+
+ public void afterQuery(Object statement) {
+ synchronized (queryQueue) {
+ if (statement != currentQuery)
+ throw new IllegalStateException("bad beforeQuery()/afterQuery() ordering");
+
+ currentQuery = null;
+ queryQueue.notifyAll(); // Someone can wake up now.
+ }
+ }
//Methods to support postgres notifications
public void addNotification(org.postgresql.PGNotification p_notification)
Index: src/interfaces/jdbc/org/postgresql/jdbc1/AbstractJdbc1Statement.java
===================================================================
RCS file: /projects/cvsroot/pgsql-server/src/interfaces/jdbc/org/postgresql/jdbc1/AbstractJdbc1Statement.java,v
retrieving revision 1.36
diff -u -c -r1.36 AbstractJdbc1Statement.java
*** src/interfaces/jdbc/org/postgresql/jdbc1/AbstractJdbc1Statement.java 13 Sep 2003 04:02:15 -0000 1.36
--- src/interfaces/jdbc/org/postgresql/jdbc1/AbstractJdbc1Statement.java 16 Sep 2003 07:45:43 -0000
***************
*** 670,683 ****
/*
* Cancel can be used by one thread to cancel a statement that
* is being executed by another thread.
- * <p>
- * Not implemented, this method is a no-op.
*
* @exception SQLException only because thats the spec.
*/
public void cancel() throws SQLException
{
! throw new PSQLException("postgresql.unimplemented", PSQLState.NOT_IMPLEMENTED);
}
/*
--- 670,681 ----
/*
* Cancel can be used by one thread to cancel a statement that
* is being executed by another thread.
*
* @exception SQLException only because thats the spec.
*/
public void cancel() throws SQLException
{
! connection.cancelQuery(this);
}
/*
Index: src/interfaces/jdbc/org/postgresql/jdbc2/AbstractJdbc2Statement.java
===================================================================
RCS file: /projects/cvsroot/pgsql-server/src/interfaces/jdbc/org/postgresql/jdbc2/AbstractJdbc2Statement.java,v
retrieving revision 1.17
diff -u -c -r1.17 AbstractJdbc2Statement.java
*** src/interfaces/jdbc/org/postgresql/jdbc2/AbstractJdbc2Statement.java 9 Sep 2003 10:49:16 -0000 1.17
--- src/interfaces/jdbc/org/postgresql/jdbc2/AbstractJdbc2Statement.java 16 Sep 2003 07:45:43 -0000
***************
*** 118,128 ****
return result;
}
- public void cancel() throws SQLException
- {
- connection.cancelQuery();
- }
-
public Connection getConnection() throws SQLException
{
return (Connection) connection;
--- 118,123 ----
Index: src/interfaces/jdbc/org/postgresql/test/jdbc2/MiscTest.java
===================================================================
RCS file: /projects/cvsroot/pgsql-server/src/interfaces/jdbc/org/postgresql/test/jdbc2/MiscTest.java,v
retrieving revision 1.10
diff -u -c -r1.10 MiscTest.java
*** src/interfaces/jdbc/org/postgresql/test/jdbc2/MiscTest.java 29 May 2003 04:39:48 -0000 1.10
--- src/interfaces/jdbc/org/postgresql/test/jdbc2/MiscTest.java 16 Sep 2003 07:45:43 -0000
***************
*** 99,102 ****
--- 99,200 ----
fail( ex.getMessage() );
}
}
+
+ public void testCancel() throws Exception {
+ //
+ // Try to make sure there's no cancel() race where we'd cancel a not-yet-executed query.
+ //
+
+ Connection con = TestUtil.openDB();
+ Statement stmt = con.createStatement();
+ for (int i = 0; i < 10000; ++i) {
+ stmt.executeQuery("select version()").close();
+ stmt.cancel();
+ }
+ }
+
+ private class QueryRunnable implements Runnable {
+ QueryRunnable(Statement stmt, boolean ignoreErrors, int testValue) {
+ this.stmt = stmt;
+ this.ignoreErrors = ignoreErrors;
+ this.testValue = testValue;
+ }
+
+ public void run() {
+ while (!kill) {
+ try {
+ // Generate a reasonably sized resultset to try to slow the backend
+ // down a bit, so we have a chance of the cancel actually hitting us.
+ ResultSet rs = stmt.executeQuery("select " + testValue + " from pg_type t1, pg_type t2");
+ int count = 0;
+ while (rs.next()) {
+ assertEquals(testValue, rs.getInt(1));
+ ++count;
+ }
+ assertTrue(count > 0);
+ rs.close();
+ } catch (SQLException e) {
+ //e.printStackTrace();
+ if (!ignoreErrors) {
+ this.error = e;
+ break;
+ }
+ } catch (Throwable t) {
+ this.error = t;
+ break;
+ }
+ }
+
+ synchronized (this) {
+ stopped = true;
+ notifyAll();
+ }
+ }
+
+ public void stop() throws Exception {
+ kill = true;
+ synchronized (this) {
+ while (!stopped) {
+ try { this.wait(); }
+ catch (InterruptedException e) {}
+ }
+ }
+
+ if (error != null) {
+ if (error instanceof Exception)
+ throw (Exception)error;
+ else
+ throw (Error)error;
+ }
+ }
+
+ private final Statement stmt;
+ private final boolean ignoreErrors;
+ private final int testValue;
+
+ private Throwable error;
+ private volatile boolean kill = false;
+ private boolean stopped = false;
+ }
+
+ public void testThreadedCancel() throws Exception {
+ Connection con = TestUtil.openDB();
+ Statement stmt1 = con.createStatement();
+ QueryRunnable qr1 = new QueryRunnable(stmt1, true, 1); // ignore errors; we will repeatedly cancel this one
+ Statement stmt2 = con.createStatement();
+ QueryRunnable qr2 = new QueryRunnable(stmt2, false, 2); // report errors; it should never be cancelled.
+
+ new Thread(qr1).start();
+ new Thread(qr2).start();
+
+ Thread.sleep(1000); // Give them time to start.
+
+ for (int i = 0; i < 500; ++i) {
+ stmt1.cancel();
+ Thread.sleep(10);
+ }
+
+ qr1.stop();
+ qr2.stop();
+ }
}
Home |
Main Index |
Thread Index