feat(ingestion/grafana): Add datasets and charts to dashboards with lineage and tags. Lineage back to source #12417

Status: Open. Wants to merge 20 commits into master (changes shown from 12 commits).
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
@@ -421,7 +421,7 @@
         # https://stackoverflow.com/questions/40845304/runtimewarning-numpy-dtype-size-changed-may-indicate-binary-incompatibility
         "numpy<2",
     },
-    "grafana": {"requests"},
+    "grafana": {"requests", "sqlparse"},
     "glue": aws_common,
     # hdbcli is supported officially by SAP, sqlalchemy-hana is built on top but not officially supported
     "hana": sql_common
167 changes: 167 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/grafana/field_utils.py
@@ -0,0 +1,167 @@
from typing import Any, Dict, List, Union

import sqlparse

from datahub.ingestion.source.grafana.models import Panel
from datahub.metadata.schema_classes import (
    NumberTypeClass,
    SchemaFieldClass,
    SchemaFieldDataTypeClass,
    StringTypeClass,
    TimeTypeClass,
)


def _deduplicate_fields(fields: List[SchemaFieldClass]) -> List[SchemaFieldClass]:
    """Remove duplicate fields based on fieldPath while preserving order."""
    # Later duplicates overwrite earlier ones; each key keeps the position of
    # its first occurrence.
    unique_fields = {field.fieldPath: field for field in fields}
    return list(unique_fields.values())

Review thread:
Contributor: I found this in the codebase — does it fit the purpose? def deduplicate_list(iterable: Iterable[_T]) -> List[_T]:
acrylJonny (author): I gave it a try, but unfortunately it doesn't do the job here; it was causing hashing issues.
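To illustrate the dedup behavior: when two fields share a fieldPath, the last one seen wins. A minimal sketch using the module's own classes:

    a = SchemaFieldClass(
        fieldPath="time",
        type=SchemaFieldDataTypeClass(type=TimeTypeClass()),
        nativeDataType="timestamp",
    )
    b = SchemaFieldClass(
        fieldPath="time",
        type=SchemaFieldDataTypeClass(type=StringTypeClass()),
        nativeDataType="transformed",
    )
    deduped = _deduplicate_fields([a, b])
    assert len(deduped) == 1 and deduped[0].nativeDataType == "transformed"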


def extract_sql_column_fields(target: Dict[str, Any]) -> List[SchemaFieldClass]:
    """Extract fields from SQL-style columns."""
    fields = []
    for col in target.get("sql", {}).get("columns", []):
        for param in col.get("parameters", []):
            if param.get("type") == "column" and param.get("name"):
                field_type: Union[NumberTypeClass, StringTypeClass, TimeTypeClass] = (
                    TimeTypeClass()
                    if col["type"] == "time"
                    else NumberTypeClass()
                    if col["type"] == "number"
                    else StringTypeClass()
                )
                fields.append(
                    SchemaFieldClass(
                        fieldPath=param["name"],
                        type=SchemaFieldDataTypeClass(type=field_type),
                        nativeDataType=col["type"],
                    )
                )
    return fields
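For illustration, a hypothetical Grafana SQL-editor target shaped the way this function expects:

    target = {
        "sql": {
            "columns": [
                {"type": "time", "parameters": [{"type": "column", "name": "created_at"}]},
                {"type": "number", "parameters": [{"type": "column", "name": "amount"}]},
            ]
        }
    }
    fields = extract_sql_column_fields(target)
    # -> "created_at" typed as TimeTypeClass, "amount" typed as NumberTypeClass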


def extract_prometheus_fields(target: Dict[str, Any]) -> List[SchemaFieldClass]:
    """Extract fields from Prometheus expressions."""
    expr = target.get("expr")
    if expr:
        legend = target.get("legendFormat", expr)
        return [
            SchemaFieldClass(
                fieldPath=legend,
                type=SchemaFieldDataTypeClass(type=NumberTypeClass()),
                nativeDataType="prometheus_metric",
            )
        ]
    return []
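For illustration (hypothetical target): an expression with a legendFormat becomes a single numeric field keyed by the legend; without legendFormat the expression string itself is used:

    target = {"expr": "rate(http_requests_total[5m])", "legendFormat": "{{method}}"}
    fields = extract_prometheus_fields(target)
    # -> one field with fieldPath "{{method}}" and nativeDataType "prometheus_metric"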


def extract_raw_sql_fields(target: Dict[str, Any]) -> List[SchemaFieldClass]:
    """Extract fields from raw SQL queries using SQL parsing."""
    raw_sql = target.get("rawSql", "").lower()
    if not raw_sql:
        return []

    try:
        parsed = sqlparse.parse(raw_sql)[0]  # parse the first SQL statement
        select_token = next(
            token
            for token in parsed.tokens
            if token.ttype is None and token.value.lower().startswith("select")
        )
        columns = [col.strip() for col in select_token.value.split(",")]

        return [
            SchemaFieldClass(
                fieldPath=col.split(" as ")[-1].strip('"').strip("'"),
                type=SchemaFieldDataTypeClass(type=StringTypeClass()),
                nativeDataType="sql_column",
            )
            for col in columns
        ]
    except (IndexError, ValueError, StopIteration):
        return []
Review thread:
Contributor: Should we log some warning here?
acrylJonny (author): I've put this in the logger, but were you looking for it to be in the report?
Contributor: The logger should be fine. Given we are catching three types of exceptions, adding the exception to the log trace may help to discriminate the issue:

        logger.warning(f"Failed to parse SQL {target.get('rawSql')}", ex)
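A minimal sketch of how the suggestion might be wired in. The helper name and message are illustrative rather than the final committed form, and a module-level logger is assumed:

    import logging
    from typing import List

    logger = logging.getLogger(__name__)


    def parse_columns_or_warn(raw_sql: str) -> List[str]:
        # Same swallow-and-continue behavior as above, but the exception is
        # attached so the distinct failure modes can be told apart in the logs.
        try:
            select_clause = raw_sql.lower().split("select", 1)[1]
            columns = select_clause.split("from", 1)[0].split(",")
            return [col.strip() for col in columns]
        except (IndexError, ValueError, StopIteration) as e:
            logger.warning("Failed to parse SQL %s", raw_sql, exc_info=e)
            return []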



def extract_fields_from_panel(panel: Panel) -> List[SchemaFieldClass]:
    """Extract all fields from a panel."""
    fields = []
    fields.extend(extract_fields_from_targets(panel.targets))
    fields.extend(get_fields_from_field_config(panel.field_config))
    fields.extend(get_fields_from_transformations(panel.transformations))
    return _deduplicate_fields(fields)


def extract_fields_from_targets(
    targets: List[Dict[str, Any]],
) -> List[SchemaFieldClass]:
    """Extract fields from panel targets."""
    fields = []
    for target in targets:
        fields.extend(extract_sql_column_fields(target))
        fields.extend(extract_prometheus_fields(target))
        fields.extend(extract_raw_sql_fields(target))
        fields.extend(extract_time_format_fields(target))
    return fields


def extract_time_format_fields(target: Dict[str, Any]) -> List[SchemaFieldClass]:
    """Extract fields from time series and table formats."""
    if target.get("format") in {"time_series", "table"}:
        return [
            SchemaFieldClass(
                fieldPath="time",
                type=SchemaFieldDataTypeClass(type=TimeTypeClass()),
                nativeDataType="timestamp",
            )
        ]
    return []
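Putting the target extractors together, here is a hypothetical target that yields both a metric column and the implicit time field:

    targets = [
        {
            "sql": {
                "columns": [
                    {"type": "number", "parameters": [{"type": "column", "name": "amount"}]}
                ]
            },
            "format": "time_series",
        }
    ]
    fields = extract_fields_from_targets(targets)
    # -> "amount" (NumberTypeClass) from the SQL columns,
    #    plus "time" (TimeTypeClass) from the time_series format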


def get_fields_from_field_config(
    field_config: Dict[str, Any],
) -> List[SchemaFieldClass]:
    """Extract fields from field configuration."""
    fields = []
    defaults = field_config.get("defaults", {})
    unit = defaults.get("unit")
    if unit:
        fields.append(
            SchemaFieldClass(
                fieldPath=f"value_{unit}",
                type=SchemaFieldDataTypeClass(type=NumberTypeClass()),
                nativeDataType="value",
            )
        )
    for override in field_config.get("overrides", []):
        if override.get("matcher", {}).get("id") == "byName":
            field_name = override.get("matcher", {}).get("options")
            if field_name:
                fields.append(
                    SchemaFieldClass(
                        fieldPath=field_name,
                        type=SchemaFieldDataTypeClass(type=NumberTypeClass()),
                        nativeDataType="metric",
                    )
                )
    return fields


def get_fields_from_transformations(
    transformations: List[Dict[str, Any]],
) -> List[SchemaFieldClass]:
    """Extract fields from transformations."""
    fields = []
    for transform in transformations:
        if transform.get("type") == "organize":
            for field_name in transform.get("options", {}).get("indexByName", {}):
                fields.append(
                    SchemaFieldClass(
                        fieldPath=field_name,
                        type=SchemaFieldDataTypeClass(type=StringTypeClass()),
                        nativeDataType="transformed",
                    )
                )
    return fields
129 changes: 129 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/grafana/grafana_api.py
@@ -0,0 +1,129 @@
"""API client for Grafana metadata extraction"""

import logging
from typing import Dict, List, Optional, Union

import requests
import urllib3.exceptions
from pydantic import SecretStr

from datahub.ingestion.source.grafana.models import Dashboard, Folder
from datahub.ingestion.source.grafana.report import GrafanaSourceReport

logger = logging.getLogger(__name__)


class GrafanaAPIClient:
    """Client for making requests to Grafana API"""

    def __init__(
        self,
        base_url: str,
        token: SecretStr,
        verify_ssl: bool,
        report: GrafanaSourceReport,
    ) -> None:
        self.base_url = base_url
        self.verify_ssl = verify_ssl
        self.report = report
        self.session = self._create_session(token)

    def _create_session(self, token: SecretStr) -> requests.Session:
        session = requests.Session()
        session.headers.update(
            {
                "Authorization": f"Bearer {token.get_secret_value()}",
                "Accept": "application/json",
                "Content-Type": "application/json",
            }
        )
        session.verify = self.verify_ssl

        # If SSL verification is disabled, suppress the warnings
        if not self.verify_ssl:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
            self.report.warning("SSL Verification is recommended.")

        return session

    def get_folders(self) -> List[Folder]:
        """Fetch all folders from Grafana with pagination."""
        folders: List[Folder] = []
        page = 1
        per_page = 100

        while True:
            try:
                response = self.session.get(
                    f"{self.base_url}/api/folders",
                    params={"page": page, "limit": per_page},
                )
                response.raise_for_status()

                batch = response.json()
                if not batch:
                    break

                folders.extend(Folder.parse_obj(folder) for folder in batch)
                page += 1
            except requests.exceptions.RequestException as e:
                self.report.report_failure(
                    message="Failed to fetch folders on page",
                    context=str(page),
                    exc=e,
                )
                break

        return folders

Review thread:
Contributor: Based on experience with other sources, it's nice to have this as a config parameter with a default value (page_size).
acrylJonny (author): I was thinking to just fix the page size, to spare anyone the overhead of thinking about changing it, but happy to change.
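A sketch of the reviewer's suggestion: expose the page size as a config parameter with the current value as the default. The page_size field name is the reviewer's proposal, not something in this diff:

    from pydantic import BaseModel, Field


    class GrafanaPagingConfig(BaseModel):
        # Hypothetical config model; 100 mirrors the hard-coded per_page above.
        page_size: int = Field(
            default=100,
            description="Number of folders/dashboards fetched per API page.",
        )

Both paginated loops could then read params={"page": page, "limit": self.config.page_size} instead of the local per_page constant.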

    def get_dashboard(self, uid: str) -> Optional[Dashboard]:
        """Fetch a specific dashboard by UID"""
        try:
            response = self.session.get(f"{self.base_url}/api/dashboards/uid/{uid}")
            response.raise_for_status()
            return Dashboard.parse_obj(response.json())
        except requests.exceptions.RequestException as e:
            self.report.warning(
                message="Failed to fetch dashboard",
                context=uid,
                exc=e,
            )
            return None

    def get_dashboards(self) -> List[Dashboard]:
        """Fetch all dashboards from search endpoint with pagination."""
        dashboards: List[Dashboard] = []
        page = 1
        per_page = 100

        while True:
            try:
                params: Dict[str, Union[str, int]] = {
                    "type": "dash-db",
                    "page": page,
                    "limit": per_page,
                }
                response = self.session.get(
                    f"{self.base_url}/api/search",
                    params=params,
                )
                response.raise_for_status()

                batch = response.json()
                if not batch:
                    break

                for result in batch:
                    dashboard = self.get_dashboard(result["uid"])
                    if dashboard:
                        dashboards.append(dashboard)
                page += 1
            except requests.exceptions.RequestException as e:
                self.report.report_failure(
                    message="Failed to fetch dashboards on page",
                    context=str(page),
                    exc=e,
                )
                break

        return dashboards
@@ -0,0 +1,57 @@
from typing import Dict, Optional

from pydantic import Field, SecretStr, validator

from datahub.configuration.common import ConfigModel
from datahub.configuration.source_common import DatasetLineageProviderConfigBase
from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionConfigBase,
)
from datahub.utilities import config_clean


class PlatformConnectionConfig(ConfigModel):
    platform: str = Field(description="Platform to connect to")
    database: Optional[str] = Field(default=None, description="Database name")
    database_schema: Optional[str] = Field(default=None, description="Schema name")
    platform_instance: Optional[str] = Field(
        default=None, description="Platform instance"
    )
    env: str = Field(default="PROD", description="Environment")
Review thread:
Contributor: These two may be removed if using PlatformInstanceConfigMixin and EnvConfigMixin in GrafanaSourceConfig; are they really needed in PlatformConnectionConfig?
acrylJonny (author): Updated to use the parent class.
Contributor: platform_instance and env are still there, as well as platform_instance in GrafanaSourceConfig 🤔. What parent class do you refer to?
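For reference, a sketch of the shape the contributor seems to be suggesting, assuming the PlatformInstanceConfigMixin and EnvConfigMixin from datahub.configuration.source_common:

    from typing import Optional

    from pydantic import Field

    from datahub.configuration.source_common import (
        EnvConfigMixin,
        PlatformInstanceConfigMixin,
    )


    class PlatformConnectionConfig(PlatformInstanceConfigMixin, EnvConfigMixin):
        """Sketch: platform_instance and env come from the mixins."""

        platform: str = Field(description="Platform to connect to")
        database: Optional[str] = Field(default=None, description="Database name")
        database_schema: Optional[str] = Field(default=None, description="Schema name")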



class GrafanaSourceConfig(
    StatefulIngestionConfigBase, DatasetLineageProviderConfigBase
):
    platform: str = Field(default="grafana", hidden_from_docs=True)
    url: str = Field(
        description="URL of Grafana instance (e.g. https://grafana.company.com)"
    )
    service_account_token: SecretStr = Field(description="Grafana API token")
    verify_ssl: bool = Field(
        default=True,
        description="Verify SSL certificate for secure connections (https)",
    )
    ingest_tags: bool = Field(
        default=True,
        description="Whether to ingest tags from Grafana dashboards and charts",
    )
    ingest_owners: bool = Field(
        default=True,
        description="Whether to ingest owners from Grafana dashboards and charts",
    )
    platform_instance: Optional[str] = Field(
        default=None, description="Platform instance for DataHub"
    )
    connection_to_platform_map: Dict[str, PlatformConnectionConfig] = Field(
        default={},
        description="Map of Grafana connection names to their upstream platform details",
    )
    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None

    @validator("url")
    def remove_trailing_slash(cls, v):
        return config_clean.remove_trailing_slashes(v)
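Usage sketch: a minimal config dict parsed through the model (the token value and connection name below are placeholders). The url validator strips the trailing slash:

    config = GrafanaSourceConfig.parse_obj(
        {
            "url": "https://grafana.company.com/",
            "service_account_token": "glsa_...",  # placeholder token
            "connection_to_platform_map": {
                "my-postgres-ds": {  # hypothetical Grafana connection name
                    "platform": "postgres",
                    "database": "analytics",
                    "database_schema": "public",
                }
            },
        }
    )
    assert config.url == "https://grafana.company.com"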