"""star_following_repos.py

Copy another GitHub user's starred repositories onto your own account.

Usage:
    python star_following_repos.py <target_username> <your_username> <your_token>
"""
import asyncio
import json
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Set

import aiohttp
from tqdm import tqdm
class GithubStarManager:
    """Async helper that lists GitHub stars and stars repositories.

    All requests go through one lazily created ``aiohttp.ClientSession``
    that carries the token headers. The rate-limit headers of the most
    recent response are remembered so the next request can wait for the
    quota to reset before it is sent.
    """

    def __init__(self, token: str):
        """Store the token and build the headers sent with every request.

        Args:
            token: GitHub access token placed in the ``Authorization`` header.
        """
        self.token = token
        self.headers = {
            'Authorization': f'token {token}',
            'Accept': 'application/vnd.github.v3+json'
        }
        self.session = None  # aiohttp.ClientSession, created on first use
        self.progress_bar = None  # tqdm bar, created by process_repos
        # Both stay None until a response carrying rate-limit headers is seen.
        self.rate_limit_remaining = None
        self.rate_limit_reset = None

    async def create_session(self):
        """Create the shared HTTP session if it does not exist yet."""
        if not self.session:
            self.session = aiohttp.ClientSession(headers=self.headers)

    async def close_session(self):
        """Close the shared HTTP session, if one is open."""
        if self.session:
            await self.session.close()
            self.session = None

    def update_rate_limits(self, headers):
        """Record the rate-limit state reported by a response.

        The stored values are only replaced when both
        ``X-RateLimit-Remaining`` and ``X-RateLimit-Reset`` are present and
        numeric. A response without usable headers leaves the previous state
        untouched instead of being recorded as an exhausted quota.

        Args:
            headers: Mapping of response headers (anything with ``.get``).
        """
        remaining_raw = headers.get('X-RateLimit-Remaining')
        reset_raw = headers.get('X-RateLimit-Reset')
        if remaining_raw is None or reset_raw is None:
            return
        try:
            remaining = int(remaining_raw)
            reset_epoch = int(reset_raw)
        except (TypeError, ValueError):
            # Malformed header: keep what we knew rather than failing the
            # request that delivered it.
            return
        self.rate_limit_remaining = remaining
        self.rate_limit_reset = datetime.fromtimestamp(reset_epoch)

    async def wait_for_rate_limit(self):
        """Sleep until the quota resets when no requests remain.

        Returns immediately when no rate-limit state is known, when requests
        remain, or when the recorded reset time has already passed.
        """
        if self.rate_limit_remaining is None or self.rate_limit_remaining >= 1:
            return
        now = datetime.now()
        if self.rate_limit_reset and self.rate_limit_reset > now:
            # One extra second of margin past the advertised reset time.
            wait_time = (self.rate_limit_reset - now).total_seconds() + 1
            message = f'达到 API 限制,等待 {wait_time:.1f} 秒后继续...'
            if self.progress_bar is not None:
                self.progress_bar.write(message)
            else:
                # No bar yet (e.g. while listing stars): still tell the user
                # why the program appears to hang.
                print(message)
            await asyncio.sleep(wait_time)

    async def _wait_after_throttle(self, headers):
        """Pause after a 403/429 response before the request is retried.

        A numeric ``Retry-After`` header takes priority; otherwise fall back
        to the reset time recorded from the rate-limit headers.
        """
        retry_after = headers.get('Retry-After')
        try:
            delay = float(retry_after) if retry_after is not None else 0.0
        except (TypeError, ValueError):
            delay = 0.0
        if delay > 0:
            await asyncio.sleep(delay)
        else:
            await self.wait_for_rate_limit()

    async def get_user_starred_repos(self, username: str) -> List[Dict]:
        """Fetch the repositories starred by ``username``, page by page.

        Each page is attempted up to three times. Throttled responses
        (HTTP 403/429) wait for the rate limit before retrying; other
        failures wait three seconds.

        Args:
            username: GitHub login whose star list is read.

        Returns:
            The repository objects collected so far. If a page still fails
            after all retries, the list is returned incomplete (a message is
            printed), so callers cannot assume it is exhaustive.
        """
        await self.create_session()
        starred_repos = []
        page = 1
        max_retries = 3
        url = f'https://api.github.com/users/{username}/starred'

        while True:
            await self.wait_for_rate_limit()
            retry_count = 0
            while retry_count < max_retries:
                try:
                    async with self.session.get(
                        url, params={'page': page, 'per_page': 100}
                    ) as response:
                        self.update_rate_limits(response.headers)
                        if response.status == 200:
                            repos = await response.json()
                            if not repos:
                                # An empty page marks the end of the listing.
                                return starred_repos
                            starred_repos.extend(repos)
                            print(f'已获取第 {page} 页,当前共 {len(starred_repos)} 个仓库')
                            page += 1
                            await asyncio.sleep(0.5)  # base delay between pages
                            break  # page fetched, leave the retry loop
                        if response.status in (403, 429):
                            # Primary or secondary rate limit.
                            await self._wait_after_throttle(response.headers)
                            retry_count += 1
                            continue
                        error_msg = await response.text()
                        print(f'\n请求第 {page} 页失败: HTTP {response.status} - {error_msg}')
                except Exception as e:
                    # Retry boundary: network and decoding errors are retried.
                    print(f'\n请求第 {page} 页时发生异常: {str(e)}')
                retry_count += 1
                if retry_count < max_retries:
                    print(f'将在 3 秒后进行第 {retry_count + 1} 次重试...')
                    await asyncio.sleep(3)

            if retry_count >= max_retries:
                print(f'\n获取第 {page} 页数据失败,已达到最大重试次数')
                break

        return starred_repos

    async def get_my_starred_repos(self) -> Set[str]:
        """Fetch the ``full_name`` of every repository the token owner starred.

        Returns:
            Set of ``owner/name`` strings. On an HTTP error or exception the
            pages read so far are returned and a message is printed.
        """
        await self.create_session()
        starred_repos = set()
        page = 1
        url = 'https://api.github.com/user/starred'

        while True:
            await self.wait_for_rate_limit()
            try:
                async with self.session.get(
                    url, params={'page': page, 'per_page': 100}
                ) as response:
                    self.update_rate_limits(response.headers)
                    if response.status != 200:
                        # Report the failure instead of silently returning a
                        # partial set.
                        print(f'\n获取个人star列表失败: HTTP {response.status}')
                        break
                    repos = await response.json()
                    if not repos:
                        break
                    starred_repos.update(repo['full_name'] for repo in repos)
                    page += 1
                    await asyncio.sleep(0.5)
            except Exception as e:
                print(f'\n获取个人star列表失败: {str(e)}')
                break

        return starred_repos

    async def star_repo(self, repo_full_name: str) -> bool:
        """Star a single repository.

        Args:
            repo_full_name: Repository in ``owner/name`` form.

        Returns:
            True when the API answers 204, False on any other status or on
            an exception.
        """
        await self.create_session()
        await self.wait_for_rate_limit()
        try:
            url = f'https://api.github.com/user/starred/{repo_full_name}'
            async with self.session.put(url) as response:
                self.update_rate_limits(response.headers)
                if response.status == 204:
                    return True
                if self.progress_bar is not None:
                    self.progress_bar.write(
                        f'为 {repo_full_name} 添加 star 失败: HTTP {response.status}'
                    )
                return False
        except Exception as e:
            if self.progress_bar is not None:
                self.progress_bar.write(f'为 {repo_full_name} 添加 star 失败: {str(e)}')
            return False

    async def process_repos(self, target_repos: List[Dict], progress_file: str):
        """Star every repository in ``target_repos`` that still needs it.

        Repositories listed in ``progress_file`` or already starred by the
        token owner are skipped. After each successful star the progress file
        is rewritten, so an interrupted run can resume; failed repositories
        are not recorded and are retried on the next run.

        Args:
            target_repos: Repository dicts, each with a ``full_name`` key.
            progress_file: Path of the JSON file holding processed names.
        """
        # Load the names handled by earlier runs (best effort).
        processed_repos = set()
        if Path(progress_file).exists():
            try:
                with open(progress_file, 'r', encoding='utf-8') as f:
                    processed_repos = set(json.load(f))
            except (OSError, ValueError, TypeError) as e:
                # Unreadable or corrupt progress file: start from scratch,
                # but say so.
                print(f'读取进度文件 {progress_file} 失败,将忽略已有进度: {e}')

        # Repositories the token owner has already starred.
        my_starred_repos = await self.get_my_starred_repos()

        repos_to_process = [
            repo for repo in target_repos
            if repo['full_name'] not in processed_repos
            and repo['full_name'] not in my_starred_repos
        ]
        if not repos_to_process:
            print('没有需要处理的仓库')
            return

        total = len(repos_to_process)
        success_count = 0
        start_time = time.time()
        self.progress_bar = tqdm(total=total, desc='添加 star 进度')
        try:
            for repo in repos_to_process:
                repo_name = repo['full_name']
                if await self.star_repo(repo_name):
                    success_count += 1
                    processed_repos.add(repo_name)
                    # Persist progress so a later run can resume here.
                    with open(progress_file, 'w', encoding='utf-8') as f:
                        json.dump(list(processed_repos), f)

                self.progress_bar.update(1)

                # Estimate the remaining time from the average rate so far.
                elapsed_time = time.time() - start_time
                items_per_second = self.progress_bar.n / elapsed_time if elapsed_time > 0 else 0
                remaining_items = total - self.progress_bar.n
                eta_seconds = remaining_items / items_per_second if items_per_second > 0 else 0
                self.progress_bar.set_postfix({'成功': success_count, 'ETA': f'{eta_seconds:.1f}s'})

                # Fixed delay between stars to avoid tripping API limits.
                await asyncio.sleep(1)
        finally:
            # Close the bar even when the loop is interrupted.
            self.progress_bar.close()

        print(f'\n处理完成!成功添加 {success_count} 个star,失败 {total - success_count} 个。')
