-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Adapt Devstar/Gitea action integration #2308
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,2 +1,9 @@ | ||
| #!/bin/bash | ||
| python /app/pr_agent/servers/github_action_runner.py | ||
| export PYTHONUNBUFFERED=1 | ||
|
|
||
| if [ -n "$GITEA_EVENT_NAME" ] || [ "$GITEA_ACTIONS" == "true" ]; then | ||
| python /app/pr_agent/servers/gitea_action_runner.py | ||
| else | ||
| python /app/pr_agent/servers/github_action_runner.py | ||
| fi | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,7 +35,7 @@ def __init__(self, url: Optional[str] = None): | |
| self.logger.error("Gitea access token not found in settings.") | ||
| raise ValueError("Gitea access token not found in settings.") | ||
|
|
||
| self.repo_settings = get_settings().get("GITEA.REPO_SETTING", None) | ||
| self.repo_settings = ".pr_agent.toml" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 2. Hardcoded .pr_agent.toml path The Gitea provider now hardcodes the repository settings path to .pr_agent.toml instead of reading it from configuration. This makes behavior non-configurable and breaks the intended configuration override mechanism. Agent Prompt
|
||
| configuration = giteapy.Configuration() | ||
| configuration.host = "{}/api/v1".format(self.base_url) | ||
| configuration.api_key['Authorization'] = f'token {self.gitea_access_token}' | ||
|
|
@@ -543,6 +543,15 @@ def get_num_of_files(self) -> int: | |
| """Get number of files changed in the PR""" | ||
| return len(self.git_files) | ||
|
|
||
| def get_issue_main_description(self, issue_num: int) -> str: | ||
| """The AI validation logic automatically looks for this method name to retrieve acceptance criteria.""" | ||
| try: | ||
| issue = self.repo_api.get_issue(self.owner, self.repo, issue_num) | ||
| return issue.body if issue and hasattr(issue, 'body') else "" | ||
| except Exception as e: | ||
| self.logger.error(f"Failed to fetch Gitea issue body: {e}") | ||
| return "" | ||
|
Comment on lines
+551
to
+553
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 3. Broad except exception in provider New Gitea provider methods catch Exception broadly and then return empty values, which can mask unexpected failures and make debugging harder. This violates the requirement to use targeted exception handling and preserve error context. Agent Prompt
|
||
|
|
||
| def get_issue_comments(self) -> List[Dict[str, Any]]: | ||
| """Get all comments in the PR""" | ||
| index = self.issue_number if self.enabled_issue else self.pr_number | ||
|
|
@@ -605,23 +614,25 @@ def get_pr_labels(self,update=False) -> List[str]: | |
|
|
||
| return [label.name for label in labels] | ||
|
|
||
| def get_repo_settings(self) -> str: | ||
| """Get repository settings""" | ||
| if not self.repo_settings: | ||
| self.logger.error("Repository settings not found") | ||
| return "" | ||
|
|
||
| response = self.repo_api.get_file_content( | ||
| owner=self.owner, | ||
| repo=self.repo, | ||
| commit_sha=self.sha, | ||
| filepath=self.repo_settings | ||
| ) | ||
| if not response: | ||
| self.logger.error("Failed to get repository settings") | ||
| return "" | ||
|
|
||
| return response | ||
| def get_repo_settings(self) -> str: | ||
| try: | ||
| response = self.repo_api.get_file_content( | ||
| owner=self.owner, | ||
| repo=self.repo, | ||
| commit_sha=self.sha, | ||
| filepath=".pr_agent.toml" | ||
| ) | ||
|
|
||
|
|
||
| if isinstance(response, str): | ||
| return response.encode('utf-8') | ||
| return response | ||
| except Exception as e: | ||
| if "404" in str(e): | ||
| self.logger.info("No .pr_agent.toml found, using defaults") | ||
| else: | ||
| self.logger.error(f"Error fetching settings: {e}") | ||
| return b"" # Return an empty bytes object. | ||
|
|
||
| def get_user_id(self) -> str: | ||
| """Get the ID of the authenticated user""" | ||
|
|
@@ -957,6 +968,9 @@ def get_issue_labels(self, owner: str, repo: str, issue_number: int): | |
| repo=repo, | ||
| index=issue_number | ||
| ) | ||
| def get_issue(self, owner: str, repo: str, index: int): | ||
| """Get ticket issue """ | ||
| return self.issue.issue_get_issue(owner=owner, repo=repo, index=index) | ||
|
|
||
| def list_all_commits(self, owner: str, repo: str): | ||
| return self.repository.repo_get_all_commits( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,224 @@ | ||
| import asyncio | ||
| import json | ||
| import os | ||
| import re | ||
| from typing import Union, Dict, Any | ||
|
|
||
| from pr_agent.agent.pr_agent import PRAgent | ||
| from pr_agent.config_loader import get_settings | ||
| from pr_agent.git_providers.utils import apply_repo_settings | ||
| from pr_agent.log import get_logger | ||
| from pr_agent.tools.pr_code_suggestions import PRCodeSuggestions | ||
| from pr_agent.tools.pr_description import PRDescription | ||
| from pr_agent.tools.pr_reviewer import PRReviewer | ||
|
|
||
|
|
||
| def is_true(value: Union[str, bool]) -> bool: | ||
| if isinstance(value, bool): | ||
| return value | ||
| if isinstance(value, str): | ||
| return value.lower() == "true" | ||
| return False | ||
|
|
||
|
|
||
| def get_setting_or_env(key: str, default: Union[str, bool] = None) -> Union[str, bool]: | ||
| try: | ||
| value = get_settings().get(key, default) | ||
| except AttributeError: | ||
| value = ( | ||
| os.getenv(key, None) | ||
| or os.getenv(key.upper(), None) | ||
| or os.getenv(key.lower(), None) | ||
| or default | ||
| ) | ||
| return value | ||
|
|
||
|
|
||
| def normalize_url(url: str) -> str: | ||
| if not url: | ||
| return url | ||
| url = url.strip() | ||
| if not url.startswith(("http://", "https://")): | ||
| url = f"https://{url}" | ||
| return url.rstrip("/") | ||
|
|
||
|
|
||
| def should_process_pr_logic(body: Dict[str, Any]) -> bool: | ||
| """Advanced rule-based filtering: skip specific PRs that do not require review.""" | ||
| try: | ||
| pull_request = body.get("pull_request", {}) | ||
| title = pull_request.get("title", "") | ||
| sender = body.get("sender", {}).get("login") | ||
| source_branch = pull_request.get("head", {}).get("ref", "") | ||
| target_branch = pull_request.get("base", {}).get("ref", "") | ||
|
|
||
| # 1. Ignore specific users | ||
| ignore_pr_users = get_settings().get("CONFIG.IGNORE_PR_AUTHORS", []) | ||
| if ignore_pr_users and sender: | ||
| if any(re.search(regex, sender) for regex in ignore_pr_users): | ||
| get_logger().info(f"Skipped: ignoring PR from user '{sender}'") | ||
| return False | ||
|
|
||
| # 2. Ignore specific titles | ||
| if title: | ||
| ignore_pr_title_re = get_settings().get("CONFIG.IGNORE_PR_TITLE", []) | ||
| if not isinstance(ignore_pr_title_re, list): | ||
| ignore_pr_title_re = [ignore_pr_title_re] | ||
| if ignore_pr_title_re and any(re.search(regex, title) for regex in ignore_pr_title_re): | ||
| get_logger().info(f"Skipped: ignoring PR with matching title '{title}'") | ||
| return False | ||
|
|
||
| # 3. Ignore specific source branches | ||
| ignore_pr_source_branches = get_settings().get("CONFIG.IGNORE_PR_SOURCE_BRANCHES", []) | ||
| if ignore_pr_source_branches and source_branch: | ||
| if any(re.search(regex, source_branch) for regex in ignore_pr_source_branches): | ||
| get_logger().info(f"Skipped: ignoring PR from source branch '{source_branch}'") | ||
| return False | ||
|
|
||
| # 4. Ignore specific target branches | ||
| ignore_pr_target_branches = get_settings().get("CONFIG.IGNORE_PR_TARGET_BRANCHES", []) | ||
| if ignore_pr_target_branches and target_branch: | ||
| if any(re.search(regex, target_branch) for regex in ignore_pr_target_branches): | ||
| get_logger().info(f"Skipped: ignoring PR targeting branch '{target_branch}'") | ||
| return False | ||
| except Exception as e: | ||
| get_logger().error(f"Failed to execute should_process_pr_logic: {e}") | ||
| return True | ||
|
|
||
|
|
||
| async def run_action(): | ||
| # 1. Read environment variables | ||
| EVENT_NAME = os.environ.get("GITEA_EVENT_NAME") or os.environ.get("GITHUB_EVENT_NAME") | ||
| EVENT_PATH = os.environ.get("GITEA_EVENT_PATH") or os.environ.get("GITHUB_EVENT_PATH") | ||
| TOKEN = os.environ.get("GITEA_TOKEN") or os.environ.get("GITHUB_TOKEN") | ||
| OPENAI_KEY = os.environ.get("OPENAI_KEY") or os.environ.get("OPENAI.KEY") | ||
| OPENAI_ORG = os.environ.get("OPENAI_ORG") or os.environ.get("OPENAI.ORG") | ||
| DEVSTAR_URL = os.environ.get("DEVSTAR_URL") | ||
| GITEA_URL = os.environ.get("GITEA_URL") | ||
| FINAL_GITEA_URL = normalize_url(DEVSTAR_URL or GITEA_URL or "https://gitea.com") | ||
|
|
||
| if not EVENT_NAME or not EVENT_PATH or not TOKEN: | ||
| get_logger().error( | ||
| "Container startup failed: missing required environment variables (EVENT_NAME, EVENT_PATH, TOKEN)." | ||
| ) | ||
| return | ||
|
|
||
| # 2. Configure Git provider identity and self-hosted address | ||
| get_settings().set("config.git_provider", "gitea") | ||
| get_settings().set("gitea.personal_access_token", TOKEN) | ||
|
|
||
| if OPENAI_KEY: | ||
| get_settings().set("OPENAI.KEY", OPENAI_KEY) | ||
| else: | ||
| print("OPENAI_KEY not set") | ||
|
|
||
| if OPENAI_ORG: | ||
| get_settings().set("OPENAI.ORG", OPENAI_ORG) | ||
|
|
||
| get_settings().set("gitea.url", FINAL_GITEA_URL) | ||
| get_settings().set("gitea.api_url", f"{FINAL_GITEA_URL}/api/v1") | ||
|
|
||
| if DEVSTAR_URL: | ||
| get_logger().info(f"Resolved Gitea URL from DEVSTAR_URL: {FINAL_GITEA_URL}") | ||
| elif GITEA_URL: | ||
| get_logger().info(f"Resolved Gitea URL from GITEA_URL: {FINAL_GITEA_URL}") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 6. Runner whitespace/newline issues pr_agent/servers/gitea_actions_runner.py includes trailing whitespace and is missing a final newline, which can cause pre-commit hooks to fail. This violates repository whitespace and newline requirements. Agent Prompt
|
||
| else: | ||
| get_logger().info(f"No URL configuration detected, using default:{FINAL_GITEA_URL}") | ||
|
|
||
| get_settings().set("GITHUB.USER_TOKEN", TOKEN) | ||
| get_settings().set("GITHUB.DEPLOYMENT_TYPE", "user") | ||
|
|
||
| # 3. Read the local payload JSON | ||
| try: | ||
| with open(EVENT_PATH, "r") as f: | ||
| event_payload = json.load(f) | ||
| except Exception as e: | ||
| get_logger().error(f"Failed to parse payload JSON: {e}") | ||
| return | ||
|
Comment on lines
+132
to
+137
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 4. Broad except exception in runner pr_agent/servers/gitea_actions_runner.py uses broad except Exception in multiple places, which can hide real bugs and removes error specificity. This violates the requirement to catch expected exception types and preserve context. Agent Prompt
|
||
|
|
||
| # 4. Extract the PR URL | ||
| pr_url = None | ||
| if EVENT_NAME in ["pull_request", "pull_request_target"]: | ||
| pr_url = event_payload.get("pull_request", {}).get("html_url") | ||
| elif EVENT_NAME == "issue_comment": | ||
| issue_data = event_payload.get("issue", {}) | ||
| if "pull_request" in issue_data: | ||
| pr_url = issue_data.get("pull_request", {}).get("html_url") or issue_data.get("html_url") | ||
|
|
||
| if not pr_url: | ||
| get_logger().warning("No valid PR URL found in the payload. Exiting.") | ||
| return | ||
|
Comment on lines
+139
to
+150
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 7. Pr url field mismatch pr_agent/servers/gitea_actions_runner.py reads PR URL from pull_request.html_url, but the existing Gitea handler uses pull_request.url. If the Actions payload matches the webhook payload shape, pr_url will be empty and the runner exits without processing PR events. Agent Prompt
|
||
|
|
||
| # 5. Apply repository-level configuration (.pr_agent.toml) | ||
| try: | ||
| apply_repo_settings(pr_url) | ||
| except Exception as e: | ||
| get_logger().warning(f"Failed to apply repository settings: {e}") | ||
|
|
||
| if os.path.exists(".pr_agent.toml"): | ||
| get_logger().info("Loaded local .pr_agent.toml") | ||
| get_settings().load_file(".pr_agent.toml") | ||
| elif os.path.exists("/workspace/.pr_agent.toml"): | ||
| get_settings().load_file("/workspace/.pr_agent.toml") | ||
| get_logger().info("Loaded local .pr_agent.toml") | ||
|
|
||
| # 6. Inject multilingual instruction prompts | ||
| try: | ||
| response_language = get_settings().config.get("response_language", "en-us") | ||
| if response_language.lower() != "en-us": | ||
| get_logger().info(f"Custom response language detected: {response_language}") | ||
| lang_instruction_text = ( | ||
| f"Your response MUST be written in the language corresponding to locale code: " | ||
| f"'{response_language}'. This is crucial." | ||
| ) | ||
| separator_text = "\n======\n\nIn addition, " | ||
|
|
||
| for key in get_settings(): | ||
| setting = get_settings().get(key) | ||
| if str(type(setting)) == "<class 'dynaconf.utils.boxing.DynaBox'>": | ||
| if key.lower() in ["pr_description", "pr_code_suggestions", "pr_reviewer"]: | ||
| if hasattr(setting, "extra_instructions"): | ||
| extra_instructions = setting.extra_instructions | ||
| if lang_instruction_text not in str(extra_instructions): | ||
| updated_instructions = ( | ||
| str(extra_instructions) + separator_text + lang_instruction_text | ||
| if extra_instructions | ||
| else lang_instruction_text | ||
| ) | ||
| setting.extra_instructions = updated_instructions | ||
| except Exception as e: | ||
| get_logger().warning(f"Instruction injection failed: {e}") | ||
|
|
||
| # 7. Core event routing and execution | ||
| if EVENT_NAME in ["pull_request", "pull_request_target"]: | ||
| if not should_process_pr_logic(event_payload): | ||
| return | ||
|
|
||
| action = event_payload.get("action") | ||
| if action in ["opened", "reopened", "ready_for_review", "synchronize", "synchronized"]: | ||
| get_settings().config.is_auto_command = True | ||
| get_logger().info(f"Triggered automated review action: {action}") | ||
|
|
||
| auto_describe = get_setting_or_env("GITHUB_ACTION_CONFIG.AUTO_DESCRIBE", True) | ||
| auto_review = get_setting_or_env("GITHUB_ACTION_CONFIG.AUTO_REVIEW", True) | ||
| auto_improve = get_setting_or_env("GITHUB_ACTION_CONFIG.AUTO_IMPROVE", True) | ||
|
|
||
| if is_true(auto_describe) and action not in ["synchronize", "synchronized"]: | ||
| await PRDescription(pr_url).run() | ||
| if is_true(auto_review): | ||
| await PRReviewer(pr_url).run() | ||
| if is_true(auto_improve): | ||
| await PRCodeSuggestions(pr_url).run() | ||
| else: | ||
| get_logger().info(f"Ignoring unconfigured PR action: {action}") | ||
|
|
||
| elif EVENT_NAME == "issue_comment": | ||
| action = event_payload.get("action") | ||
| if action in ["created", "edited"]: | ||
| comment_body = event_payload.get("comment", {}).get("body", "").strip() | ||
| if comment_body.startswith("/"): | ||
| get_logger().info(f"Received user command: {comment_body}") | ||
| await PRAgent().handle_request(pr_url, comment_body) | ||
|
|
||
| if __name__ == "__main__": | ||
| asyncio.run(run_action()) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1. entrypoint.sh calls missing runner
📘 Rule violation☼ ReliabilityAgent Prompt
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools