From 50f20c4a621ff0f8b563c228318134d6d5cb2e9c Mon Sep 17 00:00:00 2001 From: Ying Zhu Date: Fri, 12 Dec 2025 18:33:41 +0800 Subject: [PATCH 1/3] feat(observability): job-level tokio duration WIP Signed-off-by: Ying Zhu --- .../dashboard/dev/streaming_actors_tokio.py | 36 ++++++++++++++----- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/grafana/dashboard/dev/streaming_actors_tokio.py b/grafana/dashboard/dev/streaming_actors_tokio.py index c900325965963..3f24c070589c0 100644 --- a/grafana/dashboard/dev/streaming_actors_tokio.py +++ b/grafana/dashboard/dev/streaming_actors_tokio.py @@ -1,7 +1,22 @@ from ..common import * from . import section +def _sum_fragment_metric_by_mv(expr: str) -> str: + return ( + f"sum(({expr})" + f" * on(fragment_id) group_left(materialized_view_id)" + f" max by (fragment_id, materialized_view_id) ({metric('table_info')}))" + f" by (materialized_view_id)" + f" / sum(sum({metric('stream_actor_count')}) by (fragment_id)" + f" * on(fragment_id) group_left(materialized_view_id) table_info)" + f" by (materialized_view_id)" + ) +def _sum_actors_by_mv(expr: str) -> str: + return ( + f" sum(sum(stream_actor_count) by (fragment_id)" + f" " + ) @section def _(outer_panels: Panels): panels = outer_panels.sub_panel() @@ -14,10 +29,9 @@ def _(outer_panels: Panels): "This can be used to estimate the CPU usage of an actor", [ panels.target( - f"sum(rate({metric('stream_actor_poll_duration')}[$__rate_interval])) by (fragment_id)" - f"/ on(fragment_id) sum({metric('stream_actor_count')}) by (fragment_id)" + f"{_sum_fragment_metric_by_mv(f'sum(rate({metric('stream_actor_poll_duration')}[$__rate_interval])) by (fragment_id)')}" f" / 1000000000", - "fragment {{fragment_id}}", + "job {{materialized_view_id}}", ), ], ), @@ -26,19 +40,23 @@ def _(outer_panels: Panels): "", [ panels.target( - f"rate({metric('stream_actor_poll_cnt')}[$__rate_interval]) > 0", - "fragment {{fragment_id}}", - ), + _sum_fragment_metric_by_mv(f'rate({metric('stream_actor_poll_cnt')}[$__rate_interval]) > 0'), + "job {{materialized_view_id}}", + ) ], ), panels.timeseries_percentage( "Tokio: Actor Poll Avg Rate Per Poll", "", [ + # panels.target( + # f"rate({metric('stream_actor_poll_duration')}[$__rate_interval]) / (rate({metric('stream_actor_poll_cnt')}[$__rate_interval]) > 0) / 1000000000", + # "fragment {{fragment_id}}", + # ), panels.target( - f"rate({metric('stream_actor_poll_duration')}[$__rate_interval]) / (rate({metric('stream_actor_poll_cnt')}[$__rate_interval]) > 0) / 1000000000", - "fragment {{fragment_id}}", - ), + f"{_sum_fragment_metric_by_mv()}" + ) + ], ), panels.timeseries_percentage( From 3d481f1dac1bf7fc1a4e3d83033f91bf50c0c286 Mon Sep 17 00:00:00 2001 From: YingZhu Date: Sat, 13 Dec 2025 21:45:07 +0800 Subject: [PATCH 2/3] WIP: add job-level cpu profilling Signed-off-by: YingZhu --- .../dashboard/dev/streaming_actors_tokio.py | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/grafana/dashboard/dev/streaming_actors_tokio.py b/grafana/dashboard/dev/streaming_actors_tokio.py index 3f24c070589c0..c106e68c78a1c 100644 --- a/grafana/dashboard/dev/streaming_actors_tokio.py +++ b/grafana/dashboard/dev/streaming_actors_tokio.py @@ -1,22 +1,21 @@ from ..common import * from . import section +# def _sum_actors_by_mv() -> str: +# return ( +# f" sum(sum({metric('stream_actor_count')}) by (fragment_id)" +# f" * on(fragment_id) group_left(materialized_view_id) table_info)" +# f" by (materialized_view_id)" +# ) + def _sum_fragment_metric_by_mv(expr: str) -> str: return ( f"sum(({expr})" f" * on(fragment_id) group_left(materialized_view_id)" f" max by (fragment_id, materialized_view_id) ({metric('table_info')}))" f" by (materialized_view_id)" - f" / sum(sum({metric('stream_actor_count')}) by (fragment_id)" - f" * on(fragment_id) group_left(materialized_view_id) table_info)" - f" by (materialized_view_id)" ) -def _sum_actors_by_mv(expr: str) -> str: - return ( - f" sum(sum(stream_actor_count) by (fragment_id)" - f" " - ) @section def _(outer_panels: Panels): panels = outer_panels.sub_panel() @@ -30,6 +29,8 @@ def _(outer_panels: Panels): [ panels.target( f"{_sum_fragment_metric_by_mv(f'sum(rate({metric('stream_actor_poll_duration')}[$__rate_interval])) by (fragment_id)')}" + f" / " + f" {_sum_fragment_metric_by_mv(f'sum({metric('stream_actor_count')}) by (fragment_id)')}" f" / 1000000000", "job {{materialized_view_id}}", ), @@ -49,14 +50,13 @@ def _(outer_panels: Panels): "Tokio: Actor Poll Avg Rate Per Poll", "", [ - # panels.target( - # f"rate({metric('stream_actor_poll_duration')}[$__rate_interval]) / (rate({metric('stream_actor_poll_cnt')}[$__rate_interval]) > 0) / 1000000000", - # "fragment {{fragment_id}}", - # ), panels.target( - f"{_sum_fragment_metric_by_mv()}" + f"{_sum_fragment_metric_by_mv(f'rate({metric('stream_actor_poll_duration')}[$__rate_interval])')}" + f" / " + f"{_sum_fragment_metric_by_mv(f'(rate({metric('stream_actor_poll_cnt')}[$__rate_interval]) > 0')}" + f" / 1000000000", + "job {{materialized_view_id}}", ) - ], ), panels.timeseries_percentage( From 6144e9de529b144b9f93767d655b343e6d980c07 Mon Sep 17 00:00:00 2001 From: YingZhu Date: Mon, 15 Dec 2025 09:59:53 +0800 Subject: [PATCH 3/3] WIP: add modifications Signed-off-by: YingZhu --- grafana/dashboard/dev/streaming_actors_tokio.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/grafana/dashboard/dev/streaming_actors_tokio.py b/grafana/dashboard/dev/streaming_actors_tokio.py index c106e68c78a1c..0c2b9cf5d95d9 100644 --- a/grafana/dashboard/dev/streaming_actors_tokio.py +++ b/grafana/dashboard/dev/streaming_actors_tokio.py @@ -63,12 +63,19 @@ def _(outer_panels: Panels): "Tokio: Actor Idle Rate Per Actor", "Idle time could be due to no data to process, or waiting for async operations like IO", [ + # panels.target( + # f"sum(rate({metric('stream_actor_idle_duration')}[$__rate_interval])) by (fragment_id)" + # f"/ on(fragment_id) sum({metric('stream_actor_count')}) by (fragment_id)" + # f" / 1000000000", + # "fragment {{fragment_id}}", + # ), panels.target( - f"sum(rate({metric('stream_actor_idle_duration')}[$__rate_interval])) by (fragment_id)" - f"/ on(fragment_id) sum({metric('stream_actor_count')}) by (fragment_id)" + f"{_sum_fragment_metric_by_mv(f'sum(rate({metric('stream_actor_idle_duration')}[$__rate_interval])) by (fragment_id)')}" + f" / " + f"{_sum_fragment_metric_by_mv(f'sum({metric('stream_actor_count')}) by (fragment_id)')}" f" / 1000000000", - "fragment {{fragment_id}}", - ), + "job {{materialized_view_id}}", + ) ], ), panels.timeseries_actor_ops_small(