diff --git a/src/promptflow-core/tests/core/e2etests/test_eager_flow.py b/src/promptflow-core/tests/core/e2etests/test_eager_flow.py
index ca1a83e7a1b..2f7d8fcc464 100644
--- a/src/promptflow-core/tests/core/e2etests/test_eager_flow.py
+++ b/src/promptflow-core/tests/core/e2etests/test_eager_flow.py
@@ -1,5 +1,6 @@
 import asyncio
 from dataclasses import is_dataclass
+from unittest.mock import patch
 
 import pytest
 
@@ -133,6 +134,7 @@ def test_flow_run(self, flow_folder, inputs, ensure_output, init_kwargs):
 
     def test_flow_run_with_openai_chat(self):
         flow_file = get_yaml_file("callable_class_with_openai", root=EAGER_FLOW_ROOT, file_name="flow.flex.yaml")
+        # Case 1: Normal case
         executor = ScriptExecutor(flow_file=flow_file, init_kwargs={"connection": "azure_open_ai_connection"})
         line_result = executor.exec_line(inputs={"question": "Hello", "stream": False}, index=0)
         assert line_result.run_info.status == Status.Completed, line_result.run_info.error
@@ -141,6 +143,17 @@ def test_flow_run_with_openai_chat(self):
             assert token_name in line_result.run_info.api_calls[0]["children"][0]["system_metrics"]
             assert line_result.run_info.api_calls[0]["children"][0]["system_metrics"][token_name] > 0
 
+        # Case 2: OpenAI metrics calculation failure will not raise an error
+        with patch(
+            "promptflow.tracing._openai_utils.OpenAIMetricsCalculator._try_get_model", return_value="invalid_model"
+        ):
+            executor = ScriptExecutor(flow_file=flow_file, init_kwargs={"connection": "azure_open_ai_connection"})
+            line_result = executor.exec_line(inputs={"question": "Hello", "stream": True}, index=0)
+            assert line_result.run_info.status == Status.Completed, line_result.run_info.error
+            token_names = ["prompt_tokens", "completion_tokens", "total_tokens"]
+            for token_name in token_names:
+                assert token_name not in line_result.run_info.api_calls[0]["children"][0]["system_metrics"]
+
     def test_flow_run_with_connection(self, dev_connections):
         flow_file = get_yaml_file(
             "dummy_callable_class_with_connection", root=EAGER_FLOW_ROOT, file_name="flow.flex.yaml"
         )
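The new "Case 2" above works by patching `OpenAIMetricsCalculator._try_get_model` so that token calculation fails while the flow run itself still completes. A minimal standalone sketch of the same behavior at the calculator level (hypothetical usage, assuming `promptflow-tracing`, `openai`, and `tiktoken` are installed; it bypasses `ScriptExecutor` entirely):

```python
from unittest.mock import patch

from promptflow.tracing._openai_utils import OpenAIMetricsCalculator

calculator = OpenAIMetricsCalculator()
with patch.object(OpenAIMetricsCalculator, "_try_get_model", return_value="invalid_model"):
    # With an unsupported model name, _get_encoding_for_chat_api now raises internally;
    # the calculator catches it, logs a warning, and returns an empty metrics dict.
    metrics = calculator.get_openai_metrics_for_chat_api(
        inputs={"messages": [{"role": "user", "content": "Hello"}]},
        output=[],  # illustrative streaming-style output; not reached before the failure
    )
assert metrics == {}  # no token counts recorded, and no exception raised
```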
diff --git a/src/promptflow-devkit/promptflow/batch/_result.py b/src/promptflow-devkit/promptflow/batch/_result.py
index 0394b6a5807..1c1e9da3832 100644
--- a/src/promptflow-devkit/promptflow/batch/_result.py
+++ b/src/promptflow-devkit/promptflow/batch/_result.py
@@ -113,10 +113,12 @@ def create(
 
     @staticmethod
     def _get_openai_metrics(line_results: List[LineResult], aggr_results: AggregationResult):
-        node_run_infos = _get_node_run_infos(line_results, aggr_results)
+        # Get OpenAI metrics from the flow run info in line results, since flex flows do not have node run infos.
+        flow_run_infos = (line_result.run_info for line_result in line_results)
+        aggr_node_run_infos = (node_run_info for node_run_info in aggr_results.node_run_infos.values())
         total_metrics = {}
         calculator = OpenAIMetricsCalculator()
-        for run_info in node_run_infos:
+        for run_info in chain(flow_run_infos, aggr_node_run_infos):
             metrics = SystemMetrics._try_get_openai_metrics(run_info)
             if metrics:
                 calculator.merge_metrics_dict(total_metrics, metrics)
@@ -131,9 +133,8 @@ def _try_get_openai_metrics(run_info: RunInfo):
         openai_metrics = {}
         if run_info.system_metrics:
             for metric in TokenKeys.get_all_values():
-                if metric not in run_info.system_metrics:
-                    return False
-                openai_metrics[metric] = run_info.system_metrics[metric]
+                if metric in run_info.system_metrics:
+                    openai_metrics[metric] = run_info.system_metrics[metric]
         return openai_metrics
 
     def to_dict(self):
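A self-contained sketch of the aggregation pattern `_get_openai_metrics` now follows: flex flows report OpenAI token metrics on the line-level flow run info rather than on node run infos, so flow run infos and aggregation node run infos are chained, and whatever partial token counters each one carries are summed. Plain dicts and a hand-rolled merge stand in here for `RunInfo` and `OpenAIMetricsCalculator.merge_metrics_dict`:

```python
from itertools import chain

TOKEN_KEYS = ("total_tokens", "prompt_tokens", "completion_tokens")


def try_get_openai_metrics(system_metrics: dict) -> dict:
    # Keep whichever token counters are present instead of requiring all of them.
    return {k: system_metrics[k] for k in TOKEN_KEYS if k in system_metrics}


def merge_metrics(all_system_metrics) -> dict:
    total = {}
    for system_metrics in all_system_metrics:
        for key, value in try_get_openai_metrics(system_metrics).items():
            total[key] = total.get(key, 0) + value
    return total


flow_run_metrics = [{"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15}]
aggr_node_metrics = [{"prompt_tokens": 8, "total_tokens": 8}]  # partial metrics are accepted now
print(merge_metrics(chain(flow_run_metrics, aggr_node_metrics)))
# {'total_tokens': 23, 'prompt_tokens': 18, 'completion_tokens': 5}
```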
diff --git a/src/promptflow-recording/recordings/local/node_cache.shelve.bak b/src/promptflow-recording/recordings/local/node_cache.shelve.bak
index c2c79b489d2..d3d030a73cf 100644
--- a/src/promptflow-recording/recordings/local/node_cache.shelve.bak
+++ b/src/promptflow-recording/recordings/local/node_cache.shelve.bak
@@ -99,3 +99,4 @@
 '57a991472dd300efc84b638768fe2f87e7acb04c', (436736, 6942)
 'ab3563e99d1ee052e067952ab332536a6bf5c025', (443904, 1308)
 'c462635b056fec500e1f50e9cfa42f69239f0933', (445440, 2642)
+'67b119c190b574a9a275e8bdd93bcd06487d5318', (448512, 4994)
diff --git a/src/promptflow-recording/recordings/local/node_cache.shelve.dat b/src/promptflow-recording/recordings/local/node_cache.shelve.dat
index c78a9d08713..f517eee7b71 100644
Binary files a/src/promptflow-recording/recordings/local/node_cache.shelve.dat and b/src/promptflow-recording/recordings/local/node_cache.shelve.dat differ
diff --git a/src/promptflow-recording/recordings/local/node_cache.shelve.dir b/src/promptflow-recording/recordings/local/node_cache.shelve.dir
index c2c79b489d2..d3d030a73cf 100644
--- a/src/promptflow-recording/recordings/local/node_cache.shelve.dir
+++ b/src/promptflow-recording/recordings/local/node_cache.shelve.dir
@@ -99,3 +99,4 @@
 '57a991472dd300efc84b638768fe2f87e7acb04c', (436736, 6942)
 'ab3563e99d1ee052e067952ab332536a6bf5c025', (443904, 1308)
 'c462635b056fec500e1f50e9cfa42f69239f0933', (445440, 2642)
+'67b119c190b574a9a275e8bdd93bcd06487d5318', (448512, 4994)
diff --git a/src/promptflow-tracing/promptflow/tracing/_openai_utils.py b/src/promptflow-tracing/promptflow/tracing/_openai_utils.py
index c0ea789b229..a62d6812a33 100644
--- a/src/promptflow-tracing/promptflow/tracing/_openai_utils.py
+++ b/src/promptflow-tracing/promptflow/tracing/_openai_utils.py
@@ -1,7 +1,8 @@
-import tiktoken
 from abc import ABC, abstractmethod
 from importlib.metadata import version
 
+import tiktoken
+
 IS_LEGACY_OPENAI = version("openai").startswith("0.")
 
 
@@ -45,7 +46,7 @@ def _get_openai_metrics_for_signal_api(self, api_call: dict):
         if isinstance(usage, dict):
             return usage
         self._log_warning(
-            "Cannot find openai metrics in output, " "will calculate metrics from response data directly."
+            "Cannot find openai metrics in output, will calculate metrics from response data directly."
         )
         name = api_call.get("name")
 
@@ -56,24 +57,18 @@ def _get_openai_metrics_for_signal_api(self, api_call: dict):
         # OpenAI v1 api:
         # https://github.com/openai/openai-python/blob/main/src/openai/resources/chat/completions.py
        # https://github.com/openai/openai-python/blob/main/src/openai/resources/completions.py
-        if (
-            name == "openai_chat_legacy"
-            or name == "openai_chat"  # openai v1
-        ):
+        if name == "openai_chat_legacy" or name == "openai_chat":  # openai v1
             return self.get_openai_metrics_for_chat_api(inputs, output)
-        elif (
-            name == "openai_completion_legacy"
-            or name == "openai_completion"  # openai v1
-        ):
+        elif name == "openai_completion_legacy" or name == "openai_completion":  # openai v1
             return self.get_openai_metrics_for_completion_api(inputs, output)
         else:
-            self._log_warning(f"Calculating metrics for api {name} is not supported.")
+            raise Exception(f"Calculating metrics for api {name} is not supported.")
 
     def _try_get_model(self, inputs, output):
         if IS_LEGACY_OPENAI:
             api_type = inputs.get("api_type")
             if not api_type:
-                self._log_warning("Cannot calculate metrics for none or empty api_type.")
+                raise Exception("Cannot calculate metrics for none or empty api_type.")
             if api_type == "azure":
                 model = inputs.get("engine")
             else:
@@ -90,28 +85,33 @@ def _try_get_model(self, inputs, output):
         if not model:
             model = inputs.get("model")
         if not model:
-            raise self._log_warning(
-                "Cannot get a valid model to calculate metrics. "
+            raise Exception(
+                "Cannot get a valid model to calculate metrics. "
                 "Please specify a engine for AzureOpenAI API or a model for OpenAI API."
             )
         return model
 
     def get_openai_metrics_for_chat_api(self, inputs, output):
         metrics = {}
-        enc, tokens_per_message, tokens_per_name = self._get_encoding_for_chat_api(self._try_get_model(inputs, output))
-        metrics["prompt_tokens"] = self._get_prompt_tokens_from_messages(
-            inputs["messages"], enc, tokens_per_message, tokens_per_name
-        )
-        if isinstance(output, list):
-            if IS_LEGACY_OPENAI:
-                metrics["completion_tokens"] = len(output)
+        try:
+            enc, tokens_per_message, tokens_per_name = self._get_encoding_for_chat_api(
+                self._try_get_model(inputs, output)
+            )
+            metrics["prompt_tokens"] = self._get_prompt_tokens_from_messages(
+                inputs["messages"], enc, tokens_per_message, tokens_per_name
+            )
+            if isinstance(output, list):
+                if IS_LEGACY_OPENAI:
+                    metrics["completion_tokens"] = len(output)
+                else:
+                    metrics["completion_tokens"] = len(
+                        [chunk for chunk in output if chunk.choices and chunk.choices[0].delta.content]
+                    )
             else:
-                metrics["completion_tokens"] = len(
-                    [chunk for chunk in output if chunk.choices and chunk.choices[0].delta.content]
-                )
-        else:
-            metrics["completion_tokens"] = self._get_completion_tokens_for_chat_api(output, enc)
-        metrics["total_tokens"] = metrics["prompt_tokens"] + metrics["completion_tokens"]
+                metrics["completion_tokens"] = self._get_completion_tokens_for_chat_api(output, enc)
+            metrics["total_tokens"] = metrics["prompt_tokens"] + metrics["completion_tokens"]
+        except Exception as ex:
+            self._log_warning(f"Failed to calculate metrics due to exception: {ex}.")
         return metrics
 
     def _get_encoding_for_chat_api(self, model):
@@ -126,7 +126,7 @@ def _get_encoding_for_chat_api(self, model):
             tokens_per_message = 3
             tokens_per_name = 1
         else:
-            self._log_warning(f"Calculating metrics for model {model} is not supported.")
+            raise Exception(f"Calculating metrics for model {model} is not supported.")
         return enc, tokens_per_message, tokens_per_name
 
     def _get_prompt_tokens_from_messages(self, messages, enc, tokens_per_message, tokens_per_name):
@@ -155,24 +155,27 @@ def _get_completion_tokens_for_chat_api(self, output, enc):
 
     def get_openai_metrics_for_completion_api(self, inputs, output):
         metrics = {}
-        enc = self._get_encoding_for_completion_api(self._try_get_model(inputs, output))
-        metrics["prompt_tokens"] = 0
-        prompt = inputs.get("prompt")
-        if isinstance(prompt, str):
-            metrics["prompt_tokens"] = len(enc.encode(prompt))
-        elif isinstance(prompt, list):
-            for pro in prompt:
-                metrics["prompt_tokens"] += len(enc.encode(pro))
-        if isinstance(output, list):
-            if IS_LEGACY_OPENAI:
-                metrics["completion_tokens"] = len(output)
+        try:
+            enc = self._get_encoding_for_completion_api(self._try_get_model(inputs, output))
+            metrics["prompt_tokens"] = 0
+            prompt = inputs.get("prompt")
+            if isinstance(prompt, str):
+                metrics["prompt_tokens"] = len(enc.encode(prompt))
+            elif isinstance(prompt, list):
+                for pro in prompt:
+                    metrics["prompt_tokens"] += len(enc.encode(pro))
+            if isinstance(output, list):
+                if IS_LEGACY_OPENAI:
+                    metrics["completion_tokens"] = len(output)
+                else:
+                    metrics["completion_tokens"] = len(
+                        [chunk for chunk in output if chunk.choices and chunk.choices[0].text]
+                    )
             else:
-                metrics["completion_tokens"] = len(
-                    [chunk for chunk in output if chunk.choices and chunk.choices[0].text]
-                )
-        else:
-            metrics["completion_tokens"] = self._get_completion_tokens_for_completion_api(output, enc)
-        metrics["total_tokens"] = metrics["prompt_tokens"] + metrics["completion_tokens"]
+                metrics["completion_tokens"] = self._get_completion_tokens_for_completion_api(output, enc)
+            metrics["total_tokens"] = metrics["prompt_tokens"] + metrics["completion_tokens"]
+        except Exception as ex:
+            self._log_warning(f"Failed to calculate metrics due to exception: {ex}.")
         return metrics
 
     def _get_encoding_for_completion_api(self, model):
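The through-line of the `_openai_utils.py` changes is that token accounting becomes best-effort: unsupported APIs, models, and missing fields now raise internally, and the public `get_openai_metrics_for_*` methods catch the exception, log a warning, and return whatever was gathered. A minimal sketch of that pattern, assuming `tiktoken` is installed (`count_prompt_tokens` is a hypothetical helper, not a promptflow API):

```python
import logging

import tiktoken


def count_prompt_tokens(model: str, text: str) -> dict:
    metrics = {}
    try:
        try:
            enc = tiktoken.encoding_for_model(model)
        except KeyError:
            enc = tiktoken.get_encoding("cl100k_base")
        if not model.startswith(("gpt-3.5-turbo", "gpt-35-turbo", "gpt-4")):
            raise Exception(f"Calculating metrics for model {model} is not supported.")
        metrics["prompt_tokens"] = len(enc.encode(text))
    except Exception as ex:
        # Metric calculation must never fail the run: log and return what we have.
        logging.warning("Failed to calculate metrics due to exception: %s.", ex)
    return metrics


count_prompt_tokens("gpt-4", "Hello")       # -> {"prompt_tokens": 1}
count_prompt_tokens("invalid_model", "Hi")  # -> {} (a warning is logged instead of raising)
```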
diff --git a/src/promptflow/tests/conftest.py b/src/promptflow/tests/conftest.py
index 2eadd0c9e23..4ae92a2a170 100644
--- a/src/promptflow/tests/conftest.py
+++ b/src/promptflow/tests/conftest.py
@@ -13,11 +13,11 @@
 from filelock import FileLock
 from pytest_mock import MockerFixture
 
-from promptflow._cli._utils import AzureMLWorkspaceTriad
 from promptflow._constants import PROMPTFLOW_CONNECTIONS
 from promptflow._core.connection_manager import ConnectionManager
-from promptflow._utils.context_utils import _change_working_dir
+from promptflow._sdk._constants import AzureMLWorkspaceTriad
 from promptflow._sdk.entities._connection import AzureOpenAIConnection
+from promptflow._utils.context_utils import _change_working_dir
 
 load_dotenv()
 
diff --git a/src/promptflow/tests/executor/e2etests/test_eager_flow.py b/src/promptflow/tests/executor/e2etests/test_eager_flow.py
index 7955319682a..da5445586cc 100644
--- a/src/promptflow/tests/executor/e2etests/test_eager_flow.py
+++ b/src/promptflow/tests/executor/e2etests/test_eager_flow.py
@@ -81,6 +81,22 @@ def test_batch_run(self, flow_folder, inputs_mapping, ensure_output, init_kwargs):
         batch_result = batch_engine.run(input_dirs, inputs_mapping, output_dir)
         validate_batch_result(batch_result, flow_folder, output_dir, ensure_output)
 
+    def test_batch_run_with_openai(self, dev_connections):
+        flow_folder = "callable_class_with_openai"
+        inputs_mapping = {"question": "${data.question}", "stream": "${data.stream}"}
+        batch_engine = BatchEngine(
+            get_yaml_file(flow_folder, root=EAGER_FLOW_ROOT),
+            get_flow_folder(flow_folder, root=EAGER_FLOW_ROOT),
+            init_kwargs={"connection": "azure_open_ai_connection"},
+            connections=dev_connections,
+        )
+        input_dirs = {"data": get_flow_inputs_file(flow_folder, root=EAGER_FLOW_ROOT)}
+        output_dir = Path(mkdtemp())
+        batch_result = batch_engine.run(input_dirs, inputs_mapping, output_dir)
+        for token_name in ["prompt_tokens", "completion_tokens", "total_tokens"]:
+            assert getattr(batch_result.system_metrics, token_name, 0) > 0
+        validate_batch_result(batch_result, flow_folder, output_dir, lambda x: isinstance(x, dict))
+
     def test_batch_run_with_invalid_case(self):
         flow_folder = "dummy_flow_with_exception"
         batch_engine = BatchEngine(
input_dirs = {"data": get_flow_inputs_file(flow_folder, root=EAGER_FLOW_ROOT)} + output_dir = Path(mkdtemp()) + batch_result = batch_engine.run(input_dirs, inputs_mapping, output_dir) + for token_name in ["prompt_tokens", "completion_tokens", "total_tokens"]: + assert getattr(batch_result.system_metrics, token_name, 0) > 0 + validate_batch_result(batch_result, flow_folder, output_dir, lambda x: isinstance(x, dict)) + def test_batch_run_with_invalid_case(self): flow_folder = "dummy_flow_with_exception" batch_engine = BatchEngine( diff --git a/src/promptflow/tests/executor/unittests/batch/test_result.py b/src/promptflow/tests/executor/unittests/batch/test_result.py index 98637ef79d9..2169afd528c 100644 --- a/src/promptflow/tests/executor/unittests/batch/test_result.py +++ b/src/promptflow/tests/executor/unittests/batch/test_result.py @@ -31,9 +31,18 @@ def get_node_run_infos(node_dict: dict, index=None, api_calls=None, system_metri } -def get_flow_run_info(status_dict: dict, index: int): +def get_flow_run_info(status_dict: dict, index: int, api_calls=None, system_metrics=None): status = Status.Failed if any(status == Status.Failed for status in status_dict.values()) else Status.Completed error = {"code": "UserError", "message": "test message"} if status == Status.Failed else None + children = [] + aggregated_tokens = {"total_tokens": 0, "prompt_tokens": 0, "completion_tokens": 0} + for i in range(len(status_dict)): + if api_calls is not None: + children.extend(api_calls) + if system_metrics is not None: + for k, _ in aggregated_tokens.items(): + if k in system_metrics: + aggregated_tokens[k] += system_metrics[k] return FlowRunInfo( run_id=f"{index}_run_id", status=status, @@ -49,6 +58,8 @@ def get_flow_run_info(status_dict: dict, index: int): start_time=datetime.utcnow(), end_time=datetime.utcnow(), index=index, + api_calls=[get_api_call("Flow", "Flow", children=children)] if api_calls else None, + system_metrics=aggregated_tokens if system_metrics else None, ) @@ -57,7 +68,7 @@ def get_line_results(line_dict: dict, api_calls=None, system_metrics=None): LineResult( output={}, aggregation_inputs={}, - run_info=get_flow_run_info(status_dict=v, index=k), + run_info=get_flow_run_info(status_dict=v, index=k, api_calls=api_calls, system_metrics=system_metrics), node_run_infos=get_node_run_infos(node_dict=v, index=k, api_calls=api_calls, system_metrics=system_metrics), ) for k, v in line_dict.items() @@ -259,12 +270,12 @@ def test_create(slef): } aggr_result = get_aggregation_result(aggr_dict, system_metrics=aggr_system_metrics) system_metrics = SystemMetrics.create(datetime.utcnow(), datetime.utcnow(), line_results, aggr_result) - assert system_metrics.total_tokens == 20 - assert system_metrics.prompt_tokens == 12 + assert system_metrics.total_tokens == 30 + assert system_metrics.prompt_tokens == 18 assert system_metrics.completion_tokens == 8 system_metrics_dict = { - "total_tokens": 20, - "prompt_tokens": 12, + "total_tokens": 30, + "prompt_tokens": 18, "completion_tokens": 8, } assert system_metrics_dict.items() <= system_metrics.to_dict().items() diff --git a/src/promptflow/tests/test_configs/eager_flows/callable_class_with_openai/openai_chat.py b/src/promptflow/tests/test_configs/eager_flows/callable_class_with_openai/openai_chat.py index 348a764f5eb..6699c503543 100644 --- a/src/promptflow/tests/test_configs/eager_flows/callable_class_with_openai/openai_chat.py +++ b/src/promptflow/tests/test_configs/eager_flows/callable_class_with_openai/openai_chat.py @@ -1,7 +1,6 @@ from openai import 
diff --git a/src/promptflow/tests/test_configs/eager_flows/callable_class_with_openai/openai_chat.py b/src/promptflow/tests/test_configs/eager_flows/callable_class_with_openai/openai_chat.py
index 348a764f5eb..6699c503543 100644
--- a/src/promptflow/tests/test_configs/eager_flows/callable_class_with_openai/openai_chat.py
+++ b/src/promptflow/tests/test_configs/eager_flows/callable_class_with_openai/openai_chat.py
@@ -1,7 +1,6 @@
 from openai import AzureOpenAI
 
 from promptflow.connections import AzureOpenAIConnection
-from promptflow.tracing import trace
 
 
 class MyClass:
@@ -36,8 +35,7 @@ def generator():
                     yield chunk.choices[0].delta.content or ""
            # We must return the generator object, not using yield directly here.
            # Otherwise, the function itself will become a generator, despite whether stream is True or False.
-            # return generator()
-            return "".join(generator())
+            return generator()
         else:
             # chat api may return message with no content.
             return completion.choices[0].message.content or ""
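The comment retained in `openai_chat.py` above is why the fix returns `generator()` rather than reverting to a bare `yield`: a function whose body contains `yield` anywhere always returns a generator object, even on branches that never reach the `yield`, so the non-streaming branch could no longer return a plain string. A small illustrative sketch, independent of the flow code:

```python
def stream_or_text(stream: bool):
    def generator():
        for piece in ("Hello", ", ", "world"):
            yield piece

    if stream:
        return generator()  # generator object, consumed lazily by the caller
    return "Hello, world"   # plain string for the non-streaming branch


assert stream_or_text(stream=False) == "Hello, world"
assert "".join(stream_or_text(stream=True)) == "Hello, world"
```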