Re: implementing asynchronous notifications
On Sun, 10 Apr 2005, Oliver Jowett wrote:
> It'll be tricky to do this from a single codebase as the NIO changes
> would need to reach right down to the underlying protocol stream.. it
> seems hard to localize the changes.
Okay, then I implemented this using Socket.getInputStream().available()
calls.
Proof of concept attached.
PGConnection.getNotifies() now processes peding Async Notifies first - no
need to send polling queries to the server anymore. The result is zero
network traffic in the idle case.
Any comments?
I am unclear as to how to handle possible protocol errors (e.g. when what
we end up reading from the connection is not an 'A'sync Notify).
Theoretically, in a working connection this should not happen though.
diff -urb postgresql-jdbc-8.0-310.src-vanilla/org/postgresql/core/v2/ProtocolConnectionImpl.java postgresql-jdbc-8.0-310.src/org/postgresql/core/v2/ProtocolConnectionImpl.java
--- postgresql-jdbc-8.0-310.src-vanilla/org/postgresql/core/v2/ProtocolConnectionImpl.java 2005-01-11 09:25:43.000000000 +0100
+++ postgresql-jdbc-8.0-310.src/org/postgresql/core/v2/ProtocolConnectionImpl.java 2005-04-11 05:07:42.000000000 +0200
@@ -58,6 +58,9 @@
public synchronized PGNotification[]
getNotifications() {
+ try {
+ executor.processNotifies();
+ } catch (SQLException e) {};
PGNotification[] array = (PGNotification[])notifications.toArray(new PGNotification[notifications.size()]);
notifications.clear();
return array;
diff -urb postgresql-jdbc-8.0-310.src-vanilla/org/postgresql/core/v2/QueryExecutorImpl.java postgresql-jdbc-8.0-310.src/org/postgresql/core/v2/QueryExecutorImpl.java
--- postgresql-jdbc-8.0-310.src-vanilla/org/postgresql/core/v2/QueryExecutorImpl.java 2005-02-01 08:27:54.000000000 +0100
+++ postgresql-jdbc-8.0-310.src/org/postgresql/core/v2/QueryExecutorImpl.java 2005-04-11 05:08:52.000000000 +0200
@@ -148,6 +148,25 @@
pgStream.flush();
}
+ public synchronized void processNotifies() throws SQLException {
+ try {
+ while (protoConnection.getTransactionState() == ProtocolConnection.TRANSACTION_IDLE && pgStream.getSocket().getInputStream().available()>0) {
+ int c = pgStream.ReceiveChar();
+ switch (c) {
+ case 'A': // Asynchronous Notify
+ receiveAsyncNotify();
+ break;
+ default:
+ throw new PSQLException(GT.tr("Unknown Response Type {0}.", new Character((char) c)), PSQLState.CONNECTION_FAILURE);
+ }
+ }
+ }
+ catch (IOException ioe)
+ {
+ throw new PSQLException(GT.tr("An I/O error occured while sending to the backend."), PSQLState.CONNECTION_FAILURE, ioe);
+ }
+ }
+
private byte[] receiveFastpathResult() throws IOException, SQLException {
SQLException error = null;
boolean endQuery = false;
diff -urb postgresql-jdbc-8.0-310.src-vanilla/org/postgresql/core/v3/ProtocolConnectionImpl.java postgresql-jdbc-8.0-310.src/org/postgresql/core/v3/ProtocolConnectionImpl.java
--- postgresql-jdbc-8.0-310.src-vanilla/org/postgresql/core/v3/ProtocolConnectionImpl.java 2005-01-11 09:25:44.000000000 +0100
+++ postgresql-jdbc-8.0-310.src/org/postgresql/core/v3/ProtocolConnectionImpl.java 2005-04-11 05:07:38.000000000 +0200
@@ -59,6 +59,9 @@
public synchronized PGNotification[]
getNotifications() {
+ try {
+ executor.processNotifies();
+ } catch (SQLException e) {};
PGNotification[] array = (PGNotification[])notifications.toArray(new PGNotification[notifications.size()]);
notifications.clear();
return array;
diff -urb postgresql-jdbc-8.0-310.src-vanilla/org/postgresql/core/v3/QueryExecutorImpl.java postgresql-jdbc-8.0-310.src/org/postgresql/core/v3/QueryExecutorImpl.java
--- postgresql-jdbc-8.0-310.src-vanilla/org/postgresql/core/v3/QueryExecutorImpl.java 2005-02-01 08:27:54.000000000 +0100
+++ postgresql-jdbc-8.0-310.src/org/postgresql/core/v3/QueryExecutorImpl.java 2005-04-11 05:09:12.000000000 +0200
@@ -524,6 +524,25 @@
pgStream.flush();
}
+ public synchronized void processNotifies() throws SQLException {
+ try {
+ while (protoConnection.getTransactionState() == ProtocolConnection.TRANSACTION_IDLE && pgStream.getSocket().getInputStream().available()>0) {
+ int c = pgStream.ReceiveChar();
+ switch (c) {
+ case 'A': // Asynchronous Notify
+ receiveAsyncNotify();
+ break;
+ default:
+ throw new PSQLException(GT.tr("Unknown Response Type {0}.", new Character((char) c)), PSQLState.CONNECTION_FAILURE);
+ }
+ }
+ }
+ catch (IOException ioe)
+ {
+ throw new PSQLException(GT.tr("An I/O error occured while sending to the backend."), PSQLState.CONNECTION_FAILURE, ioe);
+ }
+ }
+
private byte[] receiveFastpathResult() throws IOException, SQLException {
boolean endQuery = false;
SQLException error = null;
Home |
Main Index |
Thread Index