[PATCH 3/4] Add dblink functions for use with COPY ... TO FUNCTION ...

From: Daniel Farina <dfarina(at)truviso(dot)com>
To: pgsql-hackers(at)postgresql(dot)org
Cc: Daniel Farina <dfarina(at)truviso(dot)com>
Subject: [PATCH 3/4] Add dblink functions for use with COPY ... TO FUNCTION ...
Date: 2009-11-23 21:34:41
Message-ID: 1259012082-6196-4-git-send-email-dfarina@truviso.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

This patch enables dblink to be used for the purpose of efficient
bulk-loading via COPY and libpq in combination with the COPY TO
FUNCTION patch.

The following functions were added to accomplish this:

dblink_connection_reset: useful when handling errors and one just
wants to restore a connection to a known idle state, rolling back any
open or aborted transaction on it.

dblink_copy_end: completes the COPY

dblink_copy_open: puts a connection into the COPY state. Accepts
connection name, relation name, and binary mode flag.

dblink_copy_write: writes a row to the last connection put in the COPY
state by dblink_copy_open.

Generally speaking, code that uses this will look like the following
(presuming a named connection has already been made):

try:
SELECT dblink_copy_open('myconn', 'relation_name', true);
COPY bar TO FUNCTION dblink_copy_write;

-- since the dblink connection is still in the COPY state, one
-- can even copy some more data in multiple steps...
COPY bar_2 TO FUNCTION dblink_copy_write;

SELECT dblink_copy_end();
finally:
SELECT dblink_connection_reset('myconn');

Signed-off-by: Daniel Farina <dfarina(at)truviso(dot)com>
---
contrib/dblink/dblink.c | 190 +++++++++++++++++++++++++++++++++++
contrib/dblink/dblink.h | 5 +
contrib/dblink/dblink.sql.in | 20 ++++
contrib/dblink/uninstall_dblink.sql | 8 ++
4 files changed, 223 insertions(+), 0 deletions(-)

diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 72fdf56..d32aeec 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -1722,6 +1722,196 @@ dblink_get_notify(PG_FUNCTION_ARGS)
* internal functions
*/

