[WIP] Support agent from API_URL_BASE #484
Conversation
Summary of Changes: Hello @yulangz, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request introduces a robust and flexible architecture for integrating external agent frameworks.
Code Review
This pull request introduces a significant feature to support agent-based reinforcement learning by proxying an OpenAI-compatible API. This is a powerful abstraction that enables integration with various agent frameworks. The implementation includes a FastAPI-based proxy server, monkey-patching of the agents library, and several new example training scripts for math and search agents.
The overall approach is sound, but as this is a work-in-progress, there are several areas that need attention. I've identified some potential bugs, resource management issues, and opportunities for code improvement. My detailed comments below address these points, focusing on correctness, maintainability, and robustness.
```python
if self.config.wandb.wandb_base_url:
    os.environ["WANDB_API_KEY"] = self.config.wandb.wandb_api_key
if self.config.wandb.wandb_api_key:
    os.environ["WANDB_BASE_URL"] = self.config.wandb.wandb_base_url
```
The logic for setting the WANDB_API_KEY and WANDB_BASE_URL environment variables appears to be swapped. The API key is set based on the presence of the base URL, and vice versa. This will cause wandb.login() to fail or connect to the wrong endpoint if only one of the two is configured.
```suggestion
if self.config.wandb.wandb_api_key:
    os.environ["WANDB_API_KEY"] = self.config.wandb.wandb_api_key
if self.config.wandb.wandb_base_url:
    os.environ["WANDB_BASE_URL"] = self.config.wandb.wandb_base_url
```
```python
completion_str = resp.final_output

# agent extracts tool callings from the llm response
tool_calls = agent.consume_llm_response(resp, completion_str)
```
The method agent.consume_llm_response is called with two arguments, resp and completion_str. However, its definition in examples/openai-agents/asearcher/agent/search_agent.py only accepts one argument, completion_text. This will cause a TypeError at runtime.
```suggestion
tool_calls = agent.consume_llm_response(completion_str)
```
```python
def __del__(self):
    """Ensure socket is closed on deletion."""
    if self.server:
        self.server.should_exit = True
        self.server = None
    if self.sock:
        self.sock.close()
        self.sock = None
```
Using __del__ to close sockets and shut down the uvicorn server is unreliable. The __del__ method is not guaranteed to be called when an object's reference count drops to zero, which can lead to resource leaks (e.g., open sockets, running server threads).
It would be more robust to provide an explicit shutdown method that can be called deterministically by the owner of the ProxyServer instance to ensure resources are properly released.
```suggestion
def shutdown(self):
    """Explicitly release the server and socket."""
    if self.server:
        self.server.should_exit = True
        self.server = None
    if self.sock:
        self.sock.close()
        self.sock = None

def __del__(self):
    self.shutdown()
```
```python
except Exception as e:
    # 如果出现问题,确保关闭socket
    if "sock" in locals() and sock:
        sock.close()
    raise e
```
In the except block, if socket.socket() itself fails, sock is never assigned. The `"sock" in locals()` guard avoids a NameError, but it is fragile and obscures intent; it is safer to initialize sock to None before the try block so the cleanup code can test it directly.
Additionally, the comment on line 88 is in Chinese, which is inconsistent with the English comments in the rest of the file. It should be translated for consistency.
```suggestion
except Exception as e:
    # Ensure the socket is closed if an issue occurs.
    if "sock" in locals() and sock:
        sock.close()
    raise e
```
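Beyond translating the comment, the fragile `locals()` check can be removed entirely by initializing `sock` before the try block. A minimal sketch of the pattern (the bind call is an assumed placeholder for whatever setup the real code performs):

```python
import socket

sock = None  # initialize so the except block can reference it safely
try:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # hypothetical setup: bind to an ephemeral localhost port
    sock.bind(("127.0.0.1", 0))
except Exception:
    # No locals() check needed: sock is always defined here
    if sock is not None:
        sock.close()
    raise
```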
```python
def process_webpage(self, content):
    keys = [
        ("title", "title"),
        ("p", "p"),
        ("li", "li", lambda c: "\n" not in c),
        ("td", "td"),
        ("tr", "tr"),
    ]
    content_list = []
    init_length = len(content)
    while any([f"<{k[0]}" in content and f"</{k[1]}>" in content for k in keys]):
        klr = []
        for k in keys:
            start = 0
            while True:
                ls = [content[start:].find(f"<{k[0]}{c}") for c in [">", " "]]
                ls = [l for l in ls if l != -1]
                l = -1 if len(ls) == 0 else min(ls)
                if l == -1:
                    break
                l += start
                r = content[l:].find(f"</{k[1]}>")
                if r == -1:
                    break
                if (len(k) <= 2) or (len(k) >= 3 and k[2](content[l : l + r])):
                    klr.append((k, l, l + r))
                    break
                start = l + r

        if len(klr) == 0:
            break
        klr = sorted(klr, key=lambda x: x[1])
        k, l, r = klr[0]
        content_list.append(content[l : r + len(f"</{k[1]}>")])
        if k[0] == "p":
            content_list[-1] += "\n\n"
        elif k[0] == "li":
            content_list[-1] += "\n"
        content = content[r:]
    content = "".join(content_list)
    final_length = len(content)
    logger.info(
        f"process the webpage: {init_length} -> {final_length}. {content[:100]}"
    )
    return content
```
The process_webpage method uses string searching (find) and manual slicing to parse HTML content. This approach is very brittle and can easily break with small variations in HTML structure. Using a dedicated HTML parsing library like BeautifulSoup would be far more robust and maintainable.
Here is an example of how you could implement this with BeautifulSoup:
```python
from bs4 import BeautifulSoup

def process_webpage(self, content):
    soup = BeautifulSoup(content, 'html.parser')
    # Extract text from relevant tags
    texts = []
    for tag in soup.find_all(['title', 'p', 'li', 'td', 'tr']):
        texts.append(tag.get_text(separator=' ', strip=True))
    processed_content = "\n\n".join(texts)
    logger.info(
        f"process the webpage: {len(content)} -> {len(processed_content)}. {processed_content[:100]}"
    )
    return processed_content
```

```python
proxy_thread = threading.Thread(
    target=self.proxy_server.run, args=(sock,), daemon=True
)
logger.info(f"[wht debug] Starting proxy server on port {port}")
proxy_thread.start()
```
The ProxyServer is started in a daemon thread, but there is no corresponding mechanism to explicitly shut it down when the training finishes or an error occurs. Relying on __del__ for cleanup is not reliable and can lead to leaked resources like sockets and threads. The MultiturnRLVRAgentWorkflow should manage the lifecycle of the ProxyServer and ensure its shutdown method (which should be implemented) is called.
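One possible shape for deterministic cleanup is a try/finally in the owning workflow around an explicit shutdown call. The sketch below uses a minimal stand-in for ProxyServer (the real class wraps uvicorn; the shutdown() method is the one recommended above, not an existing API):

```python
import threading

class ProxyServer:
    """Minimal stand-in: only illustrates the start/shutdown lifecycle."""

    def __init__(self):
        self._stopped = threading.Event()

    def run(self, sock):
        # placeholder for uvicorn's serve loop
        self._stopped.wait()

    def shutdown(self):
        self._stopped.set()

# The owning workflow starts the server thread and guarantees cleanup:
server = ProxyServer()
proxy_thread = threading.Thread(target=server.run, args=(None,), daemon=True)
proxy_thread.start()
try:
    pass  # rollout / training work would happen here
finally:
    server.shutdown()
    proxy_thread.join(timeout=5)
```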
```python
# Many of this code are copied from areal/experimental/openai/client.py
# I only add lock for thread safety
```
The comment on line 228 highlights that a significant amount of code for reward and completion management (set_reward, apply_reward_discount, export_completions, etc.) is duplicated from areal/experimental/openai/client.py. This duplication makes the code harder to maintain, as changes will need to be made in two places.
Consider refactoring this logic into a shared CompletionCacheManager class that both ArealOpenAI and ProxyServer can use. ProxyServer could then manage a dictionary of these managers, keyed by task_id, to handle concurrent tasks.
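The suggested refactor could look roughly like this. All names here (CompletionCacheManager, its methods, the task-keyed dictionary) are illustrative sketches of the idea, not the repository's actual API:

```python
from collections import defaultdict

class CompletionCacheManager:
    """Shared reward/completion bookkeeping (hypothetical extraction)."""

    def __init__(self):
        # (completion, reward) entries in arrival order
        self._completions = []

    def add_completion(self, completion):
        self._completions.append({"completion": completion, "reward": None})

    def set_reward(self, index, reward):
        self._completions[index]["reward"] = reward

    def apply_reward_discount(self, gamma):
        # propagate discounted reward backwards through the trajectory
        running = 0.0
        for entry in reversed(self._completions):
            running = (entry["reward"] or 0.0) + gamma * running
            entry["reward"] = running

    def export_completions(self):
        return list(self._completions)

# ProxyServer could then keep one manager per task_id:
managers = defaultdict(CompletionCacheManager)
managers["task-1"].add_completion("step one")
managers["task-1"].set_reward(0, 1.0)
```

Both ArealOpenAI and ProxyServer would then delegate to this class, keeping the reward-propagation logic in a single place.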
Description

Draft: support agents served from API_URL_BASE. This supports RL for any agent framework by setting API_URL_BASE.