[WIP] [feature] single controller: add train controller #414
Conversation
Summary of Changes: Hello @daihaowz, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request lays the groundwork for a distributed training system by introducing a new `DistributedTrainController`.
Code Review
This pull request introduces a DistributedTrainController and related utilities, which is a significant step towards a single controller architecture. The changes also include refactoring the scheduler and engine APIs for better clarity and consistency. My review focuses on improving the robustness and maintainability of the new components. I've identified a critical bug related to uninitialized state, some high-severity design issues in error handling that could lead to unexpected process termination, and several medium-severity suggestions to improve code consistency and clarity through type hints and documentation.
```python
        for worker in workers
    ]
    try:
        results = wait_future_ordered(futures, exit_on_exception=True)
```
The call to wait_future_ordered uses exit_on_exception=True, which will cause the entire process to terminate via os.kill if any of the RPC calls fail. This is an aggressive error handling strategy that prevents any cleanup or higher-level exception management. Furthermore, it makes the surrounding try...except block misleading, as the except block will never be reached upon an exception within a future. It's better to let exceptions propagate by setting exit_on_exception=False (which is the default) and allow the caller of rpc_call to handle failures gracefully.
```diff
-        results = wait_future_ordered(futures, exit_on_exception=True)
+        results = wait_future_ordered(futures)
```
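To illustrate the point, here is a hypothetical sketch of caller-side handling once exceptions propagate: the surrounding `try...except` actually catches the failure and can clean up (`call_worker` and `gather_results` are illustrative names, not the PR's code).

```python
from concurrent.futures import ThreadPoolExecutor

def call_worker(worker_id: int) -> str:
    """Stand-in for a per-worker RPC; fails for worker 1."""
    if worker_id == 1:
        raise RuntimeError(f"worker {worker_id} unreachable")
    return f"ok-{worker_id}"

def gather_results(worker_ids):
    """Collect results in submission order; failures propagate to the caller."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(call_worker, w) for w in worker_ids]
        # .result() re-raises the worker's exception in the caller's
        # thread instead of terminating the whole process
        return [f.result() for f in futures]

try:
    results = gather_results([0, 2])
except RuntimeError:
    # the caller decides: retry, clean up, or re-raise
    results = []
```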
```python
        logger.warning(traceback.format_exc())
        if exit_on_exception:
            logger.info("Exiting due to exception in future.")
            os.kill(os.getpid(), signal.SIGTERM)
```
Using os.kill(os.getpid(), signal.SIGTERM) for exception handling is a very aggressive approach. It abruptly terminates the entire process, which can lead to resource leaks, prevent proper cleanup, and make the function difficult to reuse safely in different contexts. It's generally better to propagate exceptions to the caller, allowing them to decide on the appropriate error handling strategy. The exit_on_exception flag provides an option, but terminating the process should be avoided in a library/utility function.
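As a minimal sketch of the propagation approach, the helper could simply re-raise after logging; the function name mirrors the PR, but this body is an assumption, not the PR's implementation.

```python
# Sketch: let failures propagate instead of SIGTERM-ing the process.
import logging
import traceback

logger = logging.getLogger(__name__)

def wait_future_ordered(futures):
    """Collect future results in submission order, re-raising any failure."""
    results = []
    for fut in futures:
        try:
            results.append(fut.result())
        except Exception:
            logger.warning(traceback.format_exc())
            raise  # let the caller decide how to recover
    return results
```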
```diff
 class Scheduler(abc.ABC):
-    def create_workers(self, worker_key, scheduler_config, *args, **kwargs) -> str:
+    def create_workers(self, job: Job, *args, **kwargs):
```
The create_workers method is missing a return type annotation. Based on its usage in areal/controller/train_controller.py (where its return value is assigned to self.uid), it is expected to return a string representing the job ID. Adding the -> str type hint improves code clarity and enables better static analysis.
```diff
-    def create_workers(self, job: Job, *args, **kwargs):
+    def create_workers(self, job: Job, *args, **kwargs) -> str:
```
areal/controller/train_controller.py (Outdated)

```python
        self.uid: str
        self.workers: List[Worker]

    # todo: delete this method
```
```python
def create_engine_with_retry(
    create_engine_func, max_retries=60, retry_delay=10, *args, **kwargs
):
```
This function is missing type hints for its parameters. Adding type hints for create_engine_func, max_retries, and retry_delay would improve readability and allow for static analysis. You will need to add from typing import Callable.
```diff
 def create_engine_with_retry(
-    create_engine_func, max_retries=60, retry_delay=10, *args, **kwargs
-):
+    create_engine_func: Callable[..., Any], max_retries: int = 60, retry_delay: int = 10, *args, **kwargs
+) -> Any:
```
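For reference, a hedged sketch of what a fully typed retry wrapper might look like; the loop body below is an illustration under the assumption of a simple sleep-and-retry loop, not the PR's actual implementation.

```python
import time
from typing import Any, Callable, Optional

def create_engine_with_retry(
    create_engine_func: Callable[..., Any],
    max_retries: int = 60,
    retry_delay: int = 10,
    *args: Any,
    **kwargs: Any,
) -> Any:
    """Call create_engine_func until it succeeds or retries are exhausted."""
    last_exc: Optional[Exception] = None
    for _ in range(max_retries):
        try:
            return create_engine_func(*args, **kwargs)
        except Exception as exc:
            last_exc = exc
            time.sleep(retry_delay)
    raise RuntimeError(
        f"engine creation failed after {max_retries} attempts"
    ) from last_exc
```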
| """ | ||
| 工具方法:并发RPC调用 | ||
| :param scheduler: 调度器对象, 必须有 call_engine(worker_id, method, *args, **kwargs) | ||
| :param workers: 可遍历的worker列表,每个worker应有 worker.id 属性 | ||
| :param method: 方法名字符串 | ||
| :param args: 传递给call_engine的*args | ||
| :param kwargs: 传递给call_engine的**kwargs | ||
| :return: results | ||
| """ | 
The docstring for this function is in Chinese, while the rest of the codebase is in English. For consistency and to make it accessible to all contributors, please translate the docstring to English.
| """ | |
| 工具方法:并发RPC调用 | |
| :param scheduler: 调度器对象, 必须有 call_engine(worker_id, method, *args, **kwargs) | |
| :param workers: 可遍历的worker列表,每个worker应有 worker.id 属性 | |
| :param method: 方法名字符串 | |
| :param args: 传递给call_engine的*args | |
| :param kwargs: 传递给call_engine的**kwargs | |
| :return: results | |
| """ | |
| """ | |
| Utility method for concurrent RPC calls. | |
| :param scheduler: Scheduler object, must have a `call_engine(worker_id, method, *args, **kwargs)` method. | |
| :param workers: An iterable list of workers, where each worker should have a `worker.id` attribute. | |
| :param method: The name of the method to call as a string. | |
| :param args: Positional arguments to pass to `call_engine`. | |
| :param kwargs: Keyword arguments to pass to `call_engine`. | |
| :return: A list of results from the RPC calls. | |
| """ | 
This pull request has been automatically marked as stale because it has not had recent activity within the last 14 days. Please add a comment or push new commits to keep it active. Thank you for your contribution!
No description provided.