def load_starred_repos_from_file(file_path: str) -> Optional[List[Dict]]:
    """Load a previously saved star list from a JSON file.

    Args:
        file_path: Path of the UTF-8 JSON file to read.

    Returns:
        The decoded list, or None when the file does not exist, is not valid
        UTF-8 JSON, or holds something other than a list. None tells the
        caller to fetch the list again.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            repos = json.load(f)
    except (FileNotFoundError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return None
    # A non-list document (e.g. a truncated or foreign file) is not a usable
    # cache; treat it like a malformed file.
    return repos if isinstance(repos, list) else None
def save_starred_repos_to_file(repos: List[Dict], file_path: str) -> None:
    """Write the star list to ``file_path`` as indented UTF-8 JSON.

    Non-ASCII characters are written as-is rather than escaped. An existing
    file at ``file_path`` is overwritten.
    """
    with open(file_path, mode='w', encoding='utf-8') as handle:
        json.dump(repos, fp=handle, indent=2, ensure_ascii=False)
async def main():
    """Command-line entry point.

    Reads ``<target_username> <your_username> <your_token>`` from the
    command line, obtains the target user's star list (from the cached JSON
    file when it can be loaded, otherwise from the API, caching the result),
    asks for confirmation, and then stars the repositories. Exits with
    status 1 when the argument count is wrong.
    """
    if len(sys.argv) != 4:
        print('使用方法: python star_following_repos.py <target_username> <your_username> <your_token>')
        print('示例: python star_following_repos.py ouyangzhiping hyeebeen your_token')
        sys.exit(1)

    # Positional arguments: the user whose stars are copied, the login shown
    # in the confirmation prompt, and the access token used for API calls.
    _, target_username, your_username, your_token = sys.argv

    # Per-target file names for the cached star list and the resume record.
    cache_path = f'{target_username}_starred_repos.json'
    checkpoint_path = f'{target_username}_progress.json'

    star_manager = GithubStarManager(your_token)
    try:
        # Prefer the cached list; None means it is missing or unreadable.
        repos = load_starred_repos_from_file(cache_path)
        if repos is None:
            print(f'正在获取用户 {target_username} 的已标星仓库...')
            repos = await star_manager.get_user_starred_repos(target_username)
            if not repos:
                print('未找到已标星的仓库')
                return
            # Cache the freshly fetched list for later runs.
            save_starred_repos_to_file(repos, cache_path)

        print(f'\n找到 {len(repos)} 个已标星仓库。')

        # Nothing is starred without an explicit "y".
        answer = input(f'\n是否确认为用户 {your_username} 添加这些star?(y/n): ')
        if answer.lower() == 'y':
            print('\n开始添加star...')
            await star_manager.process_repos(repos, checkpoint_path)
        else:
            print('操作已取消')
    finally:
        # Always release the HTTP session, including on early return.
        await star_manager.close_session()
# Start the CLI only when the file is executed as a script, so importing the
# module has no side effects.
if __name__ == '__main__':
    asyncio.run(main())