17 changes: 11 additions & 6 deletions sqlmesh/__init__.py
@@ -232,15 +232,20 @@ def configure_logging(
logger.addHandler(file_handler)

if debug:
import faulthandler
import faulthandler, io

enable_debug_mode()

# Enable thread dumps.
faulthandler.enable()
try:
Collaborator Author

One of the tests for --debug was failing because faulthandler.enable() raises an exception if sys.stderr.fileno() fails.

sys.stderr.fileno() can fail if something, for example the Click test runner (or even pytest), has replaced sys.stderr with an object that is no longer backed by a real file.

So rather than bombing out, I changed this to just log a warning.
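
For context, a minimal standalone sketch (standard library only, not part of this PR) that reproduces the failure mode and the warning fallback described above:

```python
import faulthandler
import io
import logging
import sys

logger = logging.getLogger("sqlmesh")  # illustrative logger; the PR uses the module's existing logger

original_stderr = sys.stderr
sys.stderr = io.StringIO()  # simulate a test runner that captures stderr in memory

try:
    # faulthandler.enable() defaults to sys.stderr and needs a real file descriptor,
    # so it calls sys.stderr.fileno(), which StringIO does not support.
    faulthandler.enable()
except io.UnsupportedOperation as e:
    # mirror the fallback in this PR: warn and carry on instead of crashing
    logger.warning("Unable to enable faulthandler for thread dumps", exc_info=e)
finally:
    sys.stderr = original_stderr
```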

faulthandler.enable()

# Windows doesn't support register so we check for it here
if hasattr(faulthandler, "register"):
from signal import SIGUSR1
# Windows doesn't support register so we check for it here
if hasattr(faulthandler, "register"):
from signal import SIGUSR1

faulthandler.register(SIGUSR1.value)
faulthandler.register(SIGUSR1.value)
except io.UnsupportedOperation as e:
# this can fail if sys.stderr isn't a proper file handle, which can happen in unit test / notebook environments
# rather than prevent SQLMesh from working because we couldn't register the faulthandler, we just log a warning and carry on
logger.warning("Unable to enable faulthandler for thread dumps", exc_info=e)
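
As a usage note on the SIGUSR1 registration above: a short, hedged sketch (POSIX only, standard library, not part of this PR) showing how the registered signal produces an on-demand thread dump:

```python
import faulthandler
import os
import signal

# Register SIGUSR1 so the interpreter dumps every thread's traceback to stderr
# when the signal arrives; the process keeps running afterwards.
faulthandler.register(signal.SIGUSR1)

# Send the signal to this very process to trigger a dump; for a real process
# started with --debug you would target its pid instead.
os.kill(os.getpid(), signal.SIGUSR1)
```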
184 changes: 53 additions & 131 deletions sqlmesh_dbt/cli.py
@@ -1,20 +1,46 @@
import typing as t
import sys
import click
from click.core import ParameterSource
from sqlmesh_dbt.operations import DbtOperations, create
from sqlmesh_dbt.error import cli_global_error_handler, ErrorHandlingGroup
from pathlib import Path
from sqlmesh_dbt.options import YamlParamType
import functools
from sqlmesh_dbt.options import global_options, run_options, list_options


def _get_dbt_operations(
ctx: click.Context, vars: t.Optional[t.Dict[str, t.Any]], threads: t.Optional[int] = None
) -> DbtOperations:
if not isinstance(ctx.obj, functools.partial):
ctx: click.Context, **kwargs: t.Dict[str, t.Any]
) -> t.Tuple[DbtOperations, t.Dict[str, t.Any]]:
if not isinstance(ctx.obj, dict):
raise ValueError(f"Unexpected click context object: {type(ctx.obj)}")

dbt_operations = ctx.obj(vars=vars, threads=threads)
all_options = {
**kwargs,
# ctx.obj only contains global options that a user specifically provided, so these values take precedence
# over **kwargs which contains all options (plus Click's defaults) whether or not they were explicitly set by a user
**ctx.obj,
}

T = t.TypeVar("T")

def _pop_global_option(name: str, expected_type: t.Type[T]) -> t.Optional[T]:
value = all_options.pop(name, None)
if value is not None and not isinstance(value, expected_type):
raise ValueError(
f"Expecting option '{name}' to be type '{expected_type}', however it was '{type(value)}'"
)
return value

dbt_operations = create(
project_dir=_pop_global_option("project_dir", Path),
profiles_dir=_pop_global_option("profiles_dir", Path),
profile=_pop_global_option("profile", str),
target=_pop_global_option("target", str),
vars=_pop_global_option("vars", dict),
threads=_pop_global_option("threads", int),
debug=_pop_global_option("debug", bool) or False,
log_level=_pop_global_option("log_level", str),
)

if not isinstance(dbt_operations, DbtOperations):
raise ValueError(f"Unexpected dbt operations type: {type(dbt_operations)}")
@@ -23,88 +49,15 @@ def _get_dbt_operations(
def _cleanup() -> None:
dbt_operations.close()

return dbt_operations


vars_option = click.option(
"--vars",
type=YamlParamType(),
help="Supply variables to the project. This argument overrides variables defined in your dbt_project.yml file. This argument should be a YAML string, eg. '{my_variable: my_value}'",
)


select_option = click.option(
"-s",
"--select",
multiple=True,
help="Specify the nodes to include.",
)
model_option = click.option(
"-m",
"--models",
"--model",
multiple=True,
help="Specify the model nodes to include; other nodes are excluded.",
)
exclude_option = click.option("--exclude", multiple=True, help="Specify the nodes to exclude.")

# TODO: expand this out into --resource-type/--resource-types and --exclude-resource-type/--exclude-resource-types
resource_types = [
"metric",
"semantic_model",
"saved_query",
"source",
"analysis",
"model",
"test",
"unit_test",
"exposure",
"snapshot",
"seed",
"default",
"all",
]
resource_type_option = click.option(
"--resource-type", type=click.Choice(resource_types, case_sensitive=False)
)
# at this point, all_options just contains what's left because we popped the global options
return dbt_operations, all_options
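
A tiny illustration (plain dicts, not from the PR) of why the explicitly-set global options in ctx.obj win over the Click-populated kwargs in the merge above: in a dict literal, keys unpacked on the right overwrite duplicates unpacked on the left.

```python
kwargs = {"target": "dev", "threads": 4}  # everything Click passed in, defaults included
ctx_obj = {"target": "prod"}              # only the options the user explicitly set

all_options = {**kwargs, **ctx_obj}       # right-hand unpacking wins on duplicate keys
assert all_options == {"target": "prod", "threads": 4}
```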


@click.group(cls=ErrorHandlingGroup, invoke_without_command=True)
@click.option("--profile", help="Which existing profile to load. Overrides output.profile")
@click.option("-t", "--target", help="Which target to load for the given profile")
@click.option(
"-d",
"--debug/--no-debug",
default=False,
help="Display debug logging during dbt execution. Useful for debugging and making bug reports events to help when debugging.",
)
@click.option(
"--log-level",
default="info",
type=click.Choice(["debug", "info", "warn", "error", "none"]),
help="Specify the minimum severity of events that are logged to the console and the log file.",
)
@click.option(
"--profiles-dir",
type=click.Path(exists=True, file_okay=False, path_type=Path),
help="Which directory to look in for the profiles.yml file. If not set, dbt will look in the current working directory first, then HOME/.dbt/",
)
@click.option(
"--project-dir",
type=click.Path(exists=True, file_okay=False, path_type=Path),
help="Which directory to look in for the dbt_project.yml file. Default is the current working directory and its parents.",
)
@global_options
@click.pass_context
@cli_global_error_handler
def dbt(
ctx: click.Context,
profile: t.Optional[str] = None,
target: t.Optional[str] = None,
debug: bool = False,
log_level: t.Optional[str] = None,
profiles_dir: t.Optional[Path] = None,
project_dir: t.Optional[Path] = None,
) -> None:
def dbt(ctx: click.Context, **kwargs: t.Any) -> None:
"""
An ELT tool for managing your SQL transformations and data models, powered by the SQLMesh engine.
"""
@@ -113,76 +66,45 @@ def dbt(
# we don't need to import sqlmesh / load the project for CLI help
return

# we have a partially applied function here because subcommands might set extra options like --vars
# that need to be known before we attempt to load the project
ctx.obj = functools.partial(
create,
project_dir=project_dir,
profiles_dir=profiles_dir,
profile=profile,
target=target,
debug=debug,
log_level=log_level,
)
# only keep track of the global options that have been explicitly set
# the subcommand handlers get invoked with the default values of the global options (even if the option is specified at the top level)
# so we capture any explicitly set options to use as overrides in the subcommand handlers
ctx.obj = {
k: v for k, v in kwargs.items() if ctx.get_parameter_source(k) != ParameterSource.DEFAULT
}

if not ctx.invoked_subcommand:
if profile or target:
if "profile" in ctx.obj or "target" in ctx.obj:
# trigger a project load to validate the specified profile / target
ctx.obj()
_get_dbt_operations(ctx)

click.echo(
f"No command specified. Run `{ctx.info_name} --help` to see the available commands."
)
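
As a side note on the ParameterSource filtering above: a minimal, self-contained sketch (hypothetical command and options, not part of this PR) of how click.Context.get_parameter_source distinguishes user-supplied values from Click defaults:

```python
import click
from click.core import ParameterSource


@click.group(invoke_without_command=True)
@click.option("--target", default="dev")
@click.option("--log-level", default="info")
@click.pass_context
def cli(ctx: click.Context, **kwargs: str) -> None:
    # Keep only the options the user actually passed; Click-provided defaults are filtered out.
    ctx.obj = {
        k: v
        for k, v in kwargs.items()
        if ctx.get_parameter_source(k) != ParameterSource.DEFAULT
    }
    click.echo(ctx.obj)


if __name__ == "__main__":
    # `python sketch.py --target prod` prints {'target': 'prod'};
    # `python sketch.py` prints {} because both values came from defaults.
    cli()
```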


@dbt.command()
@select_option
@model_option
@exclude_option
@resource_type_option
@click.option(
"-f",
"--full-refresh",
is_flag=True,
default=False,
help="If specified, sqlmesh will drop incremental models and fully-recalculate the incremental table from the model definition.",
)
@click.option(
"--env",
"--environment",
help="Run against a specific Virtual Data Environment (VDE) instead of the main environment",
)
@click.option(
"--empty/--no-empty", default=False, help="If specified, limit input refs and sources"
)
@click.option(
"--threads",
type=int,
help="Specify number of threads to use while executing models. Overrides settings in profiles.yml.",
)
@vars_option
@global_options
@run_options
@click.pass_context
def run(
ctx: click.Context,
vars: t.Optional[t.Dict[str, t.Any]],
threads: t.Optional[int],
env: t.Optional[str] = None,
**kwargs: t.Any,
) -> None:
"""Compile SQL and execute against the current target database."""
_get_dbt_operations(ctx, vars, threads).run(environment=env, **kwargs)
ops, remaining_kwargs = _get_dbt_operations(ctx, **kwargs)
ops.run(environment=env, **remaining_kwargs)


@dbt.command(name="list")
@select_option
@model_option
@exclude_option
@resource_type_option
@vars_option
@global_options
@list_options
@click.pass_context
def list_(ctx: click.Context, vars: t.Optional[t.Dict[str, t.Any]], **kwargs: t.Any) -> None:
def list_(ctx: click.Context, **kwargs: t.Any) -> None:
"""List the resources in your project"""
_get_dbt_operations(ctx, vars).list_(**kwargs)
ops, remaining_kwargs = _get_dbt_operations(ctx, **kwargs)
ops.list_(**remaining_kwargs)


@dbt.command(name="ls", hidden=True) # hidden alias for list