feat(ingestion/grafana): Add datasets and charts to dashboards with lineage and tags. Lineage back to source #12417

Open
wants to merge 20 commits into master
169 changes: 169 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/grafana/field_utils.py
@@ -0,0 +1,169 @@
import logging
from typing import Any, Dict, List, Union

from datahub.ingestion.source.grafana.models import Panel
from datahub.metadata.schema_classes import (
NumberTypeClass,
SchemaFieldClass,
SchemaFieldDataTypeClass,
StringTypeClass,
TimeTypeClass,
)

logger = logging.getLogger(__name__)


def _deduplicate_fields(fields: List[SchemaFieldClass]) -> List[SchemaFieldClass]:
"""Remove duplicate fields based on fieldPath while preserving order."""
Contributor:
I found this in the codebase — does it fit the purpose?

def deduplicate_list(iterable: Iterable[_T]) -> List[_T]:

Contributor Author:
I gave it a try and unfortunately it doesn't do the job: it was causing hashing issues.
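As an illustration, a minimal usage sketch of the dict-keyed approach below, assuming SchemaFieldClass instances are not hashable (which would explain the hashing issues with deduplicate_list):

from datahub.ingestion.source.grafana.field_utils import _deduplicate_fields
from datahub.metadata.schema_classes import (
    NumberTypeClass,
    SchemaFieldClass,
    SchemaFieldDataTypeClass,
    StringTypeClass,
)

fields = [
    SchemaFieldClass(
        fieldPath="revenue",
        type=SchemaFieldDataTypeClass(type=NumberTypeClass()),
        nativeDataType="number",
    ),
    # Duplicate fieldPath: the dict comprehension keeps the key's first position
    # but the last value written, so only one "revenue" field survives.
    SchemaFieldClass(
        fieldPath="revenue",
        type=SchemaFieldDataTypeClass(type=StringTypeClass()),
        nativeDataType="sql_column",
    ),
]
assert len(_deduplicate_fields(fields)) == 1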

unique_fields = {field.fieldPath: field for field in fields}
return list(unique_fields.values())


def extract_sql_column_fields(target: Dict[str, Any]) -> List[SchemaFieldClass]:
"""Extract fields from SQL-style columns."""
fields = []
for col in target.get("sql", {}).get("columns", []):
for param in col.get("parameters", []):
if param.get("type") == "column" and param.get("name"):
field_type: Union[NumberTypeClass, StringTypeClass, TimeTypeClass] = (
TimeTypeClass()
if col["type"] == "time"
else NumberTypeClass()
if col["type"] == "number"
else StringTypeClass()
)
fields.append(
SchemaFieldClass(
fieldPath=param["name"],
type=SchemaFieldDataTypeClass(type=field_type),
nativeDataType=col["type"],
)
)
return fields
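For illustration, a minimal sketch of the target shape this function expects; the structure is inferred from the code above and the concrete names are made up:

from datahub.ingestion.source.grafana.field_utils import extract_sql_column_fields

target = {
    "sql": {
        "columns": [
            {"type": "time", "parameters": [{"type": "column", "name": "created_at"}]},
            {"type": "number", "parameters": [{"type": "column", "name": "revenue"}]},
        ]
    }
}
# Produces a TimeTypeClass field for created_at and a NumberTypeClass field for revenue.
fields = extract_sql_column_fields(target)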


def extract_prometheus_fields(target: Dict[str, Any]) -> List[SchemaFieldClass]:
"""Extract fields from Prometheus expressions."""
expr = target.get("expr")
if expr:
legend = target.get("legendFormat", expr)
return [
SchemaFieldClass(
fieldPath=legend,
type=SchemaFieldDataTypeClass(type=NumberTypeClass()),
nativeDataType="prometheus_metric",
)
]
return []


def extract_raw_sql_fields(target: Dict[str, Any]) -> List[SchemaFieldClass]:
"""Extract fields from raw SQL queries using SQL parsing."""
raw_sql = target.get("rawSql", "").lower()
if not raw_sql:
return []

try:
sql = raw_sql.lower()
select_start = sql.index("select") + 6 # len("select")
from_start = sql.index("from")
select_part = sql[select_start:from_start].strip()

columns = [col.strip().split()[-1].strip() for col in select_part.split(",")]
Contributor:
Assuming that splitting by "," will actually split by columns may be a wrong assumption, e.g. SELECT CONCAT(first_name, ' ', last_name) AS full_name FROM users.
Not sure if we need to address this complexity, though. WDYT?
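For reference, a sketch of how the projection names could be pulled out with a SQL parser instead of splitting on commas; this assumes sqlglot (already used by DataHub for SQL parsing elsewhere) would be acceptable here:

import sqlglot

sql = "SELECT CONCAT(first_name, ' ', last_name) AS full_name, age FROM users"
# parse_one returns the parsed SELECT; .selects iterates the projections, and
# alias_or_name resolves to the alias when present, otherwise the column name.
for projection in sqlglot.parse_one(sql).selects:
    print(projection.alias_or_name)  # full_name, age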


return [
(
SchemaFieldClass(
# Capture the alias of the column if present or the name of the field
fieldPath=col.split(" as ")[-1].strip('"').strip("'"),
type=SchemaFieldDataTypeClass(type=StringTypeClass()),
nativeDataType="sql_column",
)
)
for col in columns
]
except (IndexError, ValueError, StopIteration):
logger.warning(f"Failed to parse SQL {target.get('rawSql')}")
return []
Contributor:
should we log some warning here?

Contributor Author:
I've put this in the logger, but were you looking for this to be in the report?

Contributor:
Logger should be fine. Given we are catching three types of exceptions, adding the exception to the log trace may help discriminate the issue:

        except (IndexError, ValueError, StopIteration) as ex:
            logger.warning(f"Failed to parse SQL {target.get('rawSql')}", exc_info=ex)



def extract_fields_from_panel(panel: Panel) -> List[SchemaFieldClass]:
"""Extract all fields from a panel."""
fields = []
fields.extend(extract_fields_from_targets(panel.targets))
fields.extend(get_fields_from_field_config(panel.field_config))
fields.extend(get_fields_from_transformations(panel.transformations))
return _deduplicate_fields(fields)


def extract_fields_from_targets(
targets: List[Dict[str, Any]],
) -> List[SchemaFieldClass]:
"""Extract fields from panel targets."""
fields = []
for target in targets:
fields.extend(extract_sql_column_fields(target))
fields.extend(extract_prometheus_fields(target))
fields.extend(extract_raw_sql_fields(target))
fields.extend(extract_time_format_fields(target))
return fields


def extract_time_format_fields(target: Dict[str, Any]) -> List[SchemaFieldClass]:
"""Extract fields from time series and table formats."""
if target.get("format") in {"time_series", "table"}:
return [
SchemaFieldClass(
fieldPath="time",
type=SchemaFieldDataTypeClass(type=TimeTypeClass()),
nativeDataType="timestamp",
)
]
return []


def get_fields_from_field_config(
field_config: Dict[str, Any],
) -> List[SchemaFieldClass]:
"""Extract fields from field configuration."""
fields = []
defaults = field_config.get("defaults", {})
unit = defaults.get("unit")
if unit:
fields.append(
SchemaFieldClass(
fieldPath=f"value_{unit}",
type=SchemaFieldDataTypeClass(type=NumberTypeClass()),
nativeDataType="value",
)
)
for override in field_config.get("overrides", []):
if override.get("matcher", {}).get("id") == "byName":
field_name = override.get("matcher", {}).get("options")
if field_name:
fields.append(
SchemaFieldClass(
fieldPath=field_name,
type=SchemaFieldDataTypeClass(type=NumberTypeClass()),
nativeDataType="metric",
)
)
return fields


def get_fields_from_transformations(
transformations: List[Dict[str, Any]],
) -> List[SchemaFieldClass]:
"""Extract fields from transformations."""
fields = []
for transform in transformations:
if transform.get("type") == "organize":
for field_name in transform.get("options", {}).get("indexByName", {}):
fields.append(
SchemaFieldClass(
fieldPath=field_name,
type=SchemaFieldDataTypeClass(type=StringTypeClass()),
nativeDataType="transformed",
)
)
return fields
129 changes: 129 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/grafana/grafana_api.py
@@ -0,0 +1,129 @@
"""API client for Grafana metadata extraction"""

import logging
from typing import Dict, List, Optional, Union

import requests
import urllib3.exceptions
from pydantic import SecretStr

from datahub.ingestion.source.grafana.models import Dashboard, Folder
from datahub.ingestion.source.grafana.report import GrafanaSourceReport

logger = logging.getLogger(__name__)


class GrafanaAPIClient:
"""Client for making requests to Grafana API"""

def __init__(
self,
base_url: str,
token: SecretStr,
verify_ssl: bool,
report: GrafanaSourceReport,
) -> None:
self.base_url = base_url
self.verify_ssl = verify_ssl
self.report = report
self.session = self._create_session(token)

def _create_session(self, token: SecretStr) -> requests.Session:
session = requests.Session()
session.headers.update(
{
"Authorization": f"Bearer {token.get_secret_value()}",
"Accept": "application/json",
"Content-Type": "application/json",
}
)
session.verify = self.verify_ssl

# If SSL verification is disabled, suppress the warnings
if not self.verify_ssl:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
self.report.warning("SSL Verification is recommended.")

Codecov / codecov/patch: Added lines #L44 - L45 in metadata-ingestion/src/datahub/ingestion/source/grafana/grafana_api.py were not covered by tests.
return session

def get_folders(self) -> List[Folder]:
"""Fetch all folders from Grafana with pagination."""
folders: List[Folder] = []
page = 1
per_page = 100
Contributor:
Based on experience with other sources, it's nice to have this as a config parameter with a default value (page_size).

Contributor Author:
I was thinking of just hard-coding the page size to reduce the overhead of anyone having to think about changing it, but happy to change.
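For illustration, a minimal sketch of what the suggested option could look like; the mixin name and the wiring into the client are assumptions rather than code from this PR:

from pydantic import BaseModel, Field


class GrafanaPaginationConfig(BaseModel):
    """Hypothetical config fragment exposing the Grafana API page size."""

    page_size: int = Field(
        default=100,
        description="Number of folders/dashboards requested per Grafana API call.",
    )


# get_folders/get_dashboards would then read the value from the source config
# instead of hard-coding it, e.g. per_page = self.config.page_size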


while True:
try:
response = self.session.get(
f"{self.base_url}/api/folders",
params={"page": page, "limit": per_page},
)
response.raise_for_status()

batch = response.json()
if not batch:
break

folders.extend(Folder.parse_obj(folder) for folder in batch)
page += 1
except requests.exceptions.RequestException as e:
self.report.report_failure(
message="Failed to fetch folders on page",
context=str(page),
exc=e,
)
break

return folders

def get_dashboard(self, uid: str) -> Optional[Dashboard]:
"""Fetch a specific dashboard by UID"""
try:
response = self.session.get(f"{self.base_url}/api/dashboards/uid/{uid}")
response.raise_for_status()
return Dashboard.parse_obj(response.json())
except requests.exceptions.RequestException as e:
self.report.warning(
message="Failed to fetch dashboard",
context=uid,
exc=e,
)
return None

def get_dashboards(self) -> List[Dashboard]:
"""Fetch all dashboards from search endpoint with pagination."""
dashboards: List[Dashboard] = []
page = 1
per_page = 100

while True:
try:
params: Dict[str, Union[str, int]] = {
"type": "dash-db",
"page": page,
"limit": per_page,
}
response = self.session.get(
f"{self.base_url}/api/search",
params=params,
)
response.raise_for_status()

batch = response.json()
if not batch:
break

for result in batch:
dashboard = self.get_dashboard(result["uid"])
if dashboard:
dashboards.append(dashboard)
page += 1
except requests.exceptions.RequestException as e:
self.report.report_failure(
message="Failed to fetch dashboards on page",
context=str(page),
exc=e,
)
break

return dashboards
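A short usage sketch of the client above, assuming GrafanaSourceReport can be constructed with its defaults; the URL and token are placeholders:

from pydantic import SecretStr

from datahub.ingestion.source.grafana.grafana_api import GrafanaAPIClient
from datahub.ingestion.source.grafana.report import GrafanaSourceReport

report = GrafanaSourceReport()  # assumed to be constructible with defaults
client = GrafanaAPIClient(
    base_url="https://grafana.company.com",
    token=SecretStr("glsa_example_token"),
    verify_ssl=True,
    report=report,
)

folders = client.get_folders()
dashboards = client.get_dashboards()
print(f"Fetched {len(folders)} folders and {len(dashboards)} dashboards")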
@@ -0,0 +1,59 @@
from typing import Dict, Optional

from pydantic import Field, SecretStr, validator

from datahub.configuration.source_common import (
DatasetLineageProviderConfigBase,
EnvConfigMixin,
PlatformInstanceConfigMixin,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
from datahub.utilities import config_clean


class PlatformConnectionConfig(
EnvConfigMixin,
PlatformInstanceConfigMixin,
):
platform: str = Field(description="Upstream platform code (e.g. postgres, ms-sql)")
database: Optional[str] = Field(default=None, description="Database name")
database_schema: Optional[str] = Field(default=None, description="Schema name")


class GrafanaSourceConfig(
DatasetLineageProviderConfigBase,
StatefulIngestionConfigBase,
EnvConfigMixin,
PlatformInstanceConfigMixin,
):
platform: str = Field(default="grafana", hidden_from_docs=True)
url: str = Field(
description="URL of Grafana instance (e.g. https://grafana.company.com)"
)
service_account_token: SecretStr = Field(description="Grafana API token")
verify_ssl: bool = Field(
default=True,
description="Verify SSL certificate for secure connections (https)",
)
ingest_tags: bool = Field(
default=True,
description="Whether to ingest tags from Grafana dashboards and charts",
)
ingest_owners: bool = Field(
default=True,
description="Whether to ingest owners from Grafana dashboards and charts",
)
connection_to_platform_map: Dict[str, PlatformConnectionConfig] = Field(
default={},
description="Map of Grafana connection names to their upstream platform details",
)
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None

@validator("url", allow_reuse=True)
def remove_trailing_slash(cls, v):
return config_clean.remove_trailing_slashes(v)
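For illustration, a recipe-style configuration expressed as the dict this model parses; the import path is an assumption (the file name is not shown in this hunk) and the connection name is made up:

from datahub.ingestion.source.grafana.grafana_config import GrafanaSourceConfig  # assumed module path

config = GrafanaSourceConfig.parse_obj(
    {
        "url": "https://grafana.company.com/",  # trailing slash removed by the validator
        "service_account_token": "glsa_example_token",
        "verify_ssl": True,
        "connection_to_platform_map": {
            "my-postgres-datasource": {  # hypothetical Grafana connection name
                "platform": "postgres",
                "database": "analytics",
                "database_schema": "public",
            }
        },
    }
)
print(config.url)  # https://grafana.company.com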