Local to remote tracking for evaluation (#3104)
# Description

Please add an informative description that covers the changes made by
the pull request and link all relevant issues.

# All Promptflow Contribution checklist:
- [ ] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [ ] Title of the pull request is clear and informative.
- [ ] There are a small number of commits, each of which has an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.

---------

Co-authored-by: Billy Hu <[email protected]>
singankit and ninghu authored May 3, 2024
1 parent 43ef929 commit 12a0f98
Showing 9 changed files with 326 additions and 9 deletions.
1 change: 1 addition & 0 deletions .github/workflows/promptflow-evals-e2e-test.yml
@@ -60,6 +60,7 @@ jobs:
poetry run pip install -e ../promptflow-devkit
poetry run pip install -e ../promptflow-tracing
poetry run pip install -e ../promptflow-tools
poetry run pip install -e ../promptflow-azure
working-directory: ${{ env.WORKING_DIRECTORY }}
- name: install promptflow-evals from wheel
# wildcard expansion (*) does not work in Windows, so leverage python to find and install
1 change: 1 addition & 0 deletions .github/workflows/promptflow-evals-unit-test.yml
@@ -56,6 +56,7 @@ jobs:
poetry run pip install -e ../promptflow-devkit
poetry run pip install -e ../promptflow-tracing
poetry run pip install -e ../promptflow-tools
poetry run pip install -e ../promptflow-azure
working-directory: ${{ env.WORKING_DIRECTORY }}
- name: install promptflow-evals from wheel
# wildcard expansion (*) does not work in Windows, so leverage python to find and install
6 changes: 6 additions & 0 deletions src/promptflow-evals/promptflow/evals/_user_agent.py
@@ -0,0 +1,6 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
from promptflow.evals._version import VERSION

USER_AGENT = "{}/{}".format("promptflow-evals", VERSION)
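
For context, a minimal sketch of what this new module evaluates to; the version value is illustrative (it depends on the installed package), and the string is the one later passed to `PFClient` in `_evaluate.py` below:

```python
from promptflow.evals._version import VERSION

# Same construction as _user_agent.py above; for VERSION == "0.1.0" this
# yields "promptflow-evals/0.1.0", which _evaluate.py passes as user_agent
# when constructing PFClient.
USER_AGENT = "{}/{}".format("promptflow-evals", VERSION)
print(USER_AGENT)
```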
37 changes: 29 additions & 8 deletions src/promptflow-evals/promptflow/evals/evaluate/_evaluate.py
@@ -5,13 +5,14 @@
import os
import re
import tempfile
import uuid
from typing import Any, Callable, Dict, Optional, Set, Tuple

import pandas as pd

from promptflow._sdk._constants import LINE_NUMBER
from promptflow.client import PFClient
from ._utils import _log_metrics_and_instance_results
from .._user_agent import USER_AGENT


def _calculate_mean(df) -> Dict[str, float]:
@@ -104,7 +105,8 @@ def _validate_columns(


def _apply_target_to_data(
target: Callable, data: str, pf_client: PFClient, initial_data: pd.DataFrame
target: Callable, data: str, pf_client: PFClient, initial_data: pd.DataFrame,
evaluation_name: Optional[str] = None
) -> Tuple[pd.DataFrame, Set[str]]:
"""
Apply the target function to the data set and return updated data and generated columns.
@@ -123,11 +125,17 @@
# We are manually creating the temporary directory for the flow
# because the way tempdir removes temporary directories would
# hang the debugger, since promptflow keeps the flow directory.
run = pf_client.run(flow=target, data=data, name=f"preprocess_{uuid.uuid1()}", stream=True)
run = pf_client.run(
flow=target,
display_name=evaluation_name,
data=data,
properties={"runType": "eval_run"},
stream=True
)
target_output = pf_client.runs.get_details(run, all_results=True)
# Remove input and output prefix
prefix = "outputs."
rename_dict = {col: col[len(prefix) :] for col in target_output.columns if col.startswith(prefix)}
rename_dict = {col: col[len(prefix):] for col in target_output.columns if col.startswith(prefix)}
# Sort output by line numbers
target_output.set_index(f"inputs.{LINE_NUMBER}", inplace=True)
target_output.sort_index(inplace=True)
@@ -140,7 +148,7 @@
target_output.rename(columns=rename_dict, inplace=True)
# Concatenate output to input
target_output = pd.concat([target_output, initial_data], axis=1)
return target_output, set(rename_dict.values())
return target_output, set(rename_dict.values()), run


def _apply_column_mapping(source_df: pd.DataFrame, mapping_config: dict, inplace: bool = False):
@@ -230,11 +238,19 @@ def evaluate(
evaluator_config = _process_evaluator_config(evaluator_config)
_validate_columns(input_data_df, evaluators, target, evaluator_config)

pf_client = PFClient()
pf_client = PFClient(
config={
"trace.destination": tracking_uri
} if tracking_uri else None,
user_agent=USER_AGENT,

)
target_run = None

target_generated_columns = set()
if data is not None and target is not None:
input_data_df, target_generated_columns = _apply_target_to_data(target, data, pf_client, input_data_df)
input_data_df, target_generated_columns, target_run = _apply_target_to_data(target, data, pf_client,
input_data_df, evaluation_name)
# After we have generated all columns we can check if we have
# everything we need for evaluators.
_validate_columns(input_data_df, evaluators, target=None, evaluator_config=evaluator_config)
@@ -251,6 +267,7 @@
evaluator_info[evaluator_name] = {}
evaluator_info[evaluator_name]["run"] = pf_client.run(
flow=evaluator,
run=target_run,
column_mapping=evaluator_config.get(evaluator_name, evaluator_config.get("default", None)),
data=data_file,
stream=True,
@@ -290,5 +307,9 @@
)

result_df = pd.concat([input_data_df, evaluators_result_df], axis=1, verify_integrity=True)
metrics = _calculate_mean(evaluators_result_df)

studio_url = _log_metrics_and_instance_results(
metrics, result_df, tracking_uri, target_run, pf_client, data, evaluation_name)

return {"rows": result_df.to_dict("records"), "metrics": _calculate_mean(evaluators_result_df), "traces": {}}
return {"rows": result_df.to_dict("records"), "metrics": metrics, "studio_url": studio_url}
142 changes: 142 additions & 0 deletions src/promptflow-evals/promptflow/evals/evaluate/_utils.py
@@ -1,9 +1,151 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import logging
import json
import os
import tempfile
from pathlib import Path

import mlflow

from promptflow._sdk._constants import Local2Cloud
from promptflow._sdk._utilities.general_utils import extract_workspace_triad_from_trace_provider
from promptflow._utils.async_utils import async_run_allowing_running_loop
from promptflow.azure.operations._async_run_uploader import AsyncRunUploader

LOGGER = logging.getLogger(__name__)


def load_jsonl(path):
with open(path, "r", encoding="utf-8") as f:
return [json.loads(line) for line in f.readlines()]


def _write_properties_to_run_history(properties: dict) -> None:
from mlflow.tracking import MlflowClient
from mlflow.utils.rest_utils import http_request

# get mlflow run
run = mlflow.active_run()
if run is None:
run = mlflow.start_run()
# get auth from client
client = MlflowClient()
try:
cred = client._tracking_client.store.get_host_creds() # pylint: disable=protected-access
# update host to run history and request PATCH API
cred.host = cred.host.replace("mlflow/v2.0", "mlflow/v1.0").replace("mlflow/v1.0", "history/v1.0")
response = http_request(
host_creds=cred,
endpoint=f"/experimentids/{run.info.experiment_id}/runs/{run.info.run_id}",
method="PATCH",
json={"runId": run.info.run_id, "properties": properties},
)
if response.status_code != 200:
LOGGER.error("Fail writing properties '%s' to run history: %s", properties, response.text)
response.raise_for_status()
except AttributeError as e:
LOGGER.error("Fail writing properties '%s' to run history: %s", properties, e)


def _azure_pf_client(trace_destination):
from promptflow._sdk._utilities.general_utils import extract_workspace_triad_from_trace_provider
from promptflow.azure._cli._utils import _get_azure_pf_client

ws_triad = extract_workspace_triad_from_trace_provider(trace_destination)
azure_pf_client = _get_azure_pf_client(
subscription_id=ws_triad.subscription_id,
resource_group=ws_triad.resource_group_name,
workspace_name=ws_triad.workspace_name,
)

return azure_pf_client


def _get_mlflow_tracking_uri(trace_destination):
from promptflow._sdk._utilities.general_utils import extract_workspace_triad_from_trace_provider

azure_pf_client = _azure_pf_client(trace_destination)
ws_triad = extract_workspace_triad_from_trace_provider(trace_destination)

ws = azure_pf_client.ml_client.workspaces.get(ws_triad.workspace_name)
return ws.mlflow_tracking_uri


def _get_trace_destination_config(tracking_uri):
from promptflow._sdk._configuration import Configuration
pf_config = Configuration(overrides={
"trace.destination": tracking_uri
} if tracking_uri is not None else {}
)

trace_destination = pf_config.get_trace_destination()

return trace_destination


def _log_metrics_and_instance_results(metrics, instance_results, tracking_uri, run, pf_client, data,
evaluation_name=None) -> str:
run_id = None
trace_destination = _get_trace_destination_config(tracking_uri=tracking_uri)

if trace_destination is None:
return None

tracking_uri = _get_mlflow_tracking_uri(trace_destination=trace_destination)

# Adding line_number as an index column; this is needed by the UI to form links to individual instance runs
instance_results["line_number"] = instance_results.index

if run is None:
mlflow.set_tracking_uri(tracking_uri)

with tempfile.TemporaryDirectory() as tmpdir:
with mlflow.start_run(run_name=evaluation_name) as run:
tmp_path = os.path.join(tmpdir, "eval_results.jsonl")

with open(tmp_path, "w", encoding="utf-8") as f:
f.write(instance_results.to_json(orient="records", lines=True))

mlflow.log_artifact(tmp_path)

# Using mlflow to create a dummy run because a run created via PF would show traces of the dummy run in the UI.
# Those traces can be confusing.
# Adding these properties avoids showing traces when a dummy run is created.
_write_properties_to_run_history(
properties={
"_azureml.evaluation_run": "azure-ai-generative-parent",
"_azureml.evaluate_artifacts": json.dumps([{"path": "eval_results.jsonl", "type": "table"}])
})
run_id = run.info.run_id
else:
azure_pf_client = _azure_pf_client(trace_destination=trace_destination)
with tempfile.TemporaryDirectory() as temp_dir:
file_name = Local2Cloud.FLOW_INSTANCE_RESULTS_FILE_NAME
local_file = Path(temp_dir) / file_name
instance_results.to_json(local_file, orient="records", lines=True)

# Overwriting the instance_results.jsonl file
async_uploader = AsyncRunUploader._from_run_operations(run, azure_pf_client.runs)
remote_file = (f"{Local2Cloud.BLOB_ROOT_PROMPTFLOW}"
f"/{Local2Cloud.BLOB_ARTIFACTS}/{run.name}/{Local2Cloud.FLOW_INSTANCE_RESULTS_FILE_NAME}")
async_run_allowing_running_loop(async_uploader._upload_local_file_to_blob, local_file, remote_file)
run_id = run.name

client = mlflow.tracking.MlflowClient(tracking_uri=tracking_uri)
for metric_name, metric_value in metrics.items():
client.log_metric(run_id, metric_name, metric_value)

return _get_ai_studio_url(trace_destination=trace_destination, evaluation_id=run_id)


def _get_ai_studio_url(trace_destination: str, evaluation_id: str) -> str:
ws_triad = extract_workspace_triad_from_trace_provider(trace_destination)
studio_base_url = os.getenv("AI_STUDIO_BASE_URL", "https://ai.azure.com")

studio_url = f"{studio_base_url}/build/evaluation/{evaluation_id}?wsid=/subscriptions/{ws_triad.subscription_id}" \
f"/resourceGroups/{ws_triad.resource_group_name}/providers/Microsoft.MachineLearningServices/" \
f"workspaces/{ws_triad.workspace_name}"

return studio_url
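
For illustration, `_get_ai_studio_url` above produces links of the following shape (all identifiers below are placeholders, not real resources):

```python
# Mirrors the f-string in _get_ai_studio_url with placeholder values.
subscription_id = "00000000-0000-0000-0000-000000000000"
resource_group_name = "my-resource-group"
workspace_name = "my-workspace"
evaluation_id = "my_evaluation_run"  # mlflow run id or promptflow run name, depending on the branch taken above

studio_url = (
    f"https://ai.azure.com/build/evaluation/{evaluation_id}"
    f"?wsid=/subscriptions/{subscription_id}"
    f"/resourceGroups/{resource_group_name}"
    f"/providers/Microsoft.MachineLearningServices/workspaces/{workspace_name}"
)
print(studio_url)
```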
3 changes: 3 additions & 0 deletions src/promptflow-evals/pyproject.toml
@@ -43,6 +43,9 @@ azure-ai-ml = ">=1.14.0"
promptflow-devkit = "<2.0.0"
promptflow-core = "<2.0.0"
promptflow-tools = "<2.0.0"
promptflow-azure = "<2.0.0" # Needed for remote tracking
mlflow = "<3.0.0" # Needed for remote tracking to log metrics
azureml-mlflow = "<2.0.0" # Needed for remote tracking to log metrics

[tool.poetry.group.dev.dependencies]
pre-commit = "*"
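
The three new dependencies back the remote-tracking path. A small diagnostic sketch for checking that they are importable in a target environment; the module paths are assumed from the distribution names:

```python
import importlib

# promptflow-azure -> promptflow.azure, mlflow -> mlflow,
# azureml-mlflow -> azureml.mlflow (module paths assumed from the package names).
for module_name in ("promptflow.azure", "mlflow", "azureml.mlflow"):
    try:
        importlib.import_module(module_name)
        print(f"{module_name}: available")
    except ImportError as exc:
        print(f"{module_name}: missing ({exc})")
```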
36 changes: 36 additions & 0 deletions src/promptflow-evals/tests/evals/conftest.py
@@ -1,7 +1,9 @@
import json
import multiprocessing
import os
import subprocess
from pathlib import Path
from typing import Dict
from unittest.mock import patch

import pytest
@@ -12,6 +14,8 @@
from promptflow.executor._line_execution_process_pool import _process_wrapper
from promptflow.executor._process_manager import create_spawned_fork_process_manager
from promptflow.tracing._integrations._openai_injector import inject_openai_api
from promptflow.azure import PFClient as AzurePFClient
from azure.identity import DefaultAzureCredential

try:
from promptflow.recording.local import recording_array_reset
@@ -54,6 +58,11 @@ def configure_default_azure_credential():
creds = dev_connections["pf-evals-sp"]["value"]
for key, value in creds.items():
os.environ[key] = value
login_output = subprocess.check_output(
["az", "login", "--service-principal", "-u", creds["AZURE_CLIENT_ID"],
"-p", creds["AZURE_CLIENT_SECRET"], "--tenant", creds["AZURE_TENANT_ID"]], shell=True)
print("loging_output")
print(login_output)


def pytest_configure():
@@ -115,6 +124,33 @@ def project_scope() -> dict:
return dev_connections[conn_name]["value"]


@pytest.fixture
def mock_trace_destination_to_cloud(project_scope: dict):
"""Mock trace destination to cloud."""

subscription_id = project_scope["subscription_id"]
resource_group_name = project_scope["resource_group_name"]
workspace_name = project_scope["project_name"]

trace_destination = (
f"azureml://subscriptions/{subscription_id}/resourceGroups/{resource_group_name}/"
f"providers/Microsoft.MachineLearningServices/workspaces/{workspace_name}"
)
with patch("promptflow._sdk._configuration.Configuration.get_trace_destination", return_value=trace_destination):
yield


@pytest.fixture
def azure_pf_client(project_scope: Dict):
"""The fixture, returning AzurePFClient"""
return AzurePFClient(
subscription_id=project_scope["subscription_id"],
resource_group_name=project_scope["resource_group_name"],
workspace_name=project_scope["project_name"],
credential=DefaultAzureCredential()
)


@pytest.fixture
def pf_client() -> PFClient:
"""The fixture, returning PRClient"""
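
A hedged sketch of how the new fixtures might be combined in a test; the test name and body are placeholders and the real tests are not shown in this excerpt:

```python
import pytest


@pytest.mark.usefixtures("mock_trace_destination_to_cloud")
def test_evaluate_tracks_to_cloud(pf_client, azure_pf_client):
    # With get_trace_destination patched to the cloud workspace, evaluate()
    # is expected to upload instance results and return a studio URL.
    ...  # placeholder body
```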