|
| 1 | + |
| 2 | +''' |
| 3 | +定时模块schedule:可以按照当天指定时刻或一段时间后运行指定函数或方法 |
| 4 | +--- |
| 5 | +使用方法:\n |
| 6 | +在指定的一段时间后执行\n |
| 7 | +from pywechat.Clock import schedule\n |
| 8 | +schedule(funcs=[func1,func2],parameters=[{func1的参数字典},{func2的参数字典},waitPeriods=['20s','20min']]).execute() |
| 9 | +在指定的时刻执行\n |
| 10 | +from pywechat.clock import schedule\n |
| 11 | +schedule(funcs=[func1,func2],parameters=[{func1的参数字典},{func2的参数字典},Time=['08:31:14','08:45']]).execute() |
| 12 | +注:时刻可以精确到秒,若某个函数无需任何参数,那你在传入其对印的参数字典时,传入一个空字典即可\n |
| 13 | +若给定的时间戳与当前时间戳之差为负数,定时任务将会立即执行\n |
| 14 | +
|
| 15 | +定时模块schtasks:可以按照当天指定时间运行指定python代码 |
| 16 | +---- |
| 17 | +使用方法:\n |
| 18 | +在指定时刻执行\n |
| 19 | +from pywechat.Clock import scntasks\n |
| 20 | +scntasks.create_task(taskname='定时任务',start_time='08:31:14',pyfile_path='python文件地址')\n |
| 21 | +注:运行上述代码后,名为定时任务的schtask将会被添加到windows系统下的定时任务中,并在指定时刻运行传入的python文件内的代码\n |
| 22 | +若你不想传入一个python文件的路径,可以直接传入python代码的长字符串形式,例如: |
| 23 | +from pywechat.Clock import scntasks\n |
| 24 | +code='这是一段python代码' |
| 25 | +scntasks.create(taskname='定时任务',start_time='08:31:14',code=code) |
| 26 | +''' |
| 27 | +import os |
| 28 | +import sys |
| 29 | +import re |
| 30 | +import asyncio |
| 31 | +import subprocess |
| 32 | +from datetime import datetime |
| 33 | +from pywechat.Errors import TaskNotBuildError |
| 34 | +from pywechat.WinSettings import Systemsettings |
| 35 | +from pywechat.WechatTools import match_duration |
| 36 | +class schedule:#创建定时任务 |
| 37 | + ''' |
| 38 | + funcs:所有需要定时执行的函数名列表,注意函数名的类型为函数\n |
| 39 | + 不是类型为字符串的函数的名字!\n |
| 40 | + parameters:所有需要定时执行的函数的参数\n |
| 41 | + Times:各个函数定时执行的时间点,Times=['08:31','08:45:54']可精确到秒\n |
| 42 | + waitPeriod:各个函数在指定的一段时间执行的等待时长,waitPeriod=['20s','1h']分别在20s后和1h后执行两个函数、\n |
| 43 | + 比如:有两个函数分别为,test1(num:int,string:str),test2(num:int,string:str)\n |
| 44 | + 那么传入funcs和parameters时应为:funcs=[test1,test2]\n |
| 45 | + parameters=[{'num':2,'string':'test1'},{{'num':3,'string':'test2'}}]\n |
| 46 | + 这个类通过构建协程池来创建定时任务\n |
| 47 | + 注意:运行代码后请勿关闭代码编辑器,否则定时任务无法完成 |
| 48 | + -- |
| 49 | + ''' |
| 50 | + def __init__(self,funcs:list,parameters:list[dict],Times:list[str]=[],waitPeriods:list[str]=[]): |
| 51 | + self.Times=Times#指定时间点,'08:31','08:45:54'可精确到秒 |
| 52 | + self.waitPeriods=waitPeriods#指定时长,20s,1min,1h |
| 53 | + self.funcs=funcs#所有需要定时执行的函数名 |
| 54 | + self.parameters=parameters##所有需要定时执行的函数的参数[{},{}], |
| 55 | + |
| 56 | + def calculate_time_difference(self,target_time_string): |
| 57 | + colons=re.findall(r':',target_time_string) |
| 58 | + current_time = datetime.now() |
| 59 | + target_date = current_time.date() # 获取当前日期 |
| 60 | + if len(colons)==2: |
| 61 | + target_time_format="%H:%M:%S" |
| 62 | + elif len(colons)==1: |
| 63 | + target_time_format="%H:%M" |
| 64 | + else: |
| 65 | + raise ValueError('输入的时间戳有误!请重新输入!') |
| 66 | + target_time = datetime.combine(target_date, datetime.strptime(target_time_string, target_time_format).time()) |
| 67 | + # 计算时间差 |
| 68 | + time_difference = target_time - current_time |
| 69 | + hours_difference = time_difference.seconds // 3600 # 整除3600得到小时数 |
| 70 | + minutes_remainder = time_difference.seconds % 3600 # 求余得到剩余的秒数,再转换为分钟 |
| 71 | + minutes_difference = minutes_remainder // 60 # 整除60得到分钟数 |
| 72 | + seconds_difference = minutes_remainder % 60 # 再次求余得到秒数 |
| 73 | + print(f"时间差:{hours_difference}小时 {minutes_difference}分钟 {seconds_difference}秒") |
| 74 | + time_difference=time_difference.total_seconds() |
| 75 | + return time_difference |
| 76 | + |
| 77 | + async def async_task(self,func,parameter,Time:str=None,waitPeriod:str=None): |
| 78 | + if Time: |
| 79 | + print(f"函数{func.__name__}将会在{Time}时执行") |
| 80 | + await asyncio.sleep(self.calculate_time_difference(Time)) |
| 81 | + result=func(**parameter) |
| 82 | + return result |
| 83 | + if waitPeriod: |
| 84 | + print(f"函数{func.__name__}将会在{waitPeriod}后执行") |
| 85 | + waitPeriod=match_duration(waitPeriod) |
| 86 | + await asyncio.sleep(waitPeriod) |
| 87 | + result=func(**parameter) |
| 88 | + return result |
| 89 | + async def main(self): |
| 90 | + #构建协程池实现异步定时任务 |
| 91 | + self.tasks=[] |
| 92 | + if self.waitPeriods: |
| 93 | + for func,parameter,waitPeriod in zip(self.funcs,self.parameters,self.waitPeriods): |
| 94 | + self.tasks.append(self.async_task(func,parameter,waitPeriod=waitPeriod)) |
| 95 | + results=await asyncio.gather(*self.tasks) |
| 96 | + return results |
| 97 | + if self.Times: |
| 98 | + for func,parameter,time in zip(self.funcs,self.parameters,self.Times): |
| 99 | + self.tasks.append(self.async_task(func,parameter,Time=time)) |
| 100 | + results=await asyncio.gather(*self.tasks) |
| 101 | + return results |
| 102 | + def execute(self): |
| 103 | + #运行所有任务 |
| 104 | + Systemsettings.open_listening_mode() |
| 105 | + results=asyncio.run(self.main()) |
| 106 | + Systemsettings.close_listening_mode() |
| 107 | + return results |
| 108 | + |
| 109 | + |
| 110 | +class schtasks(): |
| 111 | + ''' |
| 112 | + 使用windows系统下的schtasks命令实现定时操作相较于schedule可以关闭代码编辑器 |
| 113 | + -- |
| 114 | + ''' |
| 115 | + @staticmethod |
| 116 | + def code_to_py(code:str): |
| 117 | + '''将字符串代码写入py文件到当前目录下 |
| 118 | + Args: |
| 119 | + code:字符串代码 |
| 120 | + ''' |
| 121 | + with open('code_to_be_executed.py','w',encoding='utf-8') as py: |
| 122 | + py.write(code) |
| 123 | + pyfile_path=os.path.abspath(os.path.join(os.getcwd(),'exec.py')) |
| 124 | + return pyfile_path |
| 125 | + @staticmethod |
| 126 | + def create_task(taskname:str,start_time:str,code:str=None,pyfile_path:str=None): |
| 127 | + ''' |
| 128 | + 创建一个Windows系统下的schtasks任务,该任务将在当天指定的start_time\n |
| 129 | + 执行传入的python代码或python脚本\n |
| 130 | + Args: |
| 131 | + taskname:\tschtasks命令名称 |
| 132 | + start_ime:\t执行任务的时间 |
| 133 | + code:\tpython代码类型为长字符串 |
| 134 | + pyfile_path:\tpython代码路径 |
| 135 | +
|
| 136 | + 注意:pyfile_path与code二者有其一即可,若二者都传入,优先使用pyfile_path的py代码 |
| 137 | + ---- |
| 138 | + ''' |
| 139 | + if not pyfile_path: |
| 140 | + pyfile_path=schtasks.code_to_py(code) |
| 141 | + # 创建一个schtasks命令来创建计划任务 |
| 142 | + command=f'{sys.executable} {pyfile_path}' |
| 143 | + schtasks_command = ( |
| 144 | + f'schtasks /create /tn {taskname} ' |
| 145 | + f'/tr "{command}" /sc ONCE /st {start_time} /f' |
| 146 | + ) |
| 147 | + subprocess.run(schtasks_command,text=True,shell=True) |
| 148 | + |
| 149 | + @staticmethod |
| 150 | + def change_task(taskname:str,start_time:str,code:str=None,pyfile_path:str=None): |
| 151 | + ''' |
| 152 | + 通过taskname修改一个已经设定的windows系统下的schtasks任务\n |
| 153 | + Args: |
| 154 | + taskname:\t已经设定的schtasks任务的名称 |
| 155 | + start_ime:\t修改后执行任务的时间 |
| 156 | + code:\t需要替换的python代码类型为长字符串 |
| 157 | + pyfile_path:\t需要替换的py文件路径 |
| 158 | + |
| 159 | + 注意:pyfile_path与code二者有其一即可,若二者都传入,优先使用pyfile_path的py代码 |
| 160 | + ---- |
| 161 | + ''' |
| 162 | + tasks=schtasks.get_all_created_tasks() |
| 163 | + if taskname in tasks.keys(): |
| 164 | + if code: |
| 165 | + if not pyfile_path: |
| 166 | + pyfile_path=schtasks.code_to_py(code) |
| 167 | + command=f'{sys.executable} {pyfile_path}' |
| 168 | + schtasks_command= ( |
| 169 | + f'schtasks /change /tn {taskname} ' |
| 170 | + f'/tr "{command}" /st {start_time} ' |
| 171 | + ) |
| 172 | + subprocess.run(schtasks_command,text=True,input='\n') |
| 173 | + else: |
| 174 | + schtasks_command=( |
| 175 | + f'schtasks /change /tn {taskname} /st {start_time} ' |
| 176 | + ) |
| 177 | + subprocess.run(schtasks_command,text=True,input='\n') |
| 178 | + else: |
| 179 | + raise TaskNotBuildError(f'你还未创建过名为{taskname}的定时任务!') |
| 180 | + |
| 181 | + @staticmethod |
| 182 | + def cancel_task(taskname): |
| 183 | + ''' |
| 184 | + 通过已经设定的schtasks的taskname取消该任务\n |
| 185 | +
|
| 186 | + Args: |
| 187 | + taskname:\t已经设定的schtasks任务的名称 |
| 188 | + |
| 189 | + ''' |
| 190 | + schtasks_command=(f'schtasks /delete /tn {taskname} /f') |
| 191 | + pyfile_path=os.path.abspath(os.path.join(os.getcwd(),'exec.py')) |
| 192 | + tasks=schtasks.get_all_created_tasks() |
| 193 | + if taskname in tasks.keys(): |
| 194 | + if os.path.exists(pyfile_path): |
| 195 | + os.remove(pyfile_path) |
| 196 | + subprocess.run(schtasks_command) |
| 197 | + else: |
| 198 | + raise TaskNotBuildError(f'你还未创建过名为{taskname}的定时任务!') |
| 199 | + @staticmethod |
| 200 | + def get_all_created_tasks(): |
| 201 | + ''' |
| 202 | + 获取所有已建立的schtasks任务名称与时间\n |
| 203 | + 返回值为任务名称与执行时间构成的字典\n |
| 204 | + ''' |
| 205 | + schtasks_command='schtasks /query /v /fo list' |
| 206 | + process=subprocess.run(schtasks_command,stdout=subprocess.PIPE,encoding='gbk') |
| 207 | + result=process.stdout |
| 208 | + tasknames=re.findall(r'任务名:(.*?)(\n|$)',result) |
| 209 | + tasknames=[name[0].strip() for name in tasknames] |
| 210 | + tasknames=[name.replace('\\','') for name in tasknames] |
| 211 | + start_times=re.findall(r'开始时间:(.*?)(\n|$)',result) |
| 212 | + start_times=[name[0].strip() for name in start_times] |
| 213 | + start_times=[name.replace('\\','') for name in start_times] |
| 214 | + tasks=dict(zip(tasknames,start_times)) |
| 215 | + return tasks |
| 216 | + |
| 217 | + |
0 commit comments