+/*
+ * dblink_connection_reset
+ *
+ * Bring the named connection back to an idle transaction state.  If a
+ * transaction is open, failed, or a command is still active, ROLLBACK is
+ * issued once.  The transaction state is then checked again, and an error
+ * is raised if the connection is still not idle (or its state is unknown).
+ *
+ * Arguments: connection name (text).  Returns void.
+ *
+ * NOTE(review): this does not touch the COPY target recorded by
+ * dblink_copy_open; confirm callers always re-open before writing again.
+ */
+PG_FUNCTION_INFO_V1(dblink_connection_reset);
+Datum
+dblink_connection_reset(PG_FUNCTION_ARGS)
+{
+	PGresult   *res = NULL;
+	PGconn	   *conn = NULL;
+	char	   *conname = NULL;
+	remoteConn *rconn = NULL;
+	bool		triedonce = false;
+
+	DBLINK_INIT;
+
+	/* must be text */
+	Assert(PG_NARGS() == 1);
+	DBLINK_GET_NAMED_CONN;
+
+	if (!conn)
+		DBLINK_CONN_NOT_AVAIL;
+
+	for (;;)
+	{
+		ExecStatusType status;
+
+		switch (PQtransactionStatus(conn))
+		{
+			case PQTRANS_IDLE:
+				/* Everything is okay */
+				PG_RETURN_VOID();
+
+			case PQTRANS_ACTIVE:
+			case PQTRANS_INTRANS:
+			case PQTRANS_INERROR:
+
+				/*
+				 * A session has only one top-level transaction, so one
+				 * successful ROLLBACK should be enough.  If the state is
+				 * still not idle afterwards, give up rather than loop.
+				 */
+				if (triedonce)
+					ereport(ERROR,
+							(errcode(ERRCODE_CONNECTION_FAILURE),
+							 errmsg("%s: connection still not idle after ROLLBACK",
+									PG_FUNCNAME_MACRO)));
+
+				res = PQexec(conn, "ROLLBACK;");
+				status = PQresultStatus(res);
+				/* PGresult is not palloc'd: free it before ereport can longjmp */
+				PQclear(res);
+
+				if (status != PGRES_COMMAND_OK)
+					ereport(ERROR,
+							(errcode(ERRCODE_CONNECTION_FAILURE),
+							 errmsg("%s: could not issue ROLLBACK command",
+									PG_FUNCNAME_MACRO),
+							 errdetail("%s", PQerrorMessage(conn))));
+
+				triedonce = true;
+				break;			/* loop around and re-check the state */
+
+			case PQTRANS_UNKNOWN:
+			default:
+				elog(ERROR, "%s: connection in unknown transaction state",
+					 PG_FUNCNAME_MACRO);
+		}
+	}
+}
+
+/*
+ * dblink COPY support, procedures and variables
+ *
+ * dblink_copy_current is the remote connection that dblink_copy_write and
+ * dblink_copy_end operate on.  dblink_copy_open sets it after the remote
+ * side has entered COPY FROM STDIN, and dblink_copy_end sets it back to
+ * NULL.  As a file-level static it starts out as a null pointer, meaning
+ * "no COPY in progress".
+ *
+ * NOTE(review): nothing in this section clears it if the connection is
+ * closed elsewhere; confirm it cannot be left dangling.
+ */
+static PGconn *dblink_copy_current;
+
+/*
+ * dblink_copy_open
+ *
+ * Start a COPY ... FROM STDIN into a given relation on the named remote
+ * connection, and record that connection as the target of subsequent
+ * dblink_copy_write / dblink_copy_end calls.
+ *
+ * Arguments: connection name (text), possibly-qualified relation name
+ * (text), and whether to use binary COPY format (bool).
+ * Returns the text "OK" on success; raises an error otherwise.
+ */
+PG_FUNCTION_INFO_V1(dblink_copy_open);
+Datum
+dblink_copy_open(PG_FUNCTION_ARGS)
+{
+	PGresult   *res = NULL;
+	PGconn	   *conn = NULL;
+	char	   *conname = NULL;
+	remoteConn *rconn = NULL;
+	const char *quoted_remote_relname;
+	bool		isbinary;
+	StringInfoData sql;
+
+	DBLINK_INIT;
+
+	/* must be text,text,bool */
+	Assert(PG_NARGS() == 3);
+	DBLINK_GET_NAMED_CONN;
+
+	if (!conn)
+		DBLINK_CONN_NOT_AVAIL;
+
+	/* Read the procedure arguments into primitive values */
+	quoted_remote_relname = NameListToQuotedString(
+							textToQualifiedNameList(PG_GETARG_TEXT_P(1)));
+	isbinary = PG_GETARG_BOOL(2);
+
+	/*
+	 * Query parameterization only handles values, not identifiers, so the
+	 * statement must be built as a string; that is why the relation name
+	 * was quoted above.  A growable StringInfo is used instead of a fixed
+	 * buffer: a qualified name, with every part quoted and embedded quotes
+	 * doubled, can be much longer than NAMEDATALEN.
+	 */
+	initStringInfo(&sql);
+	appendStringInfo(&sql, "COPY %s FROM STDIN%s;",
+					 quoted_remote_relname,
+					 isbinary ? " WITH BINARY" : "");
+
+	/*
+	 * Run the created query, and check to ensure that PGRES_COPY_IN state has
+	 * been achieved.
+	 */
+	res = PQexec(conn, sql.data);
+	pfree(sql.data);
+
+	if (!res || PQresultStatus(res) != PGRES_COPY_IN)
+	{
+		/* PGresult is not palloc'd: free it before ereport longjmps away */
+		PQclear(res);
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("could not start COPY FROM on remote node"),
+				 errdetail("%s", PQerrorMessage(conn))));
+	}
+	PQclear(res);
+
+	/*
+	 * Everything went well; finally bind the global dblink_copy_current to the
+	 * connection ready to accept copy data.
+	 */
+	dblink_copy_current = conn;
+	PG_RETURN_TEXT_P(cstring_to_text("OK"));
+}
+
+/*
+ * dblink_copy_write
+ *
+ * Send the bytes held in the provided StringInfo (passed as an "internal"
+ * argument; presumably one row of COPY data from COPY ... TO FUNCTION --
+ * confirm against the caller) to the COPY opened by dblink_copy_open.
+ * Raises an error if no COPY is open or the data cannot be queued.
+ */
+PG_FUNCTION_INFO_V1(dblink_copy_write);
+Datum
+dblink_copy_write(PG_FUNCTION_ARGS)
+{
+	StringInfo	copybuf = (StringInfo) PG_GETARG_POINTER(0);
+
+	/* Report a clear error instead of handing libpq a NULL connection */
+	if (dblink_copy_current == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+				 errmsg("no dblink COPY is in progress"),
+				 errhint("Call dblink_copy_open first.")));
+
+	/* PQputCopyData returns 1 once the data is queued; anything else fails */
+	if (PQputCopyData(dblink_copy_current, copybuf->data, copybuf->len) != 1)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_EXCEPTION),
+				 errmsg("could not send row to remote node"),
+				 errdetail("%s", PQerrorMessage(dblink_copy_current))));
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * dblink_copy_end
+ *
+ * Finish the COPY opened by dblink_copy_open.  The end-of-data marker is
+ * sent, then every pending result is collected from the remote side.  The
+ * COPY target is forgotten whether or not the COPY succeeded, so a failure
+ * does not leave dblink_copy_current pointing at the connection.
+ *
+ * Returns the text "OK" on success; raises an error otherwise.
+ */
+PG_FUNCTION_INFO_V1(dblink_copy_end);
+Datum
+dblink_copy_end(PG_FUNCTION_ARGS)
+{
+	PGconn	   *conn = dblink_copy_current;
+	PGresult   *res;
+	char	   *failmsg = NULL;
+
+	if (conn == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+				 errmsg("no dblink COPY is in progress"),
+				 errhint("Call dblink_copy_open first.")));
+
+	/* Whatever happens below, this COPY is over */
+	dblink_copy_current = NULL;
+
+	/* Check to ensure that termination data was sent successfully */
+	if (PQputCopyEnd(conn, NULL) != 1)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_EXCEPTION),
+				 errmsg("COPY end failed"),
+				 errdetail("%s", PQerrorMessage(conn))));
+
+	/*
+	 * Drain every result, even after a failure, so that no PGresult leaks
+	 * and the connection is not left with unread results.  Only the first
+	 * failure is reported.  Its text is copied because PQclear frees it.
+	 */
+	while ((res = PQgetResult(conn)) != NULL)
+	{
+		if (failmsg == NULL && PQresultStatus(res) != PGRES_COMMAND_OK)
+			failmsg = pstrdup(PQresultErrorMessage(res));
+		PQclear(res);
+	}
+
+	if (failmsg != NULL)
+		elog(ERROR, "COPY failed: %s", failmsg);
+
+	PG_RETURN_TEXT_P(cstring_to_text("OK"));
+}

