Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci-pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:

steps:
- name: Run nox
uses: frequenz-floss/[email protected].0
uses: frequenz-floss/[email protected].1
with:
python-version: "3.11"
nox-session: ci_checks_max
Expand All @@ -35,7 +35,7 @@ jobs:
submodules: true

- name: Setup Python
uses: frequenz-floss/[email protected].0
uses: frequenz-floss/[email protected].1
with:
python-version: ${{ env.DEFAULT_PYTHON_VERSION }}
dependencies: .[dev-mkdocs]
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
# only use hashes to pick the action to execute (instead of tags or branches).
# For more details read:
# https://securitylab.github.com/research/github-actions-preventing-pwn-requests/
uses: actions/labeler@8558fd74291d67161a8a78ce36a881fa63b766a9 # 5.0.0
uses: actions/labeler@634933edcd8ababfe52f92936142cc22ac488b1b # 6.0.1
with:
repo-token: "${{ secrets.GITHUB_TOKEN }}"
dot: true
2 changes: 2 additions & 0 deletions src/frequenz/client/assets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from ._client import AssetsApiClient
from ._delivery_area import DeliveryArea, EnergyMarketCodeType
from ._lifetime import Lifetime
from ._location import Location
from ._microgrid import Microgrid, MicrogridStatus

Expand All @@ -15,4 +16,5 @@
"Microgrid",
"MicrogridStatus",
"Location",
"Lifetime",
]
51 changes: 47 additions & 4 deletions src/frequenz/client/assets/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@

from __future__ import annotations

from collections.abc import Iterable

from frequenz.api.assets.v1 import assets_pb2, assets_pb2_grpc
from frequenz.client.base import channel
from frequenz.client.base.client import BaseApiClient, call_stub_method

from frequenz.client.assets.electrical_component._electrical_component import (
ElectricalComponent,
)

from ._microgrid import Microgrid
from ._microgrid_proto import microgrid_from_proto
from .electrical_component._connection import ComponentConnection
from .electrical_component._connection_proto import component_connection_from_proto
from .electrical_component._electrical_component import ElectricalComponent
from .electrical_component._electrical_component_proto import electrical_component_proto
from .exceptions import ClientNotConnected

