Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ def __init__(self, broker_settings=None, **kwargs):
# if a nick was specified, pull in the resolved arguments
if "nick" in kwargs:
nick = kwargs.pop("nick")
kwargs = helpers.merge_dicts(kwargs, helpers.resolve_nick(nick, self._settings))
# Merge nick resolution with kwargs; kwargs takes precedence over nick values
kwargs = helpers.merge_dicts(helpers.resolve_nick(nick, self._settings), kwargs)
logger.debug(f"kwargs after nick resolution {kwargs=}")
# Allow users to more simply pass a host class instead of a dict
if "host_class" in kwargs:
Expand Down
162 changes: 159 additions & 3 deletions broker/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@
click.rich_click.COMMAND_GROUPS = {
"broker": [
{"name": "Core Actions", "commands": ["checkout", "checkin", "inventory"]},
{"name": "Extras", "commands": ["execute", "extend", "providers", "config", "shell"]},
{
"name": "Extras",
"commands": ["execute", "extend", "providers", "config", "scenarios", "shell"],
},
]
}

Expand All @@ -66,11 +69,17 @@ def wrapper(*args, **kwargs):
helpers.emit(return_code=0)
return retval
except Exception as err: # noqa: BLE001 -- we want to catch all exceptions
if not isinstance(err, exceptions.BrokerError):
if isinstance(err, exceptions.ScenarioError):
# Show full message for scenario errors since context is important
logger.error(f"Scenario failed: {err.message}")
CONSOLE.print(f"[red]Scenario failed:[/red] {err.message}")
elif not isinstance(err, exceptions.BrokerError):
err = exceptions.BrokerError(err)
logger.error(f"Command failed: {err.message}")
CONSOLE.print(f"[red]Command failed:[/red] {err.message}")
else: # BrokerError children already log their messages
logger.error(f"Command failed due to: {type(err).__name__}")
CONSOLE.print(f"[red]Command failed due to:[/red] {type(err).__name__}")
helpers.emit(return_code=err.error_code, error_message=str(err.message))
sys.exit(err.error_code)

Expand Down Expand Up @@ -217,6 +226,7 @@ def cli(version):
table.add_column("Location", justify="left", style="magenta")

table.add_row("Broker Directory", str(settings.BROKER_DIRECTORY.absolute()))
table.add_row("Scenarios Directory", f"{settings.BROKER_DIRECTORY.absolute()}/scenarios")
table.add_row("Settings File", str(settings.settings_path.absolute()))
table.add_row("Inventory File", f"{settings.BROKER_DIRECTORY.absolute()}/inventory.yaml")
table.add_row("Log File", f"{settings.BROKER_DIRECTORY.absolute()}/logs/broker.log")
Expand Down Expand Up @@ -312,7 +322,13 @@ def checkin(hosts, background, all_, sequential, filter):
unmatched.discard(host.get("name"))

if unmatched:
logger.warning(f"The following hosts were not found in inventory: {', '.join(unmatched)}")
logger.warning(
"The following hosts were not found in inventory: %s",
", ".join(unmatched),
)
CONSOLE.print(
f"[yellow]Warning:[/yellow] The following hosts were not found in inventory: {', '.join(unmatched)}"
)
if to_remove:
Broker(hosts=to_remove).checkin(sequential=sequential)