/*
* get_pkey_attnames
diff --git a/contrib/dblink/dblink.h b/contrib/dblink/dblink.h
index 255f5d0..8a2faee 100644
--- a/contrib/dblink/dblink.h
+++ b/contrib/dblink/dblink.h
@@ -59,4 +59,9 @@ extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS);
extern Datum dblink_current_query(PG_FUNCTION_ARGS);
extern Datum dblink_get_notify(PG_FUNCTION_ARGS);

+/* restore a named connection to an idle transaction state */
+extern Datum dblink_connection_reset(PG_FUNCTION_ARGS);
+
+/* COPY ... TO FUNCTION support: open a remote COPY, write data, finish */
+extern Datum dblink_copy_open(PG_FUNCTION_ARGS);
+extern Datum dblink_copy_write(PG_FUNCTION_ARGS);
+extern Datum dblink_copy_end(PG_FUNCTION_ARGS);
+
#endif /* DBLINK_H */
diff --git a/contrib/dblink/dblink.sql.in b/contrib/dblink/dblink.sql.in
index da5dd65..aedca34 100644
--- a/contrib/dblink/dblink.sql.in
+++ b/contrib/dblink/dblink.sql.in
@@ -221,3 +221,23 @@ CREATE OR REPLACE FUNCTION dblink_get_notify(
RETURNS setof record
AS 'MODULE_PATHNAME', 'dblink_get_notify'
LANGUAGE C STRICT;
+
+-- Roll back whatever is open on the named connection so it is idle again.
+CREATE OR REPLACE FUNCTION dblink_connection_reset (text)
+RETURNS void
+AS 'MODULE_PATHNAME', 'dblink_connection_reset'
+LANGUAGE C STRICT;
+
+-- Put a named connection into COPY FROM STDIN for (relation, binary?).
+CREATE OR REPLACE FUNCTION dblink_copy_open (text, text, boolean)
+RETURNS text
+AS 'MODULE_PATHNAME', 'dblink_copy_open'
+LANGUAGE C STRICT;
+
+-- Data sink for COPY ... TO FUNCTION; the internal argument carries the data.
+CREATE OR REPLACE FUNCTION dblink_copy_write (internal)
+RETURNS void
+AS 'MODULE_PATHNAME', 'dblink_copy_write'
+LANGUAGE C STRICT;
+
+-- Finish the COPY started by dblink_copy_open.
+CREATE OR REPLACE FUNCTION dblink_copy_end ()
+RETURNS text
+AS 'MODULE_PATHNAME', 'dblink_copy_end'
+LANGUAGE C STRICT;
diff --git a/contrib/dblink/uninstall_dblink.sql b/contrib/dblink/uninstall_dblink.sql
index 45cf13c..465beb7 100644
--- a/contrib/dblink/uninstall_dblink.sql
+++ b/contrib/dblink/uninstall_dblink.sql
@@ -11,6 +11,14 @@ DROP FUNCTION dblink_build_sql_delete (text, int2vector, int4, _text);

DROP FUNCTION dblink_build_sql_insert (text, int2vector, int4, _text, _text);

+-- COPY ... TO FUNCTION support, dropped in reverse order of creation
+DROP FUNCTION dblink_copy_end ();
+
+DROP FUNCTION dblink_copy_write (internal);
+
+DROP FUNCTION dblink_copy_open (text, text, boolean);
+
+DROP FUNCTION dblink_connection_reset (text);
+
DROP FUNCTION dblink_get_pkey (text);

DROP TYPE dblink_pkey_results;
--
1.6.5.3

In response to

Browse pgsql-hackers by date

  From Date Subject
Next Message Daniel Farina 2009-11-23 21:34:42 [PATCH 4/4] Add tests to dblink covering use of COPY TO FUNCTION
Previous Message Daniel Farina 2009-11-23 21:34:40 [PATCH 2/4] Add tests for "COPY ... TO FUNCTION ..."