Fix bugs about OpenAI tokens (#3067)
# Description

Fix some bugs related to OpenAI token metrics:
- For the batch run of a flex flow, `BatchResult.system_metrics` contained no token counts: the tokens were collected from the node run infos in each line result, but a flex flow has no node run infos. In this PR we collect the tokens from the flow run infos instead (see the sketch below).
- Ensure that `OpenAIMetricsCalculator` never raises exceptions; calculation failures are logged as warnings and the affected token metrics are simply omitted.
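
For reference, here is a minimal sketch of the new token aggregation path (simplified from `SystemMetrics._get_openai_metrics` in this diff; the names mirror the promptflow code, but the snippet is illustrative rather than the exact implementation):

```python
from itertools import chain

from promptflow.tracing._openai_utils import OpenAIMetricsCalculator


def aggregate_openai_metrics(line_results, aggr_results):
    # Token metrics now come from the flow run info of each line result
    # (flex flows have no node run infos) plus the aggregation node run infos.
    flow_run_infos = (line_result.run_info for line_result in line_results)
    aggr_node_run_infos = aggr_results.node_run_infos.values()

    total_metrics = {}
    calculator = OpenAIMetricsCalculator()
    for run_info in chain(flow_run_infos, aggr_node_run_infos):
        metrics = try_get_openai_metrics(run_info)
        if metrics:
            calculator.merge_metrics_dict(total_metrics, metrics)
    return total_metrics


def try_get_openai_metrics(run_info):
    # Collect whichever token keys are present instead of requiring all of
    # them, so run infos with partial metrics still contribute to the totals.
    openai_metrics = {}
    if run_info.system_metrics:
        for key in ("prompt_tokens", "completion_tokens", "total_tokens"):
            if key in run_info.system_metrics:
                openai_metrics[key] = run_info.system_metrics[key]
    return openai_metrics
```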

# All Promptflow Contribution checklist:
- [x] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [x] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [x] Title of the pull request is clear and informative.
- [x] There are a small number of commits, each of which has an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, [see this page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [x] Pull request includes test coverage for the included changes.

---------

Co-authored-by: Lina Tang <[email protected]>
lumoslnt and Lina Tang authored May 10, 2024
1 parent c3b90ab commit 4845e67
Showing 10 changed files with 105 additions and 61 deletions.
13 changes: 13 additions & 0 deletions src/promptflow-core/tests/core/e2etests/test_eager_flow.py
@@ -1,5 +1,6 @@
import asyncio
from dataclasses import is_dataclass
from unittest.mock import patch

import pytest

@@ -133,6 +134,7 @@ def test_flow_run(self, flow_folder, inputs, ensure_output, init_kwargs):
def test_flow_run_with_openai_chat(self):
flow_file = get_yaml_file("callable_class_with_openai", root=EAGER_FLOW_ROOT, file_name="flow.flex.yaml")

# Case 1: Normal case
executor = ScriptExecutor(flow_file=flow_file, init_kwargs={"connection": "azure_open_ai_connection"})
line_result = executor.exec_line(inputs={"question": "Hello", "stream": False}, index=0)
assert line_result.run_info.status == Status.Completed, line_result.run_info.error
@@ -141,6 +143,17 @@ def test_flow_run_with_openai_chat(self):
assert token_name in line_result.run_info.api_calls[0]["children"][0]["system_metrics"]
assert line_result.run_info.api_calls[0]["children"][0]["system_metrics"][token_name] > 0

# Case 2: OpenAi metrics calculation failure will not raise error
with patch(
"promptflow.tracing._openai_utils.OpenAIMetricsCalculator._try_get_model", return_value="invalid_model"
):
executor = ScriptExecutor(flow_file=flow_file, init_kwargs={"connection": "azure_open_ai_connection"})
line_result = executor.exec_line(inputs={"question": "Hello", "stream": True}, index=0)
assert line_result.run_info.status == Status.Completed, line_result.run_info.error
token_names = ["prompt_tokens", "completion_tokens", "total_tokens"]
for token_name in token_names:
assert token_name not in line_result.run_info.api_calls[0]["children"][0]["system_metrics"]

def test_flow_run_with_connection(self, dev_connections):
flow_file = get_yaml_file(
"dummy_callable_class_with_connection", root=EAGER_FLOW_ROOT, file_name="flow.flex.yaml"
11 changes: 6 additions & 5 deletions src/promptflow-devkit/promptflow/batch/_result.py
@@ -113,10 +113,12 @@ def create(

@staticmethod
def _get_openai_metrics(line_results: List[LineResult], aggr_results: AggregationResult):
node_run_infos = _get_node_run_infos(line_results, aggr_results)
# Get openai metrics from the flow run info in line results, since the flex flow do not have node run infos.
flow_run_infos = (line_result.run_info for line_result in line_results)
aggr_node_run_infos = (node_run_info for node_run_info in aggr_results.node_run_infos.values())
total_metrics = {}
calculator = OpenAIMetricsCalculator()
for run_info in node_run_infos:
for run_info in chain(flow_run_infos, aggr_node_run_infos):
metrics = SystemMetrics._try_get_openai_metrics(run_info)
if metrics:
calculator.merge_metrics_dict(total_metrics, metrics)
@@ -131,9 +133,8 @@ def _try_get_openai_metrics(run_info: RunInfo):
openai_metrics = {}
if run_info.system_metrics:
for metric in TokenKeys.get_all_values():
if metric not in run_info.system_metrics:
return False
openai_metrics[metric] = run_info.system_metrics[metric]
if metric in run_info.system_metrics:
openai_metrics[metric] = run_info.system_metrics[metric]
return openai_metrics

def to_dict(self):
@@ -99,3 +99,4 @@
'57a991472dd300efc84b638768fe2f87e7acb04c', (436736, 6942)
'ab3563e99d1ee052e067952ab332536a6bf5c025', (443904, 1308)
'c462635b056fec500e1f50e9cfa42f69239f0933', (445440, 2642)
'67b119c190b574a9a275e8bdd93bcd06487d5318', (448512, 4994)
Binary file modified src/promptflow-recording/recordings/local/node_cache.shelve.dat (binary diff not shown).
@@ -99,3 +99,4 @@
'57a991472dd300efc84b638768fe2f87e7acb04c', (436736, 6942)
'ab3563e99d1ee052e067952ab332536a6bf5c025', (443904, 1308)
'c462635b056fec500e1f50e9cfa42f69239f0933', (445440, 2642)
'67b119c190b574a9a275e8bdd93bcd06487d5318', (448512, 4994)
93 changes: 48 additions & 45 deletions src/promptflow-tracing/promptflow/tracing/_openai_utils.py
@@ -1,7 +1,8 @@
import tiktoken
from abc import ABC, abstractmethod
from importlib.metadata import version

import tiktoken

IS_LEGACY_OPENAI = version("openai").startswith("0.")


@@ -45,7 +46,7 @@ def _get_openai_metrics_for_signal_api(self, api_call: dict):
if isinstance(usage, dict):
return usage
self._log_warning(
"Cannot find openai metrics in output, " "will calculate metrics from response data directly."
"Cannot find openai metrics in output, will calculate metrics from response data directly."
)

name = api_call.get("name")
@@ -56,24 +57,18 @@ def _get_openai_metrics_for_signal_api(self, api_call: dict):
# OpenAI v1 api:
# https://github.com/openai/openai-python/blob/main/src/openai/resources/chat/completions.py
# https://github.com/openai/openai-python/blob/main/src/openai/resources/completions.py
if (
name == "openai_chat_legacy"
or name == "openai_chat" # openai v1
):
if name == "openai_chat_legacy" or name == "openai_chat": # openai v1
return self.get_openai_metrics_for_chat_api(inputs, output)
elif (
name == "openai_completion_legacy"
or name == "openai_completion" # openai v1
):
elif name == "openai_completion_legacy" or name == "openai_completion": # openai v1
return self.get_openai_metrics_for_completion_api(inputs, output)
else:
self._log_warning(f"Calculating metrics for api {name} is not supported.")
raise Exception(f"Calculating metrics for api {name} is not supported.")

def _try_get_model(self, inputs, output):
if IS_LEGACY_OPENAI:
api_type = inputs.get("api_type")
if not api_type:
self._log_warning("Cannot calculate metrics for none or empty api_type.")
raise Exception("Cannot calculate metrics for none or empty api_type.")
if api_type == "azure":
model = inputs.get("engine")
else:
@@ -90,28 +85,33 @@ def _try_get_model(self, inputs, output):
if not model:
model = inputs.get("model")
if not model:
raise self._log_warning(
"Cannot get a valid model to calculate metrics. "
raise Exception(
"Cannot get a valid model to calculate metrics."
"Please specify a engine for AzureOpenAI API or a model for OpenAI API."
)
return model

def get_openai_metrics_for_chat_api(self, inputs, output):
metrics = {}
enc, tokens_per_message, tokens_per_name = self._get_encoding_for_chat_api(self._try_get_model(inputs, output))
metrics["prompt_tokens"] = self._get_prompt_tokens_from_messages(
inputs["messages"], enc, tokens_per_message, tokens_per_name
)
if isinstance(output, list):
if IS_LEGACY_OPENAI:
metrics["completion_tokens"] = len(output)
try:
enc, tokens_per_message, tokens_per_name = self._get_encoding_for_chat_api(
self._try_get_model(inputs, output)
)
metrics["prompt_tokens"] = self._get_prompt_tokens_from_messages(
inputs["messages"], enc, tokens_per_message, tokens_per_name
)
if isinstance(output, list):
if IS_LEGACY_OPENAI:
metrics["completion_tokens"] = len(output)
else:
metrics["completion_tokens"] = len(
[chunk for chunk in output if chunk.choices and chunk.choices[0].delta.content]
)
else:
metrics["completion_tokens"] = len(
[chunk for chunk in output if chunk.choices and chunk.choices[0].delta.content]
)
else:
metrics["completion_tokens"] = self._get_completion_tokens_for_chat_api(output, enc)
metrics["total_tokens"] = metrics["prompt_tokens"] + metrics["completion_tokens"]
metrics["completion_tokens"] = self._get_completion_tokens_for_chat_api(output, enc)
metrics["total_tokens"] = metrics["prompt_tokens"] + metrics["completion_tokens"]
except Exception as ex:
self._log_warning(f"Failed to calculate metrics due to exception: {ex}.")
return metrics

def _get_encoding_for_chat_api(self, model):
@@ -126,7 +126,7 @@ def _get_encoding_for_chat_api(self, model):
tokens_per_message = 3
tokens_per_name = 1
else:
self._log_warning(f"Calculating metrics for model {model} is not supported.")
raise Exception(f"Calculating metrics for model {model} is not supported.")
return enc, tokens_per_message, tokens_per_name

def _get_prompt_tokens_from_messages(self, messages, enc, tokens_per_message, tokens_per_name):
@@ -155,24 +155,27 @@ def _get_completion_tokens_for_chat_api(self, output, enc):

def get_openai_metrics_for_completion_api(self, inputs, output):
metrics = {}
enc = self._get_encoding_for_completion_api(self._try_get_model(inputs, output))
metrics["prompt_tokens"] = 0
prompt = inputs.get("prompt")
if isinstance(prompt, str):
metrics["prompt_tokens"] = len(enc.encode(prompt))
elif isinstance(prompt, list):
for pro in prompt:
metrics["prompt_tokens"] += len(enc.encode(pro))
if isinstance(output, list):
if IS_LEGACY_OPENAI:
metrics["completion_tokens"] = len(output)
try:
enc = self._get_encoding_for_completion_api(self._try_get_model(inputs, output))
metrics["prompt_tokens"] = 0
prompt = inputs.get("prompt")
if isinstance(prompt, str):
metrics["prompt_tokens"] = len(enc.encode(prompt))
elif isinstance(prompt, list):
for pro in prompt:
metrics["prompt_tokens"] += len(enc.encode(pro))
if isinstance(output, list):
if IS_LEGACY_OPENAI:
metrics["completion_tokens"] = len(output)
else:
metrics["completion_tokens"] = len(
[chunk for chunk in output if chunk.choices and chunk.choices[0].text]
)
else:
metrics["completion_tokens"] = len(
[chunk for chunk in output if chunk.choices and chunk.choices[0].text]
)
else:
metrics["completion_tokens"] = self._get_completion_tokens_for_completion_api(output, enc)
metrics["total_tokens"] = metrics["prompt_tokens"] + metrics["completion_tokens"]
metrics["completion_tokens"] = self._get_completion_tokens_for_completion_api(output, enc)
metrics["total_tokens"] = metrics["prompt_tokens"] + metrics["completion_tokens"]
except Exception as ex:
self._log_warning(f"Failed to calculate metrics due to exception: {ex}.")
return metrics

def _get_encoding_for_completion_api(self, model):
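
With these changes the calculator degrades gracefully instead of raising. A hedged usage sketch (it assumes `promptflow-tracing` and `tiktoken` are installed; the `"invalid_model"` name comes from the test in this PR, while the string output and the expected empty result are illustrative):

```python
from promptflow.tracing._openai_utils import OpenAIMetricsCalculator

calculator = OpenAIMetricsCalculator()
inputs = {
    "model": "invalid_model",  # unsupported model, used to exercise the failure path
    "messages": [{"role": "user", "content": "Hello"}],
}

# Before this PR an unsupported model could raise out of the calculator; now
# the failure is logged as a warning and the metrics dict comes back empty.
metrics = calculator.get_openai_metrics_for_chat_api(inputs, output="Hi there")
print(metrics)  # expected: {}
```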
4 changes: 2 additions & 2 deletions src/promptflow/tests/conftest.py
@@ -13,11 +13,11 @@
from filelock import FileLock
from pytest_mock import MockerFixture

from promptflow._cli._utils import AzureMLWorkspaceTriad
from promptflow._constants import PROMPTFLOW_CONNECTIONS
from promptflow._core.connection_manager import ConnectionManager
from promptflow._utils.context_utils import _change_working_dir
from promptflow._sdk._constants import AzureMLWorkspaceTriad
from promptflow._sdk.entities._connection import AzureOpenAIConnection
from promptflow._utils.context_utils import _change_working_dir

load_dotenv()

16 changes: 16 additions & 0 deletions src/promptflow/tests/executor/e2etests/test_eager_flow.py
@@ -81,6 +81,22 @@ def test_batch_run(self, flow_folder, inputs_mapping, ensure_output, init_kwargs
batch_result = batch_engine.run(input_dirs, inputs_mapping, output_dir)
validate_batch_result(batch_result, flow_folder, output_dir, ensure_output)

def test_batch_run_with_openai(self, dev_connections):
flow_folder = "callable_class_with_openai"
inputs_mapping = {"question": "${data.question}", "stream": "${data.stream}"}
batch_engine = BatchEngine(
get_yaml_file(flow_folder, root=EAGER_FLOW_ROOT),
get_flow_folder(flow_folder, root=EAGER_FLOW_ROOT),
init_kwargs={"connection": "azure_open_ai_connection"},
connections=dev_connections,
)
input_dirs = {"data": get_flow_inputs_file(flow_folder, root=EAGER_FLOW_ROOT)}
output_dir = Path(mkdtemp())
batch_result = batch_engine.run(input_dirs, inputs_mapping, output_dir)
for token_name in ["prompt_tokens", "completion_tokens", "total_tokens"]:
assert getattr(batch_result.system_metrics, token_name, 0) > 0
validate_batch_result(batch_result, flow_folder, output_dir, lambda x: isinstance(x, dict))

def test_batch_run_with_invalid_case(self):
flow_folder = "dummy_flow_with_exception"
batch_engine = BatchEngine(
23 changes: 17 additions & 6 deletions src/promptflow/tests/executor/unittests/batch/test_result.py
@@ -31,9 +31,18 @@ def get_node_run_infos(node_dict: dict, index=None, api_calls=None, system_metri
}


def get_flow_run_info(status_dict: dict, index: int):
def get_flow_run_info(status_dict: dict, index: int, api_calls=None, system_metrics=None):
status = Status.Failed if any(status == Status.Failed for status in status_dict.values()) else Status.Completed
error = {"code": "UserError", "message": "test message"} if status == Status.Failed else None
children = []
aggregated_tokens = {"total_tokens": 0, "prompt_tokens": 0, "completion_tokens": 0}
for i in range(len(status_dict)):
if api_calls is not None:
children.extend(api_calls)
if system_metrics is not None:
for k, _ in aggregated_tokens.items():
if k in system_metrics:
aggregated_tokens[k] += system_metrics[k]
return FlowRunInfo(
run_id=f"{index}_run_id",
status=status,
@@ -49,6 +58,8 @@ def get_flow_run_info(status_dict: dict, index: int):
start_time=datetime.utcnow(),
end_time=datetime.utcnow(),
index=index,
api_calls=[get_api_call("Flow", "Flow", children=children)] if api_calls else None,
system_metrics=aggregated_tokens if system_metrics else None,
)


@@ -57,7 +68,7 @@ def get_line_results(line_dict: dict, api_calls=None, system_metrics=None):
LineResult(
output={},
aggregation_inputs={},
run_info=get_flow_run_info(status_dict=v, index=k),
run_info=get_flow_run_info(status_dict=v, index=k, api_calls=api_calls, system_metrics=system_metrics),
node_run_infos=get_node_run_infos(node_dict=v, index=k, api_calls=api_calls, system_metrics=system_metrics),
)
for k, v in line_dict.items()
Expand Down Expand Up @@ -259,12 +270,12 @@ def test_create(slef):
}
aggr_result = get_aggregation_result(aggr_dict, system_metrics=aggr_system_metrics)
system_metrics = SystemMetrics.create(datetime.utcnow(), datetime.utcnow(), line_results, aggr_result)
assert system_metrics.total_tokens == 20
assert system_metrics.prompt_tokens == 12
assert system_metrics.total_tokens == 30
assert system_metrics.prompt_tokens == 18
assert system_metrics.completion_tokens == 8
system_metrics_dict = {
"total_tokens": 20,
"prompt_tokens": 12,
"total_tokens": 30,
"prompt_tokens": 18,
"completion_tokens": 8,
}
assert system_metrics_dict.items() <= system_metrics.to_dict().items()
@@ -1,7 +1,6 @@
from openai import AzureOpenAI

from promptflow.connections import AzureOpenAIConnection
from promptflow.tracing import trace


class MyClass:
@@ -36,8 +35,7 @@ def generator():
yield chunk.choices[0].delta.content or ""
# We must return the generator object, not using yield directly here.
# Otherwise, the function itself will become a generator, despite whether stream is True or False.
# return generator()
return "".join(generator())
return generator()
else:
# chat api may return message with no content.
return completion.choices[0].message.content or ""