Expand Down Expand Up @@ -138,3 +139,45 @@ async def list_microgrid_electrical_components(
return [
electrical_component_proto(component) for component in response.components
]

async def list_microgrid_electrical_component_connections(
self,
microgrid_id: int,
source_component_ids: Iterable[int] = (),
destination_component_ids: Iterable[int] = (),
) -> list[ComponentConnection | None]:
"""
Get the electrical component connections of a microgrid.

Args:
microgrid_id: The ID of the microgrid to get the electrical
component connections of.
source_component_ids: Only return connections that originate from
these component IDs. If None or empty, no filtering is applied.
destination_component_ids: Only return connections that terminate at
these component IDs. If None or empty, no filtering is applied.

Returns:
The electrical component connections of the microgrid.
"""
request = assets_pb2.ListMicrogridElectricalComponentConnectionsRequest(
microgrid_id=microgrid_id,
source_component_ids=source_component_ids,
destination_component_ids=destination_component_ids,
)

response = await call_stub_method(
self,
lambda: self.stub.ListMicrogridElectricalComponentConnections(
request,
timeout=DEFAULT_GRPC_CALL_TIMEOUT,
),
method_name="ListMicrogridElectricalComponentConnections",
)

return list(
map(
component_connection_from_proto,
filter(bool, response.connections),
)
)
4 changes: 2 additions & 2 deletions src/frequenz/client/assets/_lifetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ class Lifetime:
"""An active operational period of a microgrid asset.

Warning:
The [`end`][frequenz.client.microgrid.Lifetime.end] timestamp indicates that the
The [`end`][frequenz.client.assets.Lifetime.end] timestamp indicates that the
asset has been permanently removed from the system.
"""

start: datetime | None = None
"""The moment when the asset became operationally active.

If `None`, the asset is considered to be active in any past moment previous to the
[`end`][frequenz.client.microgrid.Lifetime.end].
[`end`][frequenz.client.assets.Lifetime.end].
"""

end: datetime | None = None
Expand Down
91 changes: 90 additions & 1 deletion src/frequenz/client/assets/cli/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@
from frequenz.client.assets._client import AssetsApiClient
from frequenz.client.assets.exceptions import ApiClientError

from ._utils import print_electrical_components, print_microgrid_details
from ._utils import (
print_component_connections,
print_electrical_components,
print_microgrid_details,
)


@click.group(invoke_without_command=True)
Expand Down Expand Up @@ -201,6 +205,91 @@ async def list_microgrid_electrical_components(
raise click.Abort()


@cli.command("component-connections")
@click.pass_context
@click.argument("microgrid-id", required=True, type=int)
@click.option(
"--source",
"source_component_ids",
help="Filter connections by source component ID(s). Can be specified multiple times.",
type=int,
multiple=True,
required=False,
)
@click.option(
"--destination",
"destination_component_ids",
help="Filter connections by destination component ID(s). Can be specified multiple times.",
type=int,
multiple=True,
required=False,
)
async def list_microgrid_electrical_component_connections(
ctx: click.Context,
microgrid_id: int,
source_component_ids: tuple[int, ...],
destination_component_ids: tuple[int, ...],
) -> None:
"""
Get and display electrical component connections by microgrid ID.

This command fetches detailed information about all electrical component connections
in a specific microgrid from the Assets API and displays it in JSON format.
The output can be piped to other tools for further processing.

Args:
ctx: Click context object containing the initialized API client.
microgrid_id: The unique identifier of the microgrid to retrieve.
source_component_ids: Optional filter for connections from specific
source component IDs.
destination_component_ids: Optional filter for connections to specific
destination component IDs.

Raises:
click.Abort: If there is an error printing the electrical component connections.

Example:
```bash
# Get all connections for microgrid with ID 123
assets-cli component-connections 123

# Filter by source component
assets-cli component-connections 123 --source 5

# Filter by destination component
assets-cli component-connections 123 --destination 10

# Filter by both source and destination
assets-cli component-connections 123 --source 5 --destination 10

# Filter by multiple source components
assets-cli component-connections 123 --source 5 --source 6 --source 7

# Pipe output to jq for filtering
assets-cli component-connections 123 | jq ".[]"
```
"""
try:
client = ctx.obj["client"]
component_connections = (
await client.list_microgrid_electrical_component_connections(
microgrid_id,
source_component_ids=source_component_ids,
destination_component_ids=destination_component_ids,
)
)
print_component_connections(component_connections)
except ApiClientError as e:
error_dict = {
"error_type": type(e).__name__,
"server_url": e.server_url,
"operation": e.operation,
"description": e.description,
}
click.echo(json.dumps(error_dict, indent=2))
raise click.Abort()


def main() -> None:
"""
Initialize and run the CLI application.
Expand Down
18 changes: 18 additions & 0 deletions src/frequenz/client/assets/cli/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

from frequenz.client.assets._microgrid import Microgrid
from frequenz.client.assets._microgrid_json import microgrid_to_json
from frequenz.client.assets.electrical_component._connection import ComponentConnection
from frequenz.client.assets.electrical_component._connection_json import (
component_connections_to_json,
)
from frequenz.client.assets.electrical_component._electrical_component import (
ElectricalComponent,
)
Expand Down Expand Up @@ -42,3 +46,17 @@ def print_electrical_components(
electrical_components: The list of ElectricalComponent instances to print to console.
"""
click.echo(electrical_components_to_json(electrical_components))


def print_component_connections(
component_connections: list[ComponentConnection],
) -> None:
"""
Print electrical component connections to console in JSON format using custom encoder.

This function converts the ComponentConnection instances to JSON using a custom
encoder and outputs it as formatted JSON to the console. The output is
designed to be machine-readable and can be piped to tools like jq for
further processing.
"""
click.echo(component_connections_to_json(component_connections))
2 changes: 2 additions & 0 deletions src/frequenz/client/assets/electrical_component/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from ._capacitor_bank import CapacitorBank
from ._category import ElectricalComponentCategory
from ._chp import Chp
from ._connection import ComponentConnection
from ._converter import Converter
from ._crypto_miner import CryptoMiner
from ._electrical_component import ElectricalComponent
Expand Down Expand Up @@ -87,4 +88,5 @@
"StaticTransferSwitch",
"UninterruptiblePowerSupply",
"WindTurbine",
"ComponentConnection",
]
72 changes: 72 additions & 0 deletions src/frequenz/client/assets/electrical_component/_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""Component connection."""

import dataclasses
from datetime import datetime, timezone

from frequenz.client.common.microgrid.components import ComponentId

from .._lifetime import Lifetime


@dataclasses.dataclass(frozen=True, kw_only=True)
class ComponentConnection:
"""A single electrical link between two components within a microgrid.

A component connection represents the physical wiring as viewed from the grid
connection point, if one exists, or from the islanding point, in case of an islanded
microgrids.

Note: Physical Representation
This object is not about data flow but rather about the physical
electrical connections between components. Therefore, the IDs for the
source and destination components correspond to the actual setup within
the microgrid.

Note: Direction
The direction of the connection follows the flow of current away from the
grid connection point, or in case of islands, away from the islanding
point. This direction is aligned with positive current according to the
[Passive Sign Convention]
(https://en.wikipedia.org/wiki/Passive_sign_convention).

Note: Historical Data
The timestamps of when a connection was created and terminated allow for
tracking the changes over time to a microgrid, providing insights into
when and how the microgrid infrastructure has been modified.
"""

source: ComponentId
"""The unique identifier of the component where the connection originates.

This is aligned with the direction of current flow away from the grid connection
point, or in case of islands, away from the islanding point.
"""

destination: ComponentId
"""The unique ID of the component where the connection terminates.

This is the component towards which the current flows.
"""

operational_lifetime: Lifetime = dataclasses.field(default_factory=Lifetime)
"""The operational lifetime of the connection."""

def __post_init__(self) -> None:
"""Ensure that the source and destination components are different."""
if self.source == self.destination:
raise ValueError("Source and destination components must be different")

def is_operational_at(self, timestamp: datetime) -> bool:
"""Check whether this connection is operational at a specific timestamp."""
return self.operational_lifetime.is_operational_at(timestamp)

def is_operational_now(self) -> bool:
"""Whether this connection is currently operational."""
return self.is_operational_at(datetime.now(timezone.utc))

def __str__(self) -> str:
"""Return a human-readable string representation of this instance."""
return f"{self.source}->{self.destination}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""JSON encoder for ComponentConnection objects."""

import json
from dataclasses import asdict

from .._utils import AssetsJSONEncoder
from ._connection import ComponentConnection


def component_connections_to_json(
component_connections: list[ComponentConnection],
) -> str:
"""Convert a list of ElectricalComponent objects to a JSON string."""
return json.dumps(
[asdict(connection) for connection in component_connections],
cls=AssetsJSONEncoder,
indent=2,
)
Loading