package org.ecoinformatics.datamanager.database;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Vector;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.ecoinformatics.datamanager.DataManager;
import org.ecoinformatics.datamanager.download.DataSourceNotFoundException;
import org.ecoinformatics.datamanager.download.DataStorageInterface;
import org.ecoinformatics.datamanager.parser.AttributeList;
import org.ecoinformatics.datamanager.parser.Entity;
import org.ecoinformatics.datamanager.quality.QualityCheck;
import org.ecoinformatics.datamanager.quality.QualityReport;
import org.ecoinformatics.datamanager.quality.QualityCheck.Status;

/**
 * This class implements the DataStorageInterface to load data into the
 * database. A connected PipedOutputStream/PipedInputStream pair is used so
 * that data can be streamed directly into the database, eliminating the need
 * for temporary files on disk.
 *
 * @author Jing Tao
 */
public class DatabaseLoader implements DataStorageInterface, Runnable {

  /*
   * Class fields
   */

  public static Log log = LogFactory.getLog(DatabaseLoader.class);

  private static TableMonitor tableMonitor = null;

  /*
   * Instance fields
   */

  private PipedInputStream inputStream = null;
  private PipedOutputStream outputStream = null;
  private Entity entity = null;
  private DatabaseAdapter databaseAdapter = null;
  private String errorCode = null;
  private boolean completed = false;
  private boolean success = false;
  private Exception exception = null;

  /*
   * Constructors
   */

  /**
   * Constructor. Creates a PipedOutputStream object and a PipedInputStream
   * object and connects them to each other.
   *
   * @param dbAdapterName Name of the database adapter
   * @param entity        Metadata information associated with the loader
   * @throws IOException
   * @throws SQLException
   */
  public DatabaseLoader(String dbAdapterName, Entity entity)
      throws IOException, SQLException {
    outputStream = new PipedOutputStream();
    inputStream = new PipedInputStream();
    outputStream.connect(inputStream);
    this.entity = entity;

    /* Initialize the databaseAdapter and tableMonitor fields */
    if (dbAdapterName.equals(DatabaseAdapter.POSTGRES_ADAPTER)) {
      this.databaseAdapter = new PostgresAdapter();
    }
    else if (dbAdapterName.equals(DatabaseAdapter.HSQL_ADAPTER)) {
      this.databaseAdapter = new HSQLAdapter();
    }
    else if (dbAdapterName.equals(DatabaseAdapter.ORACLE_ADAPTER)) {
      this.databaseAdapter = new OracleAdapter();
    }

    tableMonitor = new TableMonitor(databaseAdapter);
  }
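
  /*
   * Illustrative note (assumed example, not part of the original source):
   * the constructor above wires the two piped streams together, so bytes
   * written to the PipedOutputStream by one thread (in practice the
   * DownloadHandler) become readable from the PipedInputStream by another
   * thread (the run() method of this class), with no temporary file on disk.
   * The wiring is just the standard java.io pattern:
   *
   *   PipedOutputStream out = new PipedOutputStream();
   *   PipedInputStream in = new PipedInputStream();
   *   out.connect(in);   // writer thread writes to 'out'; reader thread reads 'in'
   */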

  /**
   * Accesses the data for a given identifier, opening an input stream on it
   * for loading. This method is required for implementing
   * DataStorageInterface.
   *
   * @param identifier An identifier of the data to be loaded.
   * @return an input stream
   * @throws DataSourceNotFoundException
   *           Indicates that the data was not found in the local store.
   */
  public InputStream load(String identifier) throws DataSourceNotFoundException {
    // Not implemented for this loader; always returns null.
    InputStream inputStream = null;
    return inputStream;
  }


  /**
   * Starts serializing a remote input stream. The returned OutputStream is
   * the destination in the local store. The DownloadHandler reads data from
   * the remote source and writes it to the output stream for local storage.
   * For the DatabaseLoader class, the database itself serves as the local
   * store. Piped input and output streams are used to read and write data
   * from the remote source, so the loading process runs in a separate thread.
   *
   * @param identifier An identifier to the data in the local store that is
   *          to be serialized.
   * @return An output stream to the location in the local store where the
   *         data is to be serialized.
   */
  public OutputStream startSerialize(String identifier) {
    log.debug("DatabaseLoader.startSerialize()");
    completed = false;
    success = false;
    Thread newThread = new Thread(this);
    newThread.start();
    return outputStream;
  }


  /**
   * Finishes serialization of the data to the local store. Closes the input
   * stream and output stream.
   *
   * This method is required for implementing DataStorageInterface.
   *
   * @param identifier the identifier for the data whose serialization is done
   * @param errorCode  a string indicating whether there was an error during
   *          the serialization
   */
  public void finishSerialize(String identifier, String errorCode) {
    this.errorCode = errorCode;

    if (inputStream != null) {
      try {
        //TODO: broken pipe error with this
        inputStream.close();
      }
      catch (Exception e) {
        log.error("Could not close inputStream in DatabaseLoader.finishSerialize(): "
            + e.getMessage());
      }
    }

    if (outputStream != null) {
      try {
        outputStream.close();
      }
      catch (Exception e) {
        log.error("Could not close outputStream in DatabaseLoader.finishSerialize(): "
            + e.getMessage());
      }
    }

    log.debug("DatabaseLoader.finishSerialize()");
  }


  /**
   * Gets the PipedInputStream object.
   *
   * @return the PipedInputStream
   */
  public PipedInputStream getPipedInputStream() {
    return this.inputStream;
  }


  /**
   * Gets the PipedOutputStream object.
   *
   * @return the PipedOutputStream object
   */
  public PipedOutputStream getPipedOutputStream() {
    return this.outputStream;
  }


  /**
   * Gets the errorCode that DownloadHandler passed.
   *
   * @return the errorCode string
   */
  public String getErrorCode() {
    return this.errorCode;
  }
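
  /*
   * Illustrative sketch (assumed usage, not taken from the original source):
   * a caller that has constructed a DatabaseLoader would typically obtain the
   * output stream from startSerialize(), copy the downloaded bytes into it,
   * and then signal completion with finishSerialize(). The identifier,
   * error-code values, and 'downloadedBytes' variable below are hypothetical.
   *
   *   OutputStream out = loader.startSerialize("entity-1");
   *   try {
   *     out.write(downloadedBytes);   // e.g. bytes read from the remote source
   *     out.flush();
   *     loader.finishSerialize("entity-1", null);
   *   }
   *   catch (IOException e) {
   *     loader.finishSerialize("entity-1", e.getMessage());
   *   }
   */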

  /**
   * Reads the data from the PipedInputStream, which is connected to the
   * PipedOutputStream that was returned to the DownloadHandler, and loads
   * that data into the database. This is where the actual data loading takes
   * place.
   */
  public void run() {
    DelimitedReader delimitedReader = null;
    QualityCheck dataLoadQualityCheck = null;
    String insertSQL = "";
    Vector rowVector = new Vector();
    int rowCount = 0;

    if (entity == null) {
      success = false;
      completed = true;
      return;
    }
    else {
      // Initialize the dataLoadQualityCheck
      String dataLoadIdentifier = "dataLoadStatus";
      QualityCheck dataLoadTemplate =
        QualityReport.getQualityCheckTemplate(dataLoadIdentifier);
      dataLoadQualityCheck =
        new QualityCheck(dataLoadIdentifier, dataLoadTemplate);
    }

    AttributeList attributeList = entity.getAttributeList();
    String tableName = entity.getDBTableName();
    TextDataReader dataReader = null;
    boolean stripHeaderLine = true;

    if (inputStream != null) {
      try {
        if (entity.isSimpleDelimited()) {
          delimitedReader = new DelimitedReader(
              inputStream,
              entity.getAttributes().length,
              entity.getFieldDelimiter(),
              entity.getNumHeaderLines(),
              entity.getRecordDelimiter(),
              entity.getNumRecords(),
              stripHeaderLine);
          delimitedReader.setEntity(entity);
          delimitedReader.setCollapseDelimiters(entity.getCollapseDelimiters());
          delimitedReader.setNumFooterLines(entity.getNumFooterLines());

          if (entity.getQuoteCharacter() != null) {
            delimitedReader.setQuoteCharacter(entity.getQuoteCharacter());
          }

          if (entity.getLiteralCharacter() != null) {
            delimitedReader.setLiteralCharacter(entity.getLiteralCharacter());
          }

          dataReader = delimitedReader;
        }
        else {
          dataReader =
            new TextComplexFormatDataReader(inputStream, entity, stripHeaderLine);
        }

        rowVector = dataReader.getOneRowDataVector();
      }
      catch (Exception e) {
        log.error("Exception in DatabaseLoader.run(): " + e.getMessage());

        if (QualityCheck.shouldRunQualityCheck(entity, dataLoadQualityCheck)) {
          // Report data load status as failed
          dataLoadQualityCheck.setFailedStatus();
          dataLoadQualityCheck.setFound(
              "One or more errors occurred during data loading");
          String explanation = "";
          dataLoadQualityCheck.setExplanation(explanation);
          entity.addQualityCheck(dataLoadQualityCheck);
        }

        success = false;
        completed = true;
        exception = e;
        return;
      }
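
      /*
       * Illustrative note (assumed example, not from the original source):
       * for a simple delimited entity, each call to getOneRowDataVector()
       * yields the fields of one record as a vector of values. For example,
       * with a field delimiter of "," a record such as
       *
       *   site_1,2004-05-12,7.3
       *
       * would come back roughly as the vector [site_1, 2004-05-12, 7.3],
       * which is then handed to the DatabaseAdapter to build an INSERT
       * statement in the loop below.
       */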

      Connection connection = null;

      try {
        /*
         * Display the first row of data in a QualityCheck object
         */
        String displayRowIdentifier = "displayFirstInsertRow";
        QualityCheck displayRowTemplate =
          QualityReport.getQualityCheckTemplate(displayRowIdentifier);
        QualityCheck displayRowQualityCheck =
          new QualityCheck(displayRowIdentifier, displayRowTemplate);

        if (QualityCheck.shouldRunQualityCheck(entity, displayRowQualityCheck)) {
          // Note that rowVector starts and ends with square brackets. We're
          // using a shortcut by incorporating them into the CDATA tags
          String foundString = "<![CDATA" + rowVector.toString() + "]>";
          displayRowQualityCheck.setFound(foundString);
          displayRowQualityCheck.setStatus(Status.info);
          entity.addQualityCheck(displayRowQualityCheck);
        }

        connection = DataManager.getConnection();
        if (connection == null) {
          success = false;
          exception = new Exception("The connection to db is null");
          completed = true;
          return;
        }
        connection.setAutoCommit(false);
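
        /*
         * Illustrative note (assumed example, not from the original source):
         * the loop below asks the DatabaseAdapter to build one INSERT
         * statement per row and executes it on the shared connection. The
         * exact SQL depends on the adapter subclass, but it is roughly of
         * the form
         *
         *   INSERT INTO table_name (col_1, col_2, col_3) VALUES ('site_1', '2004-05-12', 7.3)
         *
         * With auto-commit disabled above, none of the rows become visible
         * until connection.commit() succeeds; any failure triggers the
         * rollback in the catch block further down.
         */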

        while (!rowVector.isEmpty()) {
          insertSQL =
            databaseAdapter.generateInsertSQL(attributeList, tableName, rowVector);

          if (insertSQL != null) {
            PreparedStatement statement = connection.prepareStatement(insertSQL);
            statement.execute();
            statement.close();
            rowCount++;
          }

          rowVector = dataReader.getOneRowDataVector();
        }

        connection.commit();

        if (delimitedReader != null) {
          /*
           * If no 'tooFewFields' errors were detected, record the
           * quality check status as 'valid'
           */
          String tooFewFieldsIdentifier = "tooFewFields";
          QualityCheck tooFewFieldsTemplate =
            QualityReport.getQualityCheckTemplate(tooFewFieldsIdentifier);
          QualityCheck tooFewFieldsCheck =
            new QualityCheck(tooFewFieldsIdentifier, tooFewFieldsTemplate);

          if (QualityCheck.shouldRunQualityCheck(entity, tooFewFieldsCheck)) {
            if (delimitedReader.getTooFewFieldsCounter() == 0) {
              tooFewFieldsCheck.setExplanation("");
              tooFewFieldsCheck.setFound("No errors detected");
              tooFewFieldsCheck.setStatus(Status.valid);
              tooFewFieldsCheck.setSuggestion("");
              entity.addQualityCheck(tooFewFieldsCheck);
            }
          }

          /*
           * If no 'tooManyFields' errors were detected, record the
           * quality check status as 'valid'
           */
          String tooManyFieldsIdentifier = "tooManyFields";
          QualityCheck tooManyFieldsTemplate =
            QualityReport.getQualityCheckTemplate(tooManyFieldsIdentifier);
          QualityCheck tooManyFieldsCheck =
            new QualityCheck(tooManyFieldsIdentifier, tooManyFieldsTemplate);

          if (QualityCheck.shouldRunQualityCheck(entity, tooManyFieldsCheck)) {
            if (delimitedReader.getTooManyFieldsCounter() == 0) {
              tooManyFieldsCheck.setExplanation("");
              tooManyFieldsCheck.setFound("No errors detected");
              tooManyFieldsCheck.setStatus(Status.valid);
              tooManyFieldsCheck.setSuggestion("");
              entity.addQualityCheck(tooManyFieldsCheck);
            }
          }

          /*
           * If no record delimiter was found in the data, record the
           * 'examineRecordDelimiter' quality check as 'error'
           */
          String examineRecordDelimiterIdentifier = "examineRecordDelimiter";
          QualityCheck examineRecordDelimiterTemplate =
            QualityReport.getQualityCheckTemplate(examineRecordDelimiterIdentifier);
          QualityCheck examineRecordDelimiter =
            new QualityCheck(examineRecordDelimiterIdentifier,
                             examineRecordDelimiterTemplate);

          if (QualityCheck.shouldRunQualityCheck(entity, examineRecordDelimiter)) {
            if (!delimitedReader.hasRecordDelimiter()) {
              String explanation = "No record delimiter was found in the data entity.";
              if (delimitedReader.exceedsRecordLengthLimit()) {
                int recordLengthLimit = delimitedReader.getRecordLengthLimit();
                explanation = String.format(
                    "The first %d characters of the data entity were examined and no record delimiter was found matching the record delimiter specified in the metadata.",
                    recordLengthLimit);
              }
              examineRecordDelimiter.setExplanation(explanation);
              examineRecordDelimiter.setFound("No record delimiter was found.");
              examineRecordDelimiter.setStatus(Status.error);
              examineRecordDelimiter.setSuggestion(
                  "Check that the record delimiter is specified in the metadata.");
              entity.addQualityCheck(examineRecordDelimiter);
            }
          }
        }

        if (QualityCheck.shouldRunQualityCheck(entity, dataLoadQualityCheck)) {
          if (rowCount > 0) {
            dataLoadQualityCheck.setStatus(Status.valid);
            dataLoadQualityCheck.setFound(
                "The data table loaded successfully into a database");
          }
          else {
            dataLoadQualityCheck.setFailedStatus();
            dataLoadQualityCheck.setFound(
                "No data could be loaded into a database");
          }
          entity.addQualityCheck(dataLoadQualityCheck);

          /*
           * Store the number of records found in a QualityCheck object
           */
          String numberOfRecordsIdentifier = "numberOfRecords";
          QualityCheck numberOfRecordsTemplate =
            QualityReport.getQualityCheckTemplate(numberOfRecordsIdentifier);
          QualityCheck numberOfRecordsQualityCheck =
            new QualityCheck(numberOfRecordsIdentifier, numberOfRecordsTemplate);

          if (QualityCheck.shouldRunQualityCheck(entity, numberOfRecordsQualityCheck)) {
            int expectedNumberOfRecords = entity.getNumRecords();
            numberOfRecordsQualityCheck.setExpected("" + expectedNumberOfRecords);
            numberOfRecordsQualityCheck.setFound("" + rowCount);

            if (expectedNumberOfRecords == rowCount) {
              numberOfRecordsQualityCheck.setStatus(Status.valid);
              numberOfRecordsQualityCheck.setExplanation(
                  "The expected number of records (" + rowCount +
                  ") was found in the data table.");
            }
            // When zero records were counted, set an error status
            else if ((expectedNumberOfRecords != 0) && (rowCount == 0)) {
              numberOfRecordsQualityCheck.setFailedStatus();
              numberOfRecordsQualityCheck.setExplanation(
                  "The number of records found in the data table was: " + rowCount +
                  ". Check that a valid record delimiter was specified in the metadata.");
            }
            // When 'numberOfRecords' is not specified in the EML, the EML
            // parser sets the value to -1.
            else if (expectedNumberOfRecords < 0) {
              numberOfRecordsQualityCheck.setStatus(Status.info);
              numberOfRecordsQualityCheck.setExplanation(
                  "The number of records found in the data table was: " + rowCount +
                  ". There was no 'numberOfRecords' value specified in the EML.");
            }
            else {
              // Report the number of records check as failed
              numberOfRecordsQualityCheck.setFailedStatus();
              numberOfRecordsQualityCheck.setExplanation(
                  "The number of records found in the data table (" + rowCount +
                  ") does not match the 'numberOfRecords' value specified in the EML (" +
                  expectedNumberOfRecords + ")");
            }

            entity.addQualityCheck(numberOfRecordsQualityCheck);
          }
        }

        success = true;
      }
      catch (Exception e) {
        log.error(e.getMessage());
        e.printStackTrace();
        success = false;
        exception = e;

        if (QualityCheck.shouldRunQualityCheck(entity, dataLoadQualityCheck)) {
          // Report data load status as failed
          dataLoadQualityCheck.setFailedStatus();
          dataLoadQualityCheck.setFound(
              "Error inserting data at row " + (rowCount + 1) + ".");
          String explanation = "";
          dataLoadQualityCheck.setExplanation(explanation);
          entity.addQualityCheck(dataLoadQualityCheck);
        }

        try {
          connection.rollback();
        }
        catch (Exception ee) {
          log.error(ee.getMessage());
        }
      }
      finally {
        try {
          connection.setAutoCommit(true);
        }
        catch (Exception ee) {
          log.error(ee.getMessage());
        }

        DataManager.returnConnection(connection);
      }
    }
    else {
      log.error("Input stream is null.");
      success = false;
    }

    completed = true;
  }
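
  /*
   * Illustrative sketch (assumed usage, not taken from the original source):
   * because run() executes on its own thread, a caller would typically poll
   * the status methods below to find out how the load finished. The
   * identifier and sleep interval are hypothetical, and interrupt handling
   * is omitted.
   *
   *   while (!loader.isCompleted("entity-1")) {
   *     Thread.sleep(100);
   *   }
   *   if (!loader.isSuccess("entity-1")) {
   *     log.error("Data load failed", loader.getException());
   *   }
   */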

  /**
   * Determines whether the data table corresponding to a given identifier
   * already exists in the database and is loaded with data. This method is
   * mandated by the DataStorageInterface.
   *
   * First, check that the data table exists. Second, check that the row
   * count is greater than zero. (We want to make sure that the table has not
   * only been created, but has also been loaded with data.)
   *
   * @param identifier the identifier for the data table
   * @return true if the data table has been loaded into the database,
   *         else false
   */
  public boolean doesDataExist(String identifier) {
    boolean doesExist = false;

    try {
      String tableName = tableMonitor.identifierToTableName(identifier);
      doesExist = tableMonitor.isTableInDB(tableName);

      if (doesExist) {
        int rowCount = tableMonitor.countRows(tableName);
        doesExist = (rowCount > 0);
      }
    }
    catch (SQLException e) {
      log.error(e.getMessage());
      e.printStackTrace();
    }

    return doesExist;
  }


  /**
   * Gets the completion status of the serialize process.
   *
   * @param identifier Identifier of the entity which is being serialized
   * @return true if complete, false if not complete
   */
  public boolean isCompleted(String identifier) {
    return completed || doesDataExist(identifier);
  }


  /**
   * Gets the success status of the serialize process, success or failure.
   *
   * @param identifier Identifier of the entity which has been serialized
   * @return true if successful, else false
   */
  public boolean isSuccess(String identifier) {
    return success || doesDataExist(identifier);
  }


  /**
   * Gets the Exception that happened during serialization.
   *
   * @return the Exception that happened during serialization
   */
  public Exception getException() {
    return exception;
  }

}