Skip to content

Commit

Permalink
Fetching results through own Java code instead of RJDBC for greater e…
Browse files Browse the repository at this point in the history
…fficiency and prevention of memory leaks
  • Loading branch information
schuemie committed Jul 4, 2017
1 parent 503c2a8 commit 91ecd42
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 120 deletions.
3 changes: 3 additions & 0 deletions .Rbuildignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
extras
man-roxygen
bin
^.*\.Rproj$
^\.Rproj\.user$
^\.travis\.yml$
R/LocalEnvironment.R
deploy.sh
.classpath
.project
6 changes: 6 additions & 0 deletions .classpath
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="java"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/jdk6"/>
<classpathentry kind="output" path="bin"/>
</classpath>
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
.Rhistory
.RData
R/LocalEnvironment.R
/bin/
6 changes: 6 additions & 0 deletions .project
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
4 changes: 4 additions & 0 deletions R/DatabaseConnector.R
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
#' @importFrom utils sessionInfo setTxtProgressBar txtProgressBar object.size
NULL

.onLoad <- function(libname, pkgname) {
rJava::.jpackage(pkgname, lib.loc = libname)
}


#' @title
#' createConnectionDetails
Expand Down
167 changes: 55 additions & 112 deletions R/Sql.R
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

.getBatch <- function(resultSet, batchSize, datesAsString) {
batch <- RJDBC::fetch(resultSet, batchSize)
if (!datesAsString) {
cols <- rJava::.jcall(resultSet@md, "I", "getColumnCount")
for (i in 1:cols) {
type <- rJava::.jcall(resultSet@md, "I", "getColumnType", i)
if (type == 91)
batch[, i] <- as.Date(batch[, i])
}
}
return(batch)
}

.systemInfo <- function() {
si <- sessionInfo()
lines <- c()
Expand Down Expand Up @@ -73,10 +60,6 @@
#'
#' @param connection The connection to the database server.
#' @param query The SQL statement to retrieve the data
#' @param batchSize The number of rows that will be retrieved at a time from the server. A larger
#' batchSize means less calls to the server so better performance, but too large
#' a batchSize could lead to out-of-memory errors. The default is "auto", meaning
#' heuristics will determine the appropriate batch size.
#' @param datesAsString Should dates be imported as character vectors, our should they be converted
#' to R's date format?
#'
Expand All @@ -91,73 +74,38 @@
#' @export
lowLevelQuerySql.ffdf <- function(connection,
query = "",
batchSize = "auto",
datesAsString = FALSE) {
# Create resultset:
rJava::.jcall("java/lang/System", , "gc")

# Have to set autocommit to FALSE for PostgreSQL, or else it will ignore setFetchSize (Note: reason
# for this is that PostgreSQL doesn't want the data set you're getting to change during fetch)
autoCommit <- rJava::.jcall(connection@jc, "Z", "getAutoCommit")
if (autoCommit) {
rJava::.jcall(connection@jc, "V", "setAutoCommit", FALSE)
on.exit(rJava::.jcall(connection@jc, "V", "setAutoCommit", TRUE))
}

type_forward_only <- rJava::.jfield("java/sql/ResultSet", "I", "TYPE_FORWARD_ONLY")
concur_read_only <- rJava::.jfield("java/sql/ResultSet", "I", "CONCUR_READ_ONLY")
s <- rJava::.jcall(connection@jc,
"Ljava/sql/Statement;",
"createStatement",
type_forward_only,
concur_read_only)
if (rJava::is.jnull(connection@jc))
stop("Connection is closed")
batchedQuery <- rJava::.jnew("org.ohdsi.databaseConnector.BatchedQuery", connection@jc, query)

# Have to call setFetchSize on Statement object for PostgreSQL (RJDBC only calls it on ResultSet)
rJava::.jcall(s, "V", method = "setFetchSize", as.integer(2048))
on.exit(rJava::.jcall(batchedQuery, "V", "clear"))

r <- rJava::.jcall(s, "Ljava/sql/ResultSet;", "executeQuery", as.character(query)[1])
md <- rJava::.jcall(r, "Ljava/sql/ResultSetMetaData;", "getMetaData", check = FALSE)
resultSet <- new("JDBCResult", jr = r, md = md, stat = s, pull = rJava::.jnull())

on.exit(RJDBC::dbClearResult(resultSet), add = TRUE)

# Fetch first 100 rows to estimate required memory per batch:
batch <- .getBatch(resultSet, 100, datesAsString)
if (batchSize == "auto") {
batchSize <- floor(5e8 / as.numeric(object.size(batch)))
}
n <- nrow(batch)

# Convert to ffdf object:
charCols <- sapply(batch, class)
charCols <- names(charCols[charCols == "character"])
for (charCol in charCols) {
batch[[charCol]] <- factor(batch[[charCol]])
}
if (n == 0) {
data <- batch #ffdf cannot contain 0 rows, so return data.frame instead
warning("Data has zero rows, returning an empty data frame")
} else {
data <- ff::as.ffdf(batch)
columnTypes <- rJava::.jcall(batchedQuery, "[I", "getColumnTypes")
columns <- vector("list", length(columnTypes))
while (!rJava::.jcall(batchedQuery, "Z", "isDone")) {
rJava::.jcall(batchedQuery, "V", "fetchBatch")
for (i in seq.int(length(columnTypes))) {
if (columnTypes[i] == 1) {
columns[[i]] <- ffbase::ffappend(columns[[i]], rJava::.jcall(batchedQuery, "[D", "getNumeric", as.integer(i)))
} else {
columns[[i]] <- ffbase::ffappend(columns[[i]], factor(rJava::.jcall(batchedQuery, "[Ljava/lang/String;", "getString", i)))
}
}
}

if (n == 100) {
# Fetch remaining data in batches:
n <- batchSize
while (n == batchSize) {
batch <- .getBatch(resultSet, batchSize, datesAsString)

n <- nrow(batch)
if (n != 0) {
for (charCol in charCols) batch[[charCol]] <- factor(batch[[charCol]])
data <- ffbase::ffdfappend(data, batch)
if (!datesAsString) {
for (i in seq.int(length(columnTypes))) {
if (columnTypes[i] == 3) {
columns[[i]] <- ffbase::as.Date.ff_vector(columns[[i]])
}
}
}
return(data)
}
ffdf <- do.call(ff::ffdf, columns)
names(ffdf) <- rJava::.jcall(batchedQuery, "[Ljava/lang/String;", "getColumnNames")
return(ffdf)
}

#' Low level function for retrieving data to an ffdf object
#' Low level function for retrieving data to a data frame
#'
#' @description
#' This is the equivalent of the \code{\link{querySql}} function, except no error report is written
Expand All @@ -176,45 +124,35 @@ lowLevelQuerySql.ffdf <- function(connection,
#'
#' @export
lowLevelQuerySql <- function(connection, query = "", datesAsString = FALSE) {
# Create resultset:
rJava::.jcall("java/lang/System", , "gc")

# Have to set autocommit to FALSE for PostgreSQL, or else it will ignore setFetchSize (Note: reason
# for this is that PostgreSQL doesn't want the data set you're getting to change during fetch)
autoCommit <- rJava::.jcall(connection@jc, "Z", "getAutoCommit")
if (autoCommit) {
rJava::.jcall(connection@jc, "V", "setAutoCommit", FALSE)
on.exit(rJava::.jcall(connection@jc, "V", "setAutoCommit", TRUE))
}

type_forward_only <- rJava::.jfield("java/sql/ResultSet", "I", "TYPE_FORWARD_ONLY")
concur_read_only <- rJava::.jfield("java/sql/ResultSet", "I", "CONCUR_READ_ONLY")
s <- rJava::.jcall(connection@jc,
"Ljava/sql/Statement;",
"createStatement",
type_forward_only,
concur_read_only)

# Have to call setFetchSize on Statement object for PostgreSQL (RJDBC only calls it on ResultSet)
rJava::.jcall(s, "V", method = "setFetchSize", as.integer(2048))
if (rJava::is.jnull(connection@jc))
stop("Connection is closed")
batchedQuery <- rJava::.jnew("org.ohdsi.databaseConnector.BatchedQuery", connection@jc, query)

r <- rJava::.jcall(s, "Ljava/sql/ResultSet;", "executeQuery", as.character(query)[1])
md <- rJava::.jcall(r, "Ljava/sql/ResultSetMetaData;", "getMetaData", check = FALSE)
resultSet <- new("JDBCResult", jr = r, md = md, stat = s, pull = rJava::.jnull())
on.exit(rJava::.jcall(batchedQuery, "V", "clear"))

on.exit(RJDBC::dbClearResult(resultSet), add = TRUE)

data <- RJDBC::fetch(resultSet, -1)

if (!datesAsString) {
cols <- rJava::.jcall(resultSet@md, "I", "getColumnCount")
for (i in 1:cols) {
type <- rJava::.jcall(resultSet@md, "I", "getColumnType", i)
if (type == 91)
data[, i] <- as.Date(data[, i])
columnTypes <- rJava::.jcall(batchedQuery, "[I", "getColumnTypes")
columns <- vector("list", length(columnTypes))
while (!rJava::.jcall(batchedQuery, "Z", "isDone")) {
rJava::.jcall(batchedQuery, "V", "fetchBatch")
for (i in seq.int(length(columnTypes))) {
if (columnTypes[i] == 1) {
columns[[i]] <- c(columns[[i]], rJava::.jcall(batchedQuery, "[D", "getNumeric", as.integer(i)))
} else {
columns[[i]] <- c(columns[[i]], rJava::.jcall(batchedQuery, "[Ljava/lang/String;", "getString", i))
}
}
}
return(data)
if (!datesAsString) {
for (i in seq.int(length(columnTypes))) {
if (columnTypes[i] == 3) {
columns[[i]] <- as.Date(columns[[i]])
}
}
}
names(columns) <- rJava::.jcall(batchedQuery, "[Ljava/lang/String;", "getColumnNames")
attr(columns, "row.names") <- c(NA_integer_, length(columns[[1]]))
class(columns) <- "data.frame"
return(columns)
}

#' Execute SQL code
Expand Down Expand Up @@ -255,6 +193,8 @@ executeSql <- function(connection,
profile = FALSE,
progressBar = TRUE,
reportOverallTime = TRUE) {
if (rJava::is.jnull(connection@jc))
stop("Connection is closed")
if (profile)
progressBar <- FALSE
sqlStatements <- SqlRender::splitSql(sql)
Expand Down Expand Up @@ -320,12 +260,13 @@ executeSql <- function(connection,
#' }
#' @export
querySql <- function(connection, sql) {
if (rJava::is.jnull(connection@jc))
stop("Connection is closed")
# Calling splitSql, because this will also strip trailing semicolons (which cause Oracle to crash).
sqlStatements <- SqlRender::splitSql(sql)
if (length(sqlStatements) > 1)
stop(paste("A query that returns a result can only consist of one SQL statement, but", length(sqlStatements), "statements were found"))
tryCatch({
rJava::.jcall("java/lang/System", , "gc") #Calling garbage collection prevents crashes
result <- lowLevelQuerySql(connection, sqlStatements[1])
colnames(result) <- toupper(colnames(result))
if(attr(connection, "dbms") == "impala") {
Expand Down Expand Up @@ -372,6 +313,8 @@ querySql <- function(connection, sql) {
#' }
#' @export
querySql.ffdf <- function(connection, sql) {
if (rJava::is.jnull(connection@jc))
stop("Connection is closed")
tryCatch({
result <- lowLevelQuerySql.ffdf(connection, sql)
colnames(result) <- toupper(colnames(result))
Expand Down
Binary file added inst/java/DatabaseConnector.jar
Binary file not shown.
33 changes: 33 additions & 0 deletions java/DatabaseConnector.jardesc
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?xml version="1.0" encoding="WINDOWS-1252" standalone="no"?>
<jardesc>
<jar path="DatabaseConnector/inst/java/DatabaseConnector.jar"/>
<options buildIfNeeded="true" compress="true" descriptionLocation="/DatabaseConnector/java/DatabaseConnector.jardesc" exportErrors="true" exportWarnings="true" includeDirectoryEntries="false" overwrite="true" saveDescription="true" storeRefactorings="false" useSourceFolders="false"/>
<storedRefactorings deprecationInfo="true" structuralOnly="false"/>
<selectedProjects/>
<manifest generateManifest="true" manifestLocation="/WhiteRabbit/jarDesc/WhiteRabbit.MF" manifestVersion="1.0" reuseManifest="false" saveManifest="false" usesManifest="true">
<sealing sealJar="false">
<packagesToSeal/>
<packagesToUnSeal/>
</sealing>
</manifest>
<selectedElements exportClassFiles="true" exportJavaFiles="false" exportOutputFolder="false">
<file path="/DatabaseConnector/NAMESPACE"/>
<file path="/DatabaseConnector/.Rbuildignore"/>
<file path="/DatabaseConnector/DatabaseConnector.Rproj"/>
<file path="/DatabaseConnector/DESCRIPTION"/>
<folder path="/DatabaseConnector/tests"/>
<file path="/DatabaseConnector/deploy.sh"/>
<file path="/DatabaseConnector/.travis.yml"/>
<file path="/DatabaseConnector/.gitignore"/>
<folder path="/DatabaseConnector/inst"/>
<folder path="/DatabaseConnector/man-roxygen"/>
<file path="/DatabaseConnector/README.md"/>
<file path="/DatabaseConnector/.project"/>
<javaElement handleIdentifier="=DatabaseConnector/java"/>
<folder path="/DatabaseConnector/man"/>
<folder path="/DatabaseConnector/extras"/>
<file path="/DatabaseConnector/.classpath"/>
<folder path="/DatabaseConnector/R"/>
<file path="/DatabaseConnector/.gitattributes"/>
</selectedElements>
</jardesc>
Loading

0 comments on commit 91ecd42

Please sign in to comment.