Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cadc-tap-schema/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ sourceCompatibility = 11

group = 'org.opencadc'

version = '1.2.10'
version = '1.2.11'

description = 'OpenCADC TAP-1.1 tap schema server library'
def git_url = 'https://github.com/opencadc/tap'
Expand All @@ -29,7 +29,9 @@ dependencies {
implementation 'org.opencadc:cadc-cdp:[1.2.3,2.0)'
implementation 'org.opencadc:cadc-gms:[1.0,2.0)'
implementation 'org.opencadc:cadc-rest:[1.3.1,2.0)'
implementation 'org.apache.parquet:parquet-common:[1.13.0,)'
api 'org.opencadc:cadc-dali:[1.1,2.0)'
api 'org.opencadc:cadc-dali-parquet:[0.5.2,)'
api 'org.opencadc:cadc-tap:[1.1.21,2.0)'

implementation 'uk.ac.starlink:jcdf:[1.2.3,2.0)'
Expand Down
36 changes: 23 additions & 13 deletions cadc-tap-schema/src/main/java/ca/nrc/cadc/tap/db/TableLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public class TableLoader {

private final DatabaseDataType ddType;
private final DataSource dataSource;
private final int batchSize;
public Integer batchSize;
private long totalInserts = 0;

/**
Expand All @@ -115,7 +115,7 @@ public class TableLoader {
* @param dataSource destination database connection pool
* @param batchSize number of rows per commit transaction
*/
public TableLoader(DataSource dataSource, int batchSize) {
public TableLoader(DataSource dataSource, Integer batchSize) {
this.dataSource = dataSource;
this.batchSize = batchSize;
PluginFactory pf = new PluginFactory();
Expand All @@ -131,9 +131,15 @@ public TableLoader(DataSource dataSource, int batchSize) {
*/
public void load(TableDesc destTable, TableDataInputStream data) {
TableDesc reorgTable = data.acceptTargetTableDesc(destTable);


boolean manageTxn = true;
if (batchSize == null || batchSize <= 0) {
manageTxn = false;
batchSize = Integer.MAX_VALUE; // no batching, just one transaction
}

Profiler prof = new Profiler(TableLoader.class);
DatabaseTransactionManager tm = new DatabaseTransactionManager(dataSource);
DatabaseTransactionManager tm = manageTxn ? new DatabaseTransactionManager(dataSource) : null;
JdbcTemplate jdbc = new JdbcTemplate(dataSource);

// Loop over rows, start/commit txn every batchSize rows
Expand All @@ -142,13 +148,15 @@ public void load(TableDesc destTable, TableDataInputStream data) {
Iterator<List<Object>> dataIterator = data.iterator();
List<Object> nextRow = null;

List<List<Object>> batch = new ArrayList<>(batchSize);
List<List<Object>> batch = manageTxn ? new ArrayList<>(batchSize) : new ArrayList<>();
int count = 0;
try {
while (!done) {
count = 0;
tm.startTransaction();
prof.checkpoint("start-transaction");
if (manageTxn) {
tm.startTransaction();
prof.checkpoint("start-transaction");
}
BulkInsertStatement bulkInsertStatement = new BulkInsertStatement(reorgTable);

while (batch.size() < batchSize && dataIterator.hasNext()) {
Expand All @@ -160,9 +168,11 @@ public void load(TableDesc destTable, TableDataInputStream data) {
log.debug("Inserting " + batch.size() + " rows in this batch.");
jdbc.batchUpdate(sql, batch, batchSize, bulkInsertStatement);
prof.checkpoint("batch-of-inserts");

tm.commitTransaction();
prof.checkpoint("commit-transaction");

if (manageTxn) {
tm.commitTransaction();
prof.checkpoint("commit-transaction");
}
totalInserts += batch.size();
batch.clear();
done = !dataIterator.hasNext();
Expand All @@ -175,7 +185,7 @@ public void load(TableDesc destTable, TableDataInputStream data) {
log.error("unexpected exception trying to close input stream", oops);
}
try {
if (tm.isOpen()) {
if (manageTxn && tm.isOpen()) {
tm.rollbackTransaction();
prof.checkpoint("rollback-transaction");
}
Expand All @@ -192,7 +202,7 @@ public void load(TableDesc destTable, TableDataInputStream data) {
log.error("unexpected exception trying to close input stream", oops);
}
try {
if (tm.isOpen()) {
if (manageTxn && tm.isOpen()) {
tm.rollbackTransaction();
prof.checkpoint("rollback-transaction");
}
Expand All @@ -205,7 +215,7 @@ public void load(TableDesc destTable, TableDataInputStream data) {
+ " Current batch of " + batchSize + " failed with: " + t.getMessage(), t);

} finally {
if (tm.isOpen()) {
if (manageTxn && tm.isOpen()) {
log.error("BUG: Transaction manager unexpectedly open, rolling back.");
try {
tm.rollbackTransaction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,27 @@
import ca.nrc.cadc.auth.AuthenticationUtil;
import ca.nrc.cadc.auth.HttpPrincipal;
import ca.nrc.cadc.auth.IdentityManager;
import ca.nrc.cadc.dali.tables.TableData;
import ca.nrc.cadc.dali.tables.votable.VOTableDocument;
import ca.nrc.cadc.db.DatabaseTransactionManager;
import ca.nrc.cadc.net.ResourceAlreadyExistsException;
import ca.nrc.cadc.profiler.Profiler;
import ca.nrc.cadc.rest.InlineContentHandler;
import ca.nrc.cadc.tap.db.TableCreator;
import ca.nrc.cadc.tap.db.TableLoader;
import ca.nrc.cadc.tap.schema.ColumnDesc;
import ca.nrc.cadc.tap.schema.SchemaDesc;
import ca.nrc.cadc.tap.schema.TableDesc;
import ca.nrc.cadc.tap.schema.TapPermissions;
import ca.nrc.cadc.tap.schema.TapSchemaDAO;

import java.util.Iterator;
import java.util.List;

import javax.security.auth.Subject;
import javax.sql.DataSource;
import org.apache.log4j.Logger;
import org.opencadc.tap.io.TableDataInputStream;
import org.springframework.jdbc.core.JdbcTemplate;

/**
Expand Down Expand Up @@ -250,7 +258,35 @@ private void createTable(TapSchemaDAO ts, String schemaName, String tableName) t
TableCreator tc = new TableCreator(ds);
tc.createTable(inputTable);
prof.checkpoint("create-table");


// load the table data if available
Object in = syncInput.getContent(INPUT_TAG);
if (in instanceof VOTableDocument) {
VOTableDocument voTableDocument = (VOTableDocument) in;
TableData tableData = voTableDocument.getResourceByType("results").getTable().getTableData();

if (tableData != null) {
TableLoader tableLoader = new TableLoader(ds, null);
tableLoader.load(inputTable, new TableDataInputStream() {
@Override
public void close() {
//no-op: fully read already
}

@Override
public Iterator<List<Object>> iterator() {
return tableData.iterator();
}

@Override
public TableDesc acceptTargetTableDesc(TableDesc td) {
return td;
}
});
tableData.close();
}
}

// add to tap_schema
// flag table as created using the API to allow table deletion in the DeleteAction
inputTable.apiCreated = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@

import ca.nrc.cadc.auth.AuthenticationUtil;
import ca.nrc.cadc.auth.HttpPrincipal;
import ca.nrc.cadc.dali.tables.votable.VOTableDocument;
import ca.nrc.cadc.dali.tables.votable.VOTableResource;
import ca.nrc.cadc.dali.tables.votable.VOTableTable;
import ca.nrc.cadc.log.WebServiceLogInfo;
import ca.nrc.cadc.net.ResourceNotFoundException;
import ca.nrc.cadc.net.TransientException;
Expand All @@ -80,6 +83,7 @@
import ca.nrc.cadc.tap.schema.TableDesc;
import ca.nrc.cadc.tap.schema.TapPermissions;
import ca.nrc.cadc.tap.schema.TapSchemaDAO;
import ca.nrc.cadc.tap.schema.TapSchemaUtil;
import java.io.IOException;
import java.security.AccessControlException;
import java.security.Principal;
Expand Down Expand Up @@ -183,25 +187,49 @@ protected final TapSchemaDAO getTapSchemaDAO() {
dao.setOrdered(true);
return dao;
}


private TableDesc toTableDesc(VOTableDocument doc) {
for (VOTableResource vr : doc.getResources()) {
VOTableTable vtab = vr.getTable();
if (vtab != null) {
TableDesc ret = TapSchemaUtil.createTableDesc("default", "default", vtab);
log.debug("create from VOtable: " + ret);
// strip out some incoming table metadata
// - ID attr (should be transient usage only)
for (ColumnDesc cd : ret.getColumnDescs()) {
cd.columnID = null;
}
return ret;
}
}
throw new IllegalArgumentException("no table description found in VOTable document");
}

protected TableDesc getInputTable(String schemaName, String tableName) {
Object in = syncInput.getContent(INPUT_TAG);
if (in == null) {
throw new IllegalArgumentException("no input: expected a document describing the table to create/update");
}

TableDesc input;

if (in instanceof TableDesc) {
TableDesc input = (TableDesc) in;
input.setSchemaName(schemaName);
input.setTableName(tableName);
// TODO: move this to PutAction (create only)
int c = 0;
for (ColumnDesc cd : input.getColumnDescs()) {
cd.setTableName(tableName);
cd.columnIndex = c++;
}
return input;
input = (TableDesc) in;
} else if (in instanceof VOTableDocument) {
input = toTableDesc((VOTableDocument) in);
} else {
throw new RuntimeException("BUG: no input table");
}
throw new RuntimeException("BUG: no input table");

input.setSchemaName(schemaName);
input.setTableName(tableName);
// TODO: move this to PutAction (create only)
int c = 0;
for (ColumnDesc cd : input.getColumnDescs()) {
cd.setTableName(tableName);
cd.columnIndex = c++;
}
return input;
}

protected SchemaDesc getInputSchema(String schemaName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,14 @@

package ca.nrc.cadc.vosi.actions;

import ca.nrc.cadc.dali.tables.parquet.ParquetReader;
import ca.nrc.cadc.dali.tables.votable.VOTableDocument;
import ca.nrc.cadc.dali.tables.votable.VOTableReader;
import ca.nrc.cadc.dali.tables.votable.VOTableResource;
import ca.nrc.cadc.dali.tables.votable.VOTableTable;
import ca.nrc.cadc.io.ByteCountInputStream;
import ca.nrc.cadc.rest.InlineContentException;
import ca.nrc.cadc.rest.InlineContentHandler;
import ca.nrc.cadc.tap.schema.ColumnDesc;
import ca.nrc.cadc.tap.schema.SchemaDesc;
import ca.nrc.cadc.tap.schema.TableDesc;
import ca.nrc.cadc.tap.schema.TapSchema;
import ca.nrc.cadc.tap.schema.TapSchemaUtil;
import ca.nrc.cadc.util.StringUtil;
import ca.nrc.cadc.vosi.InvalidTableSetException;
import ca.nrc.cadc.vosi.TableReader;
Expand All @@ -99,6 +95,7 @@ public class TablesInputHandler implements InlineContentHandler {
public static final String VOSI_TABLE_TYPE = "text/xml";
public static final String VOTABLE_TYPE = "application/x-votable+xml";
public static final String VOSI_SCHEMA_TYPE = "application/x-vosi-schema";
public static final String PARQUET_TYPE = "application/vnd.apache.parquet";
// VOSI tableset schema cannot carry owner information
//public static final String VOSI_SCHEMA_TYPE = "text/plain"; // key = value

Expand All @@ -114,7 +111,7 @@ public Content accept(String name, String contentType, InputStream in) throws In
try {
String schemaOwner = null;
SchemaDesc sch = null;
TableDesc tab = null;
Object tab = null;
if (VOSI_SCHEMA_TYPE.equalsIgnoreCase(contentType)) {
ByteCountInputStream istream = new ByteCountInputStream(in, BYTE_LIMIT);
String str = StringUtil.readFromInputStream(istream, "UTF-8");
Expand All @@ -136,8 +133,11 @@ public Content accept(String name, String contentType, InputStream in) throws In
} else if (VOTABLE_TYPE.equalsIgnoreCase(contentType)) {
VOTableReader tr = new VOTableReader();
ByteCountInputStream istream = new ByteCountInputStream(in, BYTE_LIMIT);
VOTableDocument doc = tr.read(istream);
tab = toTableDesc(doc);
tab = tr.read(istream);
} else if (PARQUET_TYPE.equalsIgnoreCase(contentType)) {
ByteCountInputStream istream = new ByteCountInputStream(in, BYTE_LIMIT);
ParquetReader parquetReader = new ParquetReader();
tab = parquetReader.read(istream);
}
InlineContentHandler.Content ret = new InlineContentHandler.Content();
ret.name = objectTag;
Expand All @@ -151,21 +151,4 @@ public Content accept(String name, String contentType, InputStream in) throws In
}
}

private TableDesc toTableDesc(VOTableDocument doc) {
// TODO: reject if the table has any rows? try to insert them if it is small enough?
for (VOTableResource vr : doc.getResources()) {
VOTableTable vtab = vr.getTable();
if (vtab != null) {
TableDesc ret = TapSchemaUtil.createTableDesc("default", "default", vtab);
log.debug("create from VOtable: " + ret);
// strip out some incoming table metadata
// - ID attr (should be transient usage only)
for (ColumnDesc cd : ret.getColumnDescs()) {
cd.columnID = null;
}
return ret;
}
}
throw new IllegalArgumentException("no table description found in VOTable document");
}
}
2 changes: 1 addition & 1 deletion cadc-tap-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ dependencies {
implementation 'org.opencadc:cadc-registry:[1.4,)'
implementation 'org.opencadc:cadc-rest:[1.4,)'
api 'org.opencadc:cadc-dali:[1.2.23,)'
api 'org.opencadc:cadc-tap-schema:[1.2.6,)'
api 'org.opencadc:cadc-tap-schema:[1.2.11,)'
api 'org.opencadc:cadc-uws:[1.0,)'
api 'org.opencadc:cadc-uws-server:[1.2.23,)'

Expand Down
5 changes: 3 additions & 2 deletions youcat/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ dependencies {
implementation 'org.opencadc:cadc-util:[1.12.0,2.0)'
implementation 'org.opencadc:cadc-rest:[1.4.5,)'
implementation 'org.opencadc:cadc-dali:[1.2.23,)'
implementation 'org.opencadc:cadc-dali-parquet:[0.5.0,)'
implementation 'org.opencadc:cadc-dali-parquet:[0.5.2,)'
implementation 'org.opencadc:cadc-datalink:[1.1.5,)'
implementation 'org.opencadc:cadc-adql:[1.1.14,)'
implementation 'org.opencadc:cadc-uws:[1.0.2,)'
Expand All @@ -36,7 +36,7 @@ dependencies {
implementation 'org.opencadc:cadc-tap-server-pg:[1.1.1,)'
implementation 'org.opencadc:cadc-adql:[1.1.4,)'
implementation 'org.opencadc:cadc-vosi:[1.4.3,2.0)'
implementation 'org.opencadc:cadc-registry:[1.7.2,)'
implementation 'org.opencadc:cadc-registry:[1.8.0,)'
implementation 'org.opencadc:cadc-gms:[1.0.4,2.0)'
implementation 'org.opencadc:cadc-tap-tmp:[1.1.5,)'
implementation 'org.opencadc:cadc-cdp:[1.4.0,)'
Expand All @@ -51,6 +51,7 @@ dependencies {
intTestImplementation 'org.opencadc:cadc-test-vosi:[1.0.11,)'
intTestImplementation 'org.opencadc:cadc-test-tap:[1.1,)'
intTestImplementation 'uk.ac.starlink:stil:[4.0,5.0)'
intTestImplementation 'org.apache.parquet:parquet-common:[1.13.0,)'

intTestRuntimeOnly 'org.postgresql:postgresql:[42.2,43.0)'
}
Expand Down
Loading
Loading