package org.postgresql.copy; import java.io.*; import java.sql.*; import java.util.*; import org.postgresql.PGConnection; import org.postgresql.PG_Stream; import org.postgresql.core.*; import org.postgresql.util.*; /* * This class implements the copy protocol interface to org.postgresql. */ public class CopyManager { // This is the network stream associated with this connection public PGConnection pg_conn; public PG_Stream pg_stream; public CopyManager(org.postgresql.PGConnection pg_conn, org.postgresql.PG_Stream pg_stream) { this.pg_conn = pg_conn; this.pg_stream = pg_stream; } /* * Copies out a table in a flat stream, suitable for backups or high-volume loading. * * @param table the table from which it will read * @param out OutputStream to which the data will be written * @return void * @exception SQLException if a database access error occurs */ public void copyOut(String table, OutputStream out) throws SQLException { copyOutQuery("COPY " +table+ " TO STDOUT", out); } /* * Copies out a table in a flat stream, suitable for backups or high-volume loading. This method accepts any copy out query, including any options (delimiters/null) supported by the backend. * * @param query any "COPY ... TO STDOUT ..." query * @param out OutputStream to which the data will be written * @return void * @exception SQLException if a database access error occurs */ public void copyOutQuery(String query, OutputStream out) throws SQLException { synchronized (pg_stream) { try { // duplicates statements in QueryExecutor.sendQuery pg_stream.SendChar('Q'); pg_stream.Send(pg_conn.getEncoding().encode( query )); pg_stream.SendChar(0); pg_stream.flush(); // check response from backend for COPY OUT confirmation int response = pg_stream.ReceiveChar(); if ( response == 'E') { String error_string = pg_stream.ReceiveString(pg_conn.getEncoding()); throw new PSQLException( "postgresql.copy.protocolerror", error_string ); } else if (response != 'H') { throw new PSQLException( "postgresql.copy.protocolsurprise", "H", new Character((char)response) ); } long start_time = System.currentTimeMillis(); int i, j, k; // FIXME: do we need to acommodate other encodings?? while (true) { i = pg_stream.ReceiveChar(); if (i == '\\') { j = pg_stream.ReceiveChar(); k = pg_stream.ReceiveChar(); if (j == '.' && k == '\n') break; pg_stream.unread(k); pg_stream.unread(j); } out.write(i); } // the backend should send the strings "C" + "COPY" String response_str = pg_stream.ReceiveString(pg_conn.getEncoding()); if ( ! response_str.equals("CCOPY") ) { throw new PSQLException( "postgresql.copy.protocolsurprise", "CCOPY", response_str); } // check to make sure the backend is ready for the next query response = pg_stream.ReceiveChar(); if ( response == 'E') { String error_string = pg_stream.ReceiveString(pg_conn.getEncoding()); throw new PSQLException( "postgresql.copy.protocolerror", error_string ); } else if (response != 'Z') { throw new PSQLException( "postgresql.copy.protocolsurprise", "Z", new Character((char)response) ); } } catch (IOException ex) { throw new PSQLException( "postgresql.copy.ioerror", ex.toString()); } } return ; } /* * Copies into a table from a flat stream, suitable for restorations or high-volume loading. * * @param table the table into which the data will be written * @param in InputStream from which the data will be read * @return void * @exception SQLException if a database access error occurs */ public void copyIn(String table, InputStream in) throws SQLException { copyInQuery("COPY " +table+ " FROM STDIN", in); } /* * Copies into a table in a flat stream, suitable for backups or high-volume loading. This method accepts any copy in query, including any options (delimiters/null) supported by the backend. * * @param query any "COPY ... FROM STDIN ..." query * @param in InputStream from which the data will be read * @return void * @exception SQLException if a database access error occurs */ public void copyInQuery(String query, InputStream input) throws SQLException { PushbackInputStream in = new PushbackInputStream(input, 2); synchronized (pg_stream) { try { // inform the backend that we are about to COPY in some data // duplicates statements in QueryExecutor.sendQuery pg_stream.SendChar('Q'); pg_stream.Send(pg_conn.getEncoding().encode(query)); pg_stream.SendChar(0); pg_stream.flush(); // check response from backend int response = pg_stream.ReceiveChar(); if (response == 'E') { String error_string = pg_stream.ReceiveString(pg_conn.getEncoding()); throw new PSQLException( "postgresql.copy.protocolerror", error_string ); } else if (response != 'G') { throw new PSQLException( "postgresql.copy.protocolsurprise", "G", new Character((char)response) ); } // send the whole input stream int b, j, k; // a byte placeholder to read from in and send to backend while (true) { b = in.read(); if (b == -1) break; // the InputStream is finished // check to make sure we don't send the COPY termination sequence // perhaps this should be optional, though doesn't seem to add much overhead if (b == '\\') { j = in.read(); k = in.read(); if (j == -1 || k == -1 || (j == '.' && k == '\n')) break; in.unread(k); in.unread(j); } // FIXME: do we need to worry about encoding??? pg_stream.SendChar((char) b); } pg_stream.Send(new byte[]{(byte) '\\', (byte) '.', (byte) '\n'}); pg_stream.flush(); // the backend should send the strings "C" + "COPY" String response_str = pg_stream.ReceiveString(pg_conn.getEncoding()); if ( ! response_str.equals("CCOPY") ) { throw new PSQLException( "postgresql.copy.protocolsurprise", "CCOPY", response_str); } // check to make sure the backend is ready for the next query response = pg_stream.ReceiveChar(); if (response == 'E') { String error_string = pg_stream.ReceiveString(pg_conn.getEncoding()); throw new PSQLException( "postgresql.copy.protocolerror", error_string ); } else if (response != 'Z') { throw new PSQLException( "postgresql.copy.protocolsurprise", "Z", new Character((char)response) ); } } catch (IOException ex) { throw new PSQLException( "postgresql.copy.ioerror", ex.toString()); } } } // these two methods serve as examples and provide some of the functionality of pg_dump /* * Copies data from a table into a file. * * @param table the table from which the data will be read * @param filename the name of the file into which the data will be written * @return void */ public void copyToFile(String table, String filename) throws Exception { FileOutputStream fos = new FileOutputStream(filename); copyOut(table, fos); } /* * Copies data from a file into a table. * * @param table the table into which the data will be written * @param filename the name of the file from which the data will be read * @return void */ public void copyFromFile(String table, String filename) throws Exception { FileInputStream fis = new FileInputStream(filename); copyInQuery(table, fis); } }