Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: pysparklyr
Title: Provides a 'PySpark' Back-End for the 'sparklyr' Package
Version: 0.2.0.9003
Authors@R: c(
person("Edgar", "Ruiz", , "[email protected]", role = c("aut", "cre")),
person("Posit Software, PBC", role = c("cph", "fnd"),
Expand Down
7 changes: 6 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
### Improvements

- Adds support for `tune_grid_spark()`. It enables running a Tidymodels tune
grid inside Spark Connect clusters.

- Databricks Connect now auto-detects the latest library version from PyPI when
no `version` parameter is specified. When the auto-detected version differs from
the cluster's DBR version, a warning is displayed with suggestions for ensuring
version compatibility.

# pysparklyr 0.2.0

Expand Down
31 changes: 30 additions & 1 deletion R/connect-databricks.R
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ spark_connect_method.spark_method_databricks_connect <- function(
version <- databricks_extract_version(cluster_info)
}

# Track if version was provided by user
version_provided <- !is.null(version)

# load python env
envname <- use_envname(
backend = "databricks",
Expand All @@ -63,7 +66,12 @@ spark_connect_method.spark_method_databricks_connect <- function(
db_sdk <- import_check("databricks.sdk", envname, silent = TRUE)

if (is.null(token) || token == "") {
sdk_config <- db_sdk$core$Config()
if (!is.null(profile)) {
sdk_config <- db_sdk$core$Config(profile = profile)
} else {
sdk_config <- db_sdk$core$Config()
}

token <- sdk_config$token %||% ""
}

Expand All @@ -88,6 +96,27 @@ spark_connect_method.spark_method_databricks_connect <- function(
}
}

# Check for version mismatch and warn user
if (!is.null(cluster_info) && !version_provided) {
cluster_version <- databricks_extract_version(cluster_info)
if (
!is.null(version) && cluster_version != "" && cluster_version != version
) {
if (!silent) {
cli_div(theme = cli_colors())
cli_alert_warning(
paste0(
"Using databricks.connect version {.emph {version}}, which differs from ",
"Databricks' DBR version {.emph {cluster_version}}. If you experience instability, ",
"consider using {.code version = \"{cluster_version}\"} to ensure a matching ",
"version is used during the R session."
)
)
cli_end()
}
}
}

if (!is.null(cluster_info)) {
msg <- "{.header Connecting to} {.emph '{cluster_info$cluster_name}'}"
msg_done <- "{.header Connected to:} {.emph '{cluster_info$cluster_name}'}"
Expand Down
20 changes: 16 additions & 4 deletions R/python-use-envname.R
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,18 @@ use_envname <- function(
return(set_names(envname, "argument"))
}

version_from_pypi <- FALSE
if (is.null(version)) {
cli_abort("A cluster {.code version} is required, please provide one")
if (!is.null(main_library)) {
lib_info <- python_library_info(main_library, fail = FALSE, verbose = FALSE)
if (!is.null(lib_info)) {
version <- lib_info$version
version_from_pypi <- TRUE
}
}
if (is.null(version)) {
cli_abort("A cluster {.code version} is required, please provide one")
}
}

env_base <- glue("r-sparklyr-{backend}-")
Expand Down Expand Up @@ -72,7 +82,8 @@ use_envname <- function(

# There were 0 environments found
if (!match_one && !match_exact) {
ret <- set_names(envname, "unavailable")
ret_name <- if (version_from_pypi) "latest" else "unavailable"
ret <- set_names(envname, ret_name)
msg_1 <- msg_default
msg_no <- " - Will use the default Python environment"
}
Expand Down Expand Up @@ -103,7 +114,8 @@ use_envname <- function(
if (match_one && !match_exact && !match_first) {
msg_1 <- msg_default
msg_no <- " - Will use the default Python environment"
ret <- set_names(envname, "unavailable")
ret_name <- if (version_from_pypi) "latest" else "unavailable"
ret <- set_names(envname, ret_name)
}

ret_name <- names(ret)
Expand Down Expand Up @@ -136,7 +148,7 @@ use_envname <- function(
stop_quietly()
}
} else {
if (ret_name == "unavailable") {
if (ret_name %in% c("unavailable", "latest")) {
reqs <- python_requirements(
backend = backend,
main_library = main_library,
Expand Down
150 changes: 148 additions & 2 deletions tests/testthat/test-python-use-envname.R
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,154 @@ test_that("'Ask to install', simulates menu selection 'Cancel'", {
)
})

# When both `version` and `main_library` are NULL there is nothing to look up
# on PyPI, so `use_envname()` must abort with an informative error rather than
# silently picking an environment. (Replaces the older test that only passed
# `version = NULL`; auto-detection now kicks in when `main_library` is set.)
test_that("Expect error when no 'version' is provided and no 'main_library'", {
  expect_error(use_envname(version = NULL, main_library = NULL))
})

test_that("Auto-detect version from PyPI when version is NULL", {
  # Stand-in for the PyPI lookup: report pyspark 3.5.0 as the newest release.
  fake_pypi <- function(library_name,
                        library_version = NULL,
                        verbose = TRUE,
                        fail = TRUE,
                        timeout = 2) {
    list(version = "3.5.0", requires_python = ">=3.8", name = library_name)
  }
  withr::with_envvar(
    new = c("WORKON_HOME" = use_new_temp_env()),
    {
      local_mocked_bindings(python_library_info = fake_pypi)
      # No `version` supplied: the name should be derived from the mocked
      # PyPI metadata and flagged as "latest".
      result <- use_envname(
        main_library = "pyspark",
        version = NULL,
        messages = FALSE,
        match_first = FALSE,
        ask_if_not_installed = FALSE
      )
      expect_named(result, "latest")
      expect_equal(as.character(result), "r-sparklyr-pyspark-3.5")
    }
  )
})

test_that("Auto-detect uses latest version for databricks.connect", {
  # Stand-in for the PyPI lookup: report databricks.connect 14.2.0 as newest.
  fake_pypi <- function(library_name,
                        library_version = NULL,
                        verbose = TRUE,
                        fail = TRUE,
                        timeout = 2) {
    list(version = "14.2.0", requires_python = ">=3.8", name = library_name)
  }
  withr::with_envvar(
    new = c("WORKON_HOME" = use_new_temp_env()),
    {
      local_mocked_bindings(python_library_info = fake_pypi)
      # The databricks backend should fold the auto-detected major.minor
      # version into the environment name and tag the result "latest".
      result <- use_envname(
        main_library = "databricks.connect",
        version = NULL,
        backend = "databricks",
        messages = FALSE,
        match_first = FALSE,
        ask_if_not_installed = FALSE
      )
      expect_named(result, "latest")
      expect_equal(as.character(result), "r-sparklyr-databricks-14.2")
    }
  )
})

test_that("Falls back to error when PyPI query fails and no version provided", {
  # Simulate an unreachable / failed PyPI lookup by returning NULL.
  failing_pypi <- function(library_name,
                           library_version = NULL,
                           verbose = TRUE,
                           fail = TRUE,
                           timeout = 2) {
    NULL
  }
  withr::with_envvar(
    new = c("WORKON_HOME" = use_new_temp_env()),
    {
      local_mocked_bindings(python_library_info = failing_pypi)
      # With no version and no usable PyPI metadata, the original
      # "version is required" error must still be raised.
      expect_error(
        use_envname(
          main_library = "pyspark",
          version = NULL,
          messages = FALSE
        ),
        "A cluster.*version.*is required"
      )
    }
  )
})

test_that("Explicit version returns 'unavailable' not 'latest'", {
  # A user-supplied `version` bypasses PyPI auto-detection entirely, so a
  # missing environment is reported as "unavailable" rather than "latest".
  withr::with_envvar(
    new = c("WORKON_HOME" = use_new_temp_env()),
    {
      result <- use_envname(
        main_library = "pyspark",
        version = "3.5.0",
        messages = FALSE,
        match_first = FALSE,
        ask_if_not_installed = FALSE
      )
      expect_named(result, "unavailable")
      expect_equal(as.character(result), "r-sparklyr-pyspark-3.5")
    }
  )
})

test_that("Auto-detect finds exact match when environment exists", {
  # Pre-create the environment that the auto-detected version should map to.
  tmp_env <- use_new_temp_env()
  dir_create(path(tmp_env, "r-sparklyr-pyspark-3.5"))
  # Stand-in for the PyPI lookup: report pyspark 3.5.0 as the newest release.
  fake_pypi <- function(library_name,
                        library_version = NULL,
                        verbose = TRUE,
                        fail = TRUE,
                        timeout = 2) {
    list(version = "3.5.0", requires_python = ">=3.8", name = library_name)
  }
  withr::with_envvar(
    new = c("WORKON_HOME" = tmp_env),
    {
      local_mocked_bindings(python_library_info = fake_pypi)
      # Because a matching environment already exists on disk, the result is
      # tagged "exact" instead of "latest".
      result <- use_envname(
        main_library = "pyspark",
        version = NULL,
        messages = FALSE,
        match_first = FALSE,
        ask_if_not_installed = FALSE
      )
      expect_named(result, "exact")
      expect_equal(as.character(result), "r-sparklyr-pyspark-3.5")
    }
  )
})


Expand Down
Loading