21 changes: 20 additions & 1 deletion areal/api/cli_args.py
@@ -148,7 +148,13 @@ def new(self, **kwargs):
        args.update(kwargs)
        return GenerationHyperparameters(**args)


@dataclass
class PRMRewardHyperparameters:
    reward_shaping_alpha: float = field(
        default=0.02,
        metadata={"help": "reward shaping alpha"},
    )

# Train Engine Configs


@@ -1118,6 +1124,19 @@ class GRPOConfig(BaseExperimentConfig):
    actor: PPOActorConfig = field(default_factory=PPOActorConfig)
    ref: PPOActorConfig = field(default_factory=PPOActorConfig)

@dataclass
class PRMConfig(BaseExperimentConfig):
    async_training: bool = field(default=True)
    prm_path: str = field(default="")
    gconfig: GenerationHyperparameters = field(
        default_factory=GenerationHyperparameters
    )
    prmconfig: PRMRewardHyperparameters = field(
Collaborator

Looks like we can just inherit GRPOConfig and add two new fields, prm_path and reward_shaping_alpha? BTW, if you are referring to reward scaling, you can use actor.reward_scaling rather than creating a new field.

        default_factory=PRMRewardHyperparameters
    )
    rollout: InferenceEngineConfig = field(default_factory=InferenceEngineConfig)
    actor: PPOActorConfig = field(default_factory=PPOActorConfig)
    ref: PPOActorConfig = field(default_factory=PPOActorConfig)

@dataclass
class PPOConfig(GRPOConfig):
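
For reference, a minimal sketch of the inheritance approach suggested in the review comment above, assuming GRPOConfig is importable from areal.api.cli_args as shown in this diff. This is an illustration, not the implementation in the PR:

# Sketch only: inherit actor, ref, and the rest of GRPOConfig's fields instead of
# redeclaring them; only the two PRM-specific knobs are new.
from dataclasses import dataclass, field

from areal.api.cli_args import GRPOConfig


@dataclass
class PRMConfig(GRPOConfig):
    # Path to the process reward model checkpoint.
    prm_path: str = field(default="")
    # Weight of the PRM term when mixed into the outcome reward.
    reward_shaping_alpha: float = field(default=0.02)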
2 changes: 1 addition & 1 deletion areal/utils/launcher.py
@@ -11,7 +11,7 @@

logger = logging.getLogger("Launcher Utils")

LOCAL_CACHE_DIR = "/tmp/areal"
Collaborator

Should revert.

LOCAL_CACHE_DIR = "/data/yl/AReaL/tmp/areal"
Contributor

high

The LOCAL_CACHE_DIR is hardcoded to a user-specific path. This makes the code non-portable and will likely cause it to fail on other developers' machines. It's better to use a more standard temporary directory or allow this path to be configured via an environment variable.

Suggested change:
-LOCAL_CACHE_DIR = "/data/yl/AReaL/tmp/areal"
+LOCAL_CACHE_DIR = os.environ.get("AREAL_CACHE_DIR", "/tmp/areal")

PYTORCH_KERNEL_CACHE_PATH = (
f"{LOCAL_CACHE_DIR}/.cache/{getpass.getuser()}/torch/kernels/"
)
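
A sketch of the suggested environment-variable override in context; AREAL_CACHE_DIR is the variable name proposed in the suggestion above, not an existing setting:

import getpass
import os

# Default to the original /tmp/areal, but allow developers to redirect the
# cache without editing the source.
LOCAL_CACHE_DIR = os.environ.get("AREAL_CACHE_DIR", "/tmp/areal")
PYTORCH_KERNEL_CACHE_PATH = (
    f"{LOCAL_CACHE_DIR}/.cache/{getpass.getuser()}/torch/kernels/"
)

Exporting AREAL_CACHE_DIR=/data/yl/AReaL/tmp/areal would then reproduce the path this PR hardcodes, without affecting other machines.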
158 changes: 158 additions & 0 deletions areal/workflow/rlvr_prm.py
@@ -0,0 +1,158 @@
import asyncio
from dataclasses import asdict
import os
import uuid

import aiofiles
import aiofiles.os
import colorama
import torch
from transformers import PreTrainedTokenizerFast, PreTrainedModel

from areal.api.cli_args import GenerationHyperparameters, PRMRewardHyperparameters
from areal.api.engine_api import InferenceEngine
from areal.api.io_struct import ModelRequest
from areal.api.reward_api import AsyncRewardWrapper
from areal.api.workflow_api import RolloutWorkflow
from areal.utils import logging, stats_tracker
from areal.utils.data import concat_padded_tensors

logger = logging.getLogger("RLVR workflow")


class PRMRLVRWorkflow(RolloutWorkflow):
    def __init__(
        self,
        reward_fn,
        reward_fn_prm,
        gconfig: GenerationHyperparameters,
        prmconfig: PRMRewardHyperparameters,
        tokenizer: PreTrainedTokenizerFast,
        # prm_model: PreTrainedModel,
        # prm_tokenizer: PreTrainedTokenizerFast,
Comment on lines +32 to +33
Contributor

medium

These parameters for prm_model and prm_tokenizer are commented out, along with their usage later in the file. This dead code should be removed to improve code clarity and maintainability.

        enable_thinking: bool,
        rollout_stat_scope: str = "rollout",
        dump_dir: str | None = None,
    ):
        self.reward_fn = reward_fn
        self.gconfig = gconfig
        self.prmconfig = prmconfig
        self.tokenizer = tokenizer
        # self.prm_model = prm_model
        # self.prm_tokenizer = prm_tokenizer
        self.enable_thinking = enable_thinking
        self.dump_dir = dump_dir
        self.rollout_stat_scope = rollout_stat_scope
        self.async_reward_fn = AsyncRewardWrapper(reward_fn)
        self.async_reward_fn_prm = AsyncRewardWrapper(reward_fn_prm)
        if self.dump_dir is not None and not os.path.exists(self.dump_dir):
            os.makedirs(self.dump_dir, exist_ok=True)

    async def arun_episode(self, engine: InferenceEngine, data):
        input_ids = self.tokenizer.apply_chat_template(
            data["messages"],
            tokenize=True,
            add_generation_prompt=True,
            enable_thinking=self.enable_thinking,
        )

        n_samples = self.gconfig.n_samples
        req = ModelRequest(
            rid=uuid.uuid4().hex,
            input_ids=input_ids,
            gconfig=self.gconfig.new(n_samples=1),
            tokenizer=self.tokenizer,
        )
        resps = await asyncio.gather(*[engine.agenerate(req) for _ in range(n_samples)])

        version = engine.get_version()
        prompt_strs = []
        completions_strs = []
        rewards = []
        prm_rewards = []
        seqlens = []

        results = []
        for resp in resps:
            seq = resp.input_tokens + resp.output_tokens
            logprobs = [0.0] * resp.input_len + resp.output_logprobs
            loss_mask = [0] * resp.input_len + [1] * resp.output_len
            versions = [-1] * resp.input_len + resp.output_versions

            prompt_str = self.tokenizer.decode(input_ids)
            completions_str = self.tokenizer.decode(resp.output_tokens)
            prompt_strs.append(prompt_str)
            completions_strs.append(completions_str)
            seqlens.append(len(seq))
            result_reward = await self.async_reward_fn(
                prompt_str,
                completions_str,
                resp.input_tokens,
                resp.output_tokens,
                **data,
            )
            prm_reward = await self.async_reward_fn_prm(
                prompt_str,
                completions_str,
                resp.input_tokens,
                resp.output_tokens,
                # self.prm_model,
                # self.prm_tokenizer,
                **data,
            )
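            # Reward shaping: mix the PRM score into the outcome reward,
            # weighted by prmconfig.reward_shaping_alpha.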
            reward = self.prmconfig.reward_shaping_alpha * prm_reward + result_reward

            # Log reward.
            stats_tracker.get(self.rollout_stat_scope).scalar(reward=reward)

            rewards.append(reward)
            prm_rewards.append(prm_reward)

            res = dict(
                # unsqueeze to add an additional batch dimension
                input_ids=torch.tensor(seq).unsqueeze(0),
                loss_mask=torch.tensor(loss_mask).unsqueeze(0),
                logprobs=torch.tensor(logprobs).unsqueeze(0),
                versions=torch.tensor(versions).unsqueeze(0),
                attention_mask=torch.ones(len(seq), dtype=torch.bool).unsqueeze(0),
                # reward
                rewards=torch.tensor([float(reward)]),
            )
            results.append(res)

        # Clip mechanism: zero out the shaped reward for samples whose PRM reward
        # is above the batch mean.
        avg_prm_reward = sum(prm_rewards) / len(prm_rewards)
        for i, val in enumerate(prm_rewards):
            if val > avg_prm_reward:
                rewards[i] = 0
        for res, r in zip(results, rewards):
            res["rewards"] = torch.tensor([float(r)])
Contributor

high

The logic in this "clip mechanism" seems counter-intuitive. It sets the final reward to 0 for samples where the prm_reward is above average. This effectively punishes samples that have good intermediate steps according to the PRM. Could you clarify the intent here? If this is a bug, perhaps the condition should be inverted (e.g., val < avg_prm_reward) or a different logic should be applied.

Collaborator

Can we add some comments or configurations to control this behavior?

This workflow still uses an outcome-based reward. How's the PRM actually used?
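
One way to address the points above would be to make the clip step explicit and switchable rather than always-on. A hypothetical sketch follows; the clip_prm_above_mean flag and the helper name are illustrative and not part of this PR:

# Hypothetical helper: only zero out the shaped reward for samples whose PRM
# reward exceeds the batch mean, and only when the (illustrative) flag is enabled.
def apply_prm_clip(rewards, prm_rewards, clip_prm_above_mean: bool = False):
    if not clip_prm_above_mean or not prm_rewards:
        return rewards
    avg = sum(prm_rewards) / len(prm_rewards)
    return [0.0 if prm > avg else r for r, prm in zip(rewards, prm_rewards)]

Inside arun_episode this would replace the in-place loop, e.g. rewards = apply_prm_clip(rewards, prm_rewards, getattr(self.prmconfig, "clip_prm_above_mean", False)), keeping the default behavior unclipped and documenting the intent in one place.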


        if self.dump_dir is not None:
            dump_path = os.path.join(self.dump_dir, str(version))
            await aiofiles.os.makedirs(dump_path, exist_ok=True)
            # Get the unique identifier for this prompt
            qid = None
            for key in ["query_id", "id", "qid"]:
                qid = data.get(key, None)
                if qid is not None:
                    break
            qid = qid or uuid.uuid4().hex

            # Dump rollout to file
            file_path = os.path.join(dump_path, f"{qid}.txt")
            async with aiofiles.open(file_path, "a") as f:
                n_samples = self.gconfig.n_samples
                for i, (p, c, r, sl) in enumerate(
                    zip(prompt_strs, completions_strs, rewards, seqlens)
                ):
                    info = "\n".join(
                        [
                            f"idx: {i + 1} / {n_samples}, seqlen: {sl}, reward is {r}.",
                            f"prompt is \n{colorama.Fore.YELLOW + colorama.Style.DIM}{p}{colorama.Style.RESET_ALL}",
                            f"sequence is: \n{colorama.Fore.YELLOW + colorama.Style.DIM}{c}{colorama.Style.RESET_ALL}",
                        ]
                    )
                    await f.write(info + "\n")

        return concat_padded_tensors(results)
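
For orientation, a hypothetical sketch of how this workflow could be wired up from a training script, based only on the constructor signature above; the reward functions, model path, and config values are placeholders, not part of this PR:

from transformers import AutoTokenizer

from areal.api.cli_args import GenerationHyperparameters, PRMRewardHyperparameters
from areal.workflow.rlvr_prm import PRMRLVRWorkflow


def outcome_reward(prompt, completion, prompt_ids, completion_ids, **data) -> float:
    # Placeholder outcome reward: 1.0 if the reference answer string appears in the completion.
    return float(str(data.get("answer", "")) in completion)


def prm_reward(prompt, completion, prompt_ids, completion_ids, **data) -> float:
    # Placeholder process reward in [0, 1]; a real implementation would score steps with the PRM here.
    return 0.5


workflow = PRMRLVRWorkflow(
    reward_fn=outcome_reward,
    reward_fn_prm=prm_reward,
    gconfig=GenerationHyperparameters(n_samples=4),
    prmconfig=PRMRewardHyperparameters(reward_shaping_alpha=0.02),
    tokenizer=AutoTokenizer.from_pretrained("/path/to/actor/model"),
    enable_thinking=False,
    dump_dir=None,
)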