Skip to content

Commit

Permalink
Added validations for aactions
Browse files Browse the repository at this point in the history
  • Loading branch information
itisallgood committed Jan 29, 2025
1 parent 5e39184 commit 7759704
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 1 deletion.
17 changes: 16 additions & 1 deletion src/robusta/core/playbooks/playbooks_event_handler_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from robusta.runner.telemetry import Telemetry
from robusta.utils.error_codes import ActionException, ErrorCodes
from robusta.utils.stack_tracer import StackTracer
from robusta.core.playbooks.validators import BaseValidator, AccountTypeValidator

playbooks_errors_count = prometheus_client.Counter(
"playbooks_errors", "Number of playbooks failures.", labelnames=("source",)
Expand All @@ -37,7 +38,16 @@
class PlaybooksEventHandlerImpl(PlaybooksEventHandler):
def __init__(self, registry: Registry):
self.registry = registry

self.dal = None
self.validators: List[BaseValidator] = []

def set_dal(self, dal):
self.dal = dal
if dal:
self.validators = [
AccountTypeValidator(dal),
]

def handle_trigger(self, trigger_event: TriggerEvent) -> Optional[Dict[str, Any]]:
playbooks = self.registry.get_playbooks().get_playbooks(trigger_event)
if not playbooks: # no registered playbooks for this event type
Expand Down Expand Up @@ -237,7 +247,12 @@ def __run_playbook_actions(
execution_event.response = self.__error_resp(msg, ErrorCodes.PARAMS_INSTANTIATION_FAILED.value)
playbooks_errors_count.labels(source).inc()
continue

try:
for validator in self.validators:
if validator.is_applicable(action):
validator.validate(self.registry.get_global_config().get("account_id", ""))

if action_with_params:
registered_action.func(execution_event, params)
else:
Expand Down
86 changes: 86 additions & 0 deletions src/robusta/core/playbooks/validators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from abc import ABC, abstractmethod
from cachetools import TTLCache
from threading import Lock
from typing import Optional
import logging

from robusta.core.sinks.robusta.dal.supabase_dal import SupabaseDal
from robusta.api import ActionException, Action
from robusta.api import ErrorCodes


ACCOUNT_TYPE_CACHE_TIMEOUT = 900
ACCOUNT_CACHE_SIZE = 5000


class BaseValidator(ABC):
@abstractmethod
def validate(self, account_id: str) -> None:
"""
Validate specific condition
Raises ActionException if validation fails
"""
pass

@abstractmethod
def is_applicable(self, action: Action) -> bool:
pass


class AccountTypeValidator(BaseValidator):
APPLICABLE_ACTIONS = (
"ask_holmes",
"holmes_workload_health",
"holmes_conversation",
"delayed_health_check",
"holmes_issue_chat",
"holmes_chat"
)
def __init__(self, dal: Optional[SupabaseDal]):
self._dal = dal
self.account_cache = TTLCache(
maxsize=ACCOUNT_CACHE_SIZE,
ttl=ACCOUNT_TYPE_CACHE_TIMEOUT
)
self.account_cache_lock = Lock()

def _is_account_free(self, account_id: str) -> Optional[bool]:
try:
if account_id in self.account_cache:
return self.account_cache[account_id]

if not self._dal:
return None

is_free = self._dal.is_account_free()
if is_free is None:
return None

with self.account_cache_lock:
self.account_cache[account_id] = is_free

return is_free
except Exception:
logging.exception(
f"Exception getting account status for account_id: {account_id}",
exc_info=True
)
return None

def is_applicable(self, action: Action) -> bool:
return action.action_name in self.APPLICABLE_ACTIONS

def validate(self, account_id: str) -> None:
is_free = self._is_account_free(account_id)

if is_free is None:
raise ActionException(
ErrorCodes.HOLMES_UNEXPECTED_ERROR,
"Failed to validate account status"
)

if is_free:
raise ActionException(
ErrorCodes.HOLMES_REQUEST_ERROR,
"This feature is not available in free accounts"
)
10 changes: 10 additions & 0 deletions src/robusta/core/sinks/robusta/dal/supabase_dal.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
ACCOUNT_RESOURCE_TABLE = "AccountResource"
ACCOUNT_RESOURCE_STATUS_TABLE = "AccountResourceStatus"
OPENSHIFT_GROUPS_TABLE = "OpenshiftGroups"
ACCOUNT_BILLING_TABLE = "AccountBilling"


class SupabaseDal(AccountResourceFetcher):
Expand Down Expand Up @@ -753,3 +754,12 @@ def set_cluster_active(self, active: bool) -> None:
)
except Exception as e:
logging.error(f"Failed to set cluster status active=False error: {e}")

def is_account_free(self) -> bool:
try:
res = self.client.table(ACCOUNT_BILLING_TABLE).select(
"plan").eq("account_id", self.account_id).execute()
except Exception as e:
logging.error(f"Failed to check if account is free error: {e}")
raise
return res.data[0]["plan"] == "free"
13 changes: 13 additions & 0 deletions src/robusta/runner/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,21 @@ def main():
registry = Registry()
event_handler = PlaybooksEventHandlerImpl(registry)
loader = ConfigLoader(registry, event_handler)

# Initialize database access layer (dal) for AI features
# Note: This must be done after ConfigLoader initialization because:
# 1. RobustaSink (which contains dal) is fully initialized during ConfigLoader creation
# 2. AI features (like ask_holmes) are only available when robusta_ui_sink is configured
# 3. We want to avoid accessing sink registry repeatedly during action execution
sink_registry = registry.get_sinks()
ui_sink_enabled = "robusta_ui_sink" in sink_registry.get_all()
if ui_sink_enabled:
robusta_sink = sink_registry.get_sink_by_name("robusta_ui_sink")
dal = getattr(robusta_sink, 'dal', None)
if not dal:
logging.error("Robusta UI sink found but database access is not properly configured")
event_handler.set_dal(dal)

if ui_sink_enabled or ENABLE_TELEMETRY:
if not ENABLE_TELEMETRY:
logging.warning("Telemetry could not be disabled when Robusta UI is used.")
Expand Down

0 comments on commit 7759704

Please sign in to comment.