1 change: 1 addition & 0 deletions tests/test_eval_cli.py
@@ -48,6 +48,7 @@ def _run_cli(monkeypatch, overrides):
"state_columns": [],
"save_results": False,
"save_every": -1,
"resume_from_path": None,
"save_to_hf_hub": False,
"hf_hub_dataset_name": "",
}
22 changes: 21 additions & 1 deletion verifiers/envs/environment.py
@@ -4,6 +4,7 @@
import json
import logging
import signal
import sys
import time
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
@@ -39,7 +40,11 @@
State,
)
from verifiers.utils.async_utils import maybe_semaphore
from verifiers.utils.eval_utils import make_dataset, save_rollout_results
from verifiers.utils.eval_utils import (
load_from_disk,
make_dataset,
save_rollout_results,
)
from verifiers.utils.message_utils import (
concat_messages,
get_overlong_prompt_dummy_response,
@@ -790,12 +795,27 @@ async def evaluate(
state_columns: list[str] | None = None,
save_results: bool = False,
save_every: int = -1,
resume_from_path: Path | None = None,
**kwargs,
) -> GenerateOutputs:
"""
Evaluate model on the Environment evaluation dataset.
"""
inputs = self.get_eval_inputs(num_examples, rollouts_per_example)
if resume_from_path is not None:
if not resume_from_path.exists():
raise FileNotFoundError(
f"Resume path does not exist: {resume_from_path}"
)
finished_rollouts, _ = load_from_disk(resume_from_path)
finished_example_ids = list(finished_rollouts["example_id"])
inputs = [i for i in inputs if i["example_id"] not in finished_example_ids]
if len(inputs) == 0:
self.logger.info("No inputs left to evaluate. Exiting.")
sys.exit(0)
self.logger.info(
f"Found {len(set(finished_example_ids))} finished group(s) ({len(finished_example_ids)} total rollouts), skipping them"
)
return await self.generate(
inputs,
client=client,
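For orientation, a hedged usage sketch of the new parameter when calling evaluate() directly. The environment id, model name, client setup, and results path below are illustrative assumptions rather than part of this diff; only the resume_from_path keyword is what the PR adds:

import asyncio
from pathlib import Path

from openai import AsyncOpenAI

import verifiers as vf


async def run_eval():
    env = vf.load_environment(env_id="my-env")  # hypothetical environment id
    client = AsyncOpenAI()  # any OpenAI-compatible endpoint is assumed here
    # Point resume_from_path at the results directory of an interrupted run;
    # example groups that already finished are skipped and only the rest run.
    return await env.evaluate(
        client=client,
        model="my-model",  # placeholder model name
        num_examples=100,
        rollouts_per_example=4,
        save_results=True,
        save_every=10,
        resume_from_path=Path("outputs/evals/my-env--my-model/abc123"),  # hypothetical path
    )


results = asyncio.run(run_eval())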
8 changes: 8 additions & 0 deletions verifiers/scripts/eval.py
@@ -203,6 +203,13 @@ def main():
default=-1,
help="Save dataset every n rollouts",
)
parser.add_argument(
"--resume-from-path",
"-R",
type=str,
default=None,
help="Path to resume evaluation from (to be used in conjunction with -f)",
)
parser.add_argument(
"--save-to-hf-hub",
"-H",
@@ -311,6 +318,7 @@ def main():
state_columns=args.state_columns,
save_results=args.save_results,
save_every=args.save_every,
resume_from_path=args.resume_from_path,
save_to_hf_hub=args.save_to_hf_hub,
hf_hub_dataset_name=args.hf_hub_dataset_name,
)
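A minimal standalone sketch of how the new option parses; the vf-eval command named in the comment is an assumed entry-point name, and only the --resume-from-path/-R flag itself comes from this diff:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "--resume-from-path",
    "-R",
    type=str,
    default=None,
    help="Path to resume evaluation from",
)

# Roughly: vf-eval my-env -R outputs/evals/run-1   (entry-point name assumed)
args = parser.parse_args(["-R", "outputs/evals/run-1"])
assert args.resume_from_path == "outputs/evals/run-1"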
1 change: 1 addition & 0 deletions verifiers/types.py
@@ -229,5 +229,6 @@ class EvalConfig(BaseModel):
state_columns: list[str] | None = None
save_results: bool = False
save_every: int = -1
resume_from_path: str | None = None
save_to_hf_hub: bool = False
hf_hub_dataset_name: str | None = None
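For intuition only, a reduced sketch of the new optional field's semantics; this hypothetical stand-in is not the real EvalConfig, whose other fields are omitted:

from pydantic import BaseModel


class ResumeFieldsSketch(BaseModel):  # hypothetical reduced model for illustration
    resume_from_path: str | None = None


assert ResumeFieldsSketch().resume_from_path is None
assert ResumeFieldsSketch(resume_from_path="outputs/evals/run-1").resume_from_path == "outputs/evals/run-1"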
66 changes: 63 additions & 3 deletions verifiers/utils/eval_utils.py
@@ -7,7 +7,12 @@
from typing import cast

import numpy as np
from datasets import Dataset, disable_progress_bar, enable_progress_bar
from datasets import (
Dataset,
concatenate_datasets,
disable_progress_bar,
enable_progress_bar,
)
from datasets.utils import logging as ds_logging

import verifiers as vf
@@ -110,8 +115,14 @@ async def run_evaluation(config: EvalConfig) -> GenerateOutputs:
vf_env = vf.load_environment(env_id=config.env_id, **config.env_args)

# run evaluation
results_path = get_eval_results_path(config)
logger.info(f"Starting evaluation with model: {config.model}")
if not config.resume_from_path:
results_path = get_eval_results_path(config)
resume_from_path = None
logger.info(f"Starting evaluation with model: {config.model}")
else:
results_path = Path(config.resume_from_path)
resume_from_path = results_path
logger.info(f"Resuming evaluation from {results_path}")
logger.info(
f"Configuration: num_examples={config.num_examples}, rollouts_per_example={config.rollouts_per_example}, max_concurrent={config.max_concurrent}"
)
@@ -129,6 +140,7 @@ async def run_evaluation(config: EvalConfig) -> GenerateOutputs:
state_columns=config.state_columns,
save_results=config.save_results,
save_every=config.save_every,
resume_from_path=resume_from_path,
)
end_time = time.time()
logger.info(f"Evaluation completed in {end_time - start_time:.2f} seconds")
@@ -209,12 +221,60 @@ def quiet_datasets():

def save_to_disk(dataset: Dataset, metadata_dict: dict, path_to_save: Path):
path_to_save.parent.mkdir(parents=True, exist_ok=True)

try:
# If path_to_save already exists, we append the new rollout results and merge the metadata
existing_dataset, existing_metadata_dict = load_from_disk(path_to_save)
metadata_dict = {
**existing_metadata_dict,
**metadata_dict,
"time_ms": existing_metadata_dict["time_ms"] + metadata_dict["time_ms"],
"avg_reward": (
existing_metadata_dict["avg_reward"] * len(existing_dataset)
+ metadata_dict["avg_reward"] * len(dataset)
)
/ (len(dataset) + len(existing_dataset)),
"avg_metrics": {
metric_name: (
existing_metadata_dict["avg_metrics"][metric_name]
* len(existing_dataset)
+ metadata_dict["avg_metrics"][metric_name] * len(dataset)
)
/ (len(dataset) + len(existing_dataset))
for metric_name in existing_metadata_dict["avg_metrics"]
},
}
seen = set()

def is_new(example: dict) -> bool:
"""De-duplicate based on completion."""
k = json.dumps(list(example["completion"]))
if k in seen:
return False
seen.add(k)
return True

with quiet_datasets():
dataset = concatenate_datasets([existing_dataset, dataset]).filter(is_new)
except FileNotFoundError:
pass

with quiet_datasets():
dataset.to_json(path_to_save / "results.jsonl")
with open(path_to_save / "metadata.json", "w") as f:
json.dump(metadata_dict, f)
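To make the merge arithmetic above concrete, a small worked example of the rollout-count-weighted average; the numbers are illustrative:

# An earlier partial run saved 60 rollouts with avg_reward 0.50; the resumed
# run contributes 40 more rollouts with avg_reward 0.80.
existing_n, existing_avg = 60, 0.50
new_n, new_avg = 40, 0.80

merged_avg = (existing_avg * existing_n + new_avg * new_n) / (existing_n + new_n)
assert abs(merged_avg - 0.62) < 1e-12  # (30.0 + 32.0) / 100 rollouts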


def load_from_disk(path_to_load: Path) -> tuple[Dataset, dict]:
with quiet_datasets():
dataset = cast(
Dataset, Dataset.from_json((path_to_load / "results.jsonl").as_posix())
)
with open(path_to_load / "metadata.json", "r") as f:
metadata_dict = json.load(f)
return dataset, metadata_dict
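A hedged round-trip sketch of save_to_disk and load_from_disk; the directory, columns, and metadata values are illustrative, and the metadata dict carries the keys the merge branch above expects:

from pathlib import Path

from datasets import Dataset

from verifiers.utils.eval_utils import load_from_disk, save_to_disk

run_dir = Path("outputs/evals/example-run")  # hypothetical results directory
run_dir.mkdir(parents=True, exist_ok=True)

ds = Dataset.from_dict(
    {
        "example_id": [0, 1],
        "completion": [
            [{"role": "assistant", "content": "a"}],
            [{"role": "assistant", "content": "b"}],
        ],
        "reward": [1.0, 0.0],
    }
)
meta = {"time_ms": 1500.0, "avg_reward": 0.5, "avg_metrics": {}}

save_to_disk(ds, meta, run_dir)  # first save: nothing on disk yet, so no merge happens
loaded_ds, loaded_meta = load_from_disk(run_dir)
assert len(loaded_ds) == 2 and loaded_meta["avg_reward"] == 0.5

save_to_disk(ds, meta, run_dir)  # second save: existing results are loaded and merged
merged_ds, merged_meta = load_from_disk(run_dir)
assert len(merged_ds) == 2  # de-duplicated on completion, so rows are not double counted
assert merged_meta["time_ms"] == 3000.0  # time_ms values are summed on merge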


def save_rollout_results(
results: GenerateOutputs,
push_to_hf_hub: bool = False,