Expand Down Expand Up @@ -452,6 +468,7 @@ def execute(ctx, background, nick, output_format, artifacts, args_file, provider
click.echo(result)
elif output_format == "log":
logger.info(result)
CONSOLE.print(result)
elif output_format == "yaml":
click.echo(helpers.yaml_format(result))

Expand Down Expand Up @@ -564,8 +581,146 @@ def validate(chunk):
try:
ConfigManager(settings.settings_path).validate(chunk, PROVIDERS)
logger.info("Validation passed!")
CONSOLE.print("[green]Validation passed![/green]")
except exceptions.BrokerError as err:
logger.warning(f"Validation failed: {err}")
CONSOLE.print(f"[yellow]Validation failed:[/yellow] {err}")


# --- Scenarios CLI Group ---


@cli.group()
def scenarios():
"""Manage and execute Broker scenarios.

Scenarios allow you to chain multiple Broker actions together in a YAML file.
"""
pass


@guarded_command(group=scenarios, name="list")
def scenarios_list():
"""List all available scenarios in the scenarios directory."""
from broker.scenarios import SCENARIOS_DIR, list_scenarios

scenario_names = list_scenarios()
if not scenario_names:
CONSOLE.print(f"No scenarios found in {SCENARIOS_DIR}")
return

table = Table(title="Available Scenarios")
table.add_column("Name", style="cyan")

for name in scenario_names:
table.add_row(name)

CONSOLE.print(table)


@guarded_command(
group=scenarios,
name="execute",
context_settings={"allow_extra_args": True, "ignore_unknown_options": True},
)
@click.argument("scenario", type=str)
@click.option("-b", "--background", is_flag=True, help="Run scenario in the background")
@click.pass_context
def scenarios_execute(ctx, scenario, background):
"""Execute a scenario file.

SCENARIO can be a name (found in scenarios dir) or a path to a YAML file.

Additional arguments are passed as variable overrides:

broker scenarios execute my_scenario --MY_VAR value --ANOTHER_VAR value

Config overrides use dotted notation:

broker scenarios execute my_scenario --config.settings.ssh.backend paramiko
"""
from broker.scenarios import ScenarioRunner, find_scenario

# Parse CLI args into variables and config overrides
cli_vars = {}
cli_config = {}
extra_args = helpers.kwargs_from_click_ctx(ctx)

for key, val in extra_args.items():
if key.startswith("config."):
cli_config[key] = val
else:
cli_vars[key] = val

if background:
helpers.fork_broker()

scenario_path = find_scenario(scenario)
runner = ScenarioRunner(
scenario_path=scenario_path,
cli_vars=cli_vars,
cli_config=cli_config,
)
try:
runner.run()
finally:
# Display scenario inventory if any hosts remain
if runner.scenario_inventory:
inv_data = [host.to_dict() for host in runner.scenario_inventory]
curated_host_info = [
helpers.inventory_fields_to_dict(
inventory_fields=settings.settings.inventory_fields,
host_dict=host,
provider_actions=PROVIDER_ACTIONS,
)
for host in inv_data
]
table = helpers.dictlist_to_table(
curated_host_info, "Scenario Inventory (hosts still checked out)", _id=True
)
CONSOLE.print(table)
CONSOLE.print(f"[dim]Inventory file: {runner.inventory_path}[/dim]")


@guarded_command(group=scenarios)
@click.argument("scenario", type=str)
@click.option("--no-syntax", is_flag=True, help="Disable syntax highlighting")
def info(scenario, no_syntax):
"""Get information about a scenario.

Displays the scenario's config, variables, and step names.
"""
from broker.scenarios import ScenarioRunner, find_scenario

scenario_path = find_scenario(scenario)
runner = ScenarioRunner(scenario_path=scenario_path)
info_data = runner.get_info()

output = helpers.yaml_format(info_data)
if no_syntax:
CONSOLE.print(output)
else:
CONSOLE.print(Syntax(output, "yaml", background_color="default"))


@guarded_command(group=scenarios, name="validate")
@click.argument("scenario", type=str)
def scenarios_validate(scenario):
"""Validate a scenario file against the schema.

Checks for syntax errors and schema violations.
"""
from broker.scenarios import find_scenario, validate_scenario

scenario_path = find_scenario(scenario)
is_valid, error_msg = validate_scenario(scenario_path)

if is_valid:
CONSOLE.print(f"[green]Scenario '{scenario}' is valid![/green]")
if error_msg: # Schema not found message
CONSOLE.print(f"[yellow]Warning:[/yellow] {error_msg}")
else:
CONSOLE.print(f"[red]Scenario '{scenario}' is invalid:[/red] {error_msg}")


def _make_shell_help_func(cmd, shell_instance):
Expand Down Expand Up @@ -601,6 +756,7 @@ def broker_shell():
broker_shell.add_command(execute)
broker_shell.add_command(providers)
broker_shell.add_command(config)
broker_shell.add_command(scenarios)


# Shell-only commands (not available as normal sub-commands)
Expand Down
31 changes: 31 additions & 0 deletions broker/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,34 @@ class ParamikoBindError(BrokerError):
"""Raised when a problem occurs at the Paramiko bind level."""

error_code = 15


class ScenarioError(BrokerError):
"""Raised when a problem occurs during scenario execution."""

error_code = 16

def __init__(self, message="Scenario execution failed", step_name=None, scenario_name=None):
"""Initialize ScenarioError with optional context.

Args:
message: The error message
step_name: Name of the step where the error occurred (optional)
scenario_name: Name of the scenario being executed (optional)
"""
self.step_name = step_name
self.scenario_name = scenario_name

# Build a contextual message
parts = []
if scenario_name:
parts.append(f"Scenario '{scenario_name}'")
if step_name:
parts.append(f"step '{step_name}'")

if parts:
self.message = f"{' '.join(parts)}: {message}"
else:
self.message = message

super().__init__(message=self.message)
2 changes: 2 additions & 0 deletions broker/helpers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
data_to_tempfile,
load_file,
resolve_file_args,
save_file,
temporary_tar,
yaml,
yaml_format,
Expand Down Expand Up @@ -92,6 +93,7 @@
"merge_dicts",
"resolve_file_args",
"resolve_nick",
"save_file",
"set_emit_file",
"simple_retry",
"temporary_tar",
Expand Down
4 changes: 3 additions & 1 deletion broker/helpers/dict_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ def clean_dict(in_dict):
def merge_dicts(dict1, dict2):
"""Merge two nested dictionaries together.

Values from dict2 take precedence over dict1 for duplicate keys.

:return: merged dictionary
"""
if not isinstance(dict1, MutableMapping) or not isinstance(dict2, MutableMapping):
return dict1
return dict2 if dict2 is not None else dict1
dict1 = clean_dict(dict1)
dict2 = clean_dict(dict2)
merged = {}
Expand Down
71 changes: 70 additions & 1 deletion broker/helpers/file_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""File handling utilities."""

import contextlib
from contextlib import contextmanager
from io import BytesIO
import json
Expand Down Expand Up @@ -32,6 +33,74 @@ def load_file(file, warn=True):
return yaml.load(file)


def save_file(file, data, mode="overwrite"):
"""Save data to a file, using appropriate format based on file extension.

Args:
file: Path to the file (string or Path object)
data: The data to save. Can be dict, list, or string.
mode: Write mode - "overwrite" (default) or "append"

Returns:
Path object to the saved file

The format is determined by the file extension:
- .json: Save as JSON with indentation
- .yaml/.yml: Save as YAML
- Other: Save as plain text (string conversion if needed)
"""
file = Path(file)
file.parent.mkdir(parents=True, exist_ok=True)

# Determine format based on extension
suffix = file.suffix.lower()

if suffix == ".json":
# For JSON, try to parse string data as JSON first
if isinstance(data, str):
with contextlib.suppress(json.JSONDecodeError):
data = json.loads(data)
content = json.dumps(data, indent=2, default=str)
elif suffix in (".yaml", ".yml"):
# For YAML, try to parse string data as structured data first
if isinstance(data, str):
data = _try_parse_structured_string(data)
output = BytesIO()
yaml.dump(data, output)
content = output.getvalue().decode("utf-8")
# Plain text - convert to string if needed
elif isinstance(data, (dict, list)):
content = yaml_format(data)
else:
content = str(data)

# Write the content
if mode == "append":
with file.open("a") as f:
f.write(content)
if not content.endswith("\n"):
f.write("\n")
else:
file.write_text(content)
if not content.endswith("\n"):
with file.open("a") as f:
f.write("\n")

logger.debug(f"Saved data to file: {file.absolute()}")
return file


def _try_parse_structured_string(data):
"""Try to parse a string as JSON or YAML, returning original if parsing fails."""
# Try JSON first
with contextlib.suppress(json.JSONDecodeError):
return json.loads(data)
# Then try YAML
with contextlib.suppress(Exception):
return yaml.load(data)
return data


def resolve_file_args(broker_args):
"""Check for files being passed in as values to arguments then attempt to resolve them.

Expand Down Expand Up @@ -144,7 +213,7 @@ def data_to_tempfile(data, path=None, as_tar=False):
@contextmanager
def temporary_tar(paths):
"""Create a temporary tar file and return the path."""
temp_tar = Path(f"{uuid4().hex[-10]}.tar")
temp_tar = Path(f"{uuid4().hex[-10:]}.tar")
with tarfile.open(temp_tar, mode="w") as tar:
for path in paths:
logger.debug(f"Adding {path.absolute()} to {temp_tar.absolute()}")
Expand Down
3 changes: 2 additions & 1 deletion broker/helpers/inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ def update_inventory(add=None, remove=None):
"name"
) == new_host.get("name"):
# update missing data in the new_host with the old_host data
new_host.update(merge_dicts(new_host, host))
# new_host values take precedence over old host data
new_host.update(merge_dicts(host, new_host))
inv_data.remove(host)
if add:
inv_data.extend(add)
Expand Down
Loading