Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
yudeqang committed Apr 15, 2022
0 parents commit 6e9caad
Show file tree
Hide file tree
Showing 26 changed files with 988 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.idea/
__pycache__/
65 changes: 65 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
## 简介
plana是一个非常简单的定时任务框架,内置了异常提醒和任务记录功能 \
现阶段使用需要配置一个MongoDB,因为还没有前端的展示功能,待后续开发,两天前萌生了一个想法,想要做一个定时任务框架,现在我的定时任务都基于linux的Crontab来管理,
许多小脚本存放于服务器的各个地方,想使用一种统一的方式来管理他们。开源的方案也有,看了一些任务调度框架,但是它们部署都挺麻烦,而且不一定适用,于是就有了这个项目,它是基于APScheduler开发的。

## 快速上手
```python
from plana.core import add_job, IntervalTrigger, start
from plana.core.notice import EmailNotice

# 使用EMAIL提醒需要提供settings.py配置文件或者实例化的时候传入参数
notice = EmailNotice()

@add_job(IntervalTrigger(seconds=3), name='你好啊', notice=notice)
def say_hello():
"""Interval 示例"""
print("hello world")

start()
```
plana中有几个基本的对象
trigger 定时器,有两种选择,间隔时间,或者linux cron 格式的定时器,示例中使用的是间隔时间,每隔3秒运行一次
notice 提醒器,内置了两种,钉钉和邮件提醒
使用plana添加定时任务分为如下几步
1. 引入相关的对象
2. 使用add_job注册一个定时任务,需要提供trigger,name和notice是可选的,当你的任务需要异常提醒的时候要提供notice,建议提供name,这样有利于阅读
3. 编写定时函数内容
4. 运行start方法

## 触发异常提醒
要触发提醒,需要抛出一个NoticeException
```python
from plana.core.Exceptions import NoticeException

def test():
raise NoticeException('定时任务异常啦')
```

![](./static/emailnotice.jpg)

## 配置文件
由于定时任务存储目前依赖于MongoDB,所有需要用户提供一个mongodb的连接
可选的配置:
```python
# Mongo连接, mongodb://root:[email protected]:27017
MONGO_URI = ''

# 如果定时任务存在某个模块下,提供模块的名字
WORK_MODULE = 'work'

# 钉钉的Token
DING_TOKEN = ''

# 邮件设置
EMAIL_USER = '' # 账户
EMAIL_PWD = '' # IMAP密码
EMAIL_HOST = ''
EMAIL_TO_USER = ''
```

## todo
目前这个项目处于非常早期的版本,不足之处还有很多,后续要陆续优化 \
后续还打算提供一个简单的前端页面来展示定时任务运行记录 \
除了用MongoDB做任务存储以外,还要支持其他的数据库 \
为了再降低使用成本,还要添加基于内存的存储
13 changes: 13 additions & 0 deletions example/cron.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""
@author: yudeqiang
@file: cron.py
@time: 2022/04/13
@describe:
"""
from plana.core import add_job, CronTrigger


@add_job(CronTrigger(second=3))
def say_hello_cron():
"""cron 示例"""
print("hello world cron")
13 changes: 13 additions & 0 deletions example/interval.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""
@author: yudeqiang
@file: interval.py
@time: 2022/04/13
@describe:
"""
from plana.core import add_job, IntervalTrigger


@add_job(IntervalTrigger(seconds=3))
def say_hello():
"""Interval 示例"""
print("hello world")
44 changes: 44 additions & 0 deletions plana/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""
@author: yudeqiang
@file: __init__.py.py
@time: 2022/04/15
@describe:
"""
import importlib
import importlib.util
from pathlib import Path
import os

from .core import *
from .settings import WORK_MODULE

MODULE_EXTENSIONS = '.py'


def package_contents(package_name):
"""查找模块下有哪些py文件"""
spec = importlib.util.find_spec(package_name)
if spec is None:
return set()

pathname = Path(spec.origin).parent
ret = set()
with os.scandir(pathname) as entries:
for entry in entries:
if entry.name.startswith('__'):
continue
current = '.'.join((package_name, entry.name.partition('.')[0]))
if entry.is_file():
if entry.name.endswith(MODULE_EXTENSIONS):
ret.add(current)
elif entry.is_dir():
ret.add(current)
ret |= package_contents(current)

return ret


content = package_contents(WORK_MODULE)
for c in content:
importlib.import_module(c)

14 changes: 14 additions & 0 deletions plana/core/Exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""
@author: yudeqiang
@file: Exceptions.py
@time: 2022/04/14
@describe:
"""


class NotTokenException(Exception):
pass


class NoticeException(Exception):
pass
1 change: 1 addition & 0 deletions plana/core/VERSION
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0.0.1
104 changes: 104 additions & 0 deletions plana/core/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
"""
@author: yudeqiang
@file: __init__.py
@time: 2022/04/13
@describe:
"""
import functools
from typing import Optional

from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger

from .log import log
from .Exceptions import NoticeException
from .schedulers import MySchedulerBase, SxBlockingScheduler, SxBackgroundScheduler
from plana.settings import SCHEDULER_CLS, JOB_BACKEND_DB, TASK_BACKEND_DB
from .task import Task
from .backend import MongoJobBackend, MongoTaskBackend
from .notice.base import Notice

if SCHEDULER_CLS == 'block':
SCHEDULER_CLS = SxBlockingScheduler
elif SCHEDULER_CLS == 'background':
SCHEDULER_CLS = SxBackgroundScheduler
else:
print('未知的调度器')
exit()

if JOB_BACKEND_DB == 'mongo':
JOB_BACKEND_DB = MongoJobBackend
else:
print('未知的Job存储')
exit()

if TASK_BACKEND_DB == 'mongo':
TASK_BACKEND_DB = MongoTaskBackend
else:
print('未知的任务存储')
exit()


def schedulerFactory() -> MySchedulerBase:
backend = JOB_BACKEND_DB()
scheduler = SCHEDULER_CLS(backend, {
# 'apscheduler.jobstores.default': {
# 'type': 'redis',
# 'host': '192.168.0.54',
# 'password': '123456',
# 'db': 1
# },
'apscheduler.executors.default': {
'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
'max_workers': '20'
},
'apscheduler.job_defaults.coalesce': 'false',
'apscheduler.job_defaults.max_instances': '3',
'apscheduler.timezone': 'Asia/Shanghai',
})
return scheduler


scheduler = schedulerFactory()
task_backend_db = TASK_BACKEND_DB()


def add_job(trigger, name=None, notice: Optional[Notice] = None):
"""
:param name: 为定时任务取一个名字,默认取函数的名字
:param trigger: apscheduler.triggers.base.BaseTrigger trigger
:param notice: 异常提醒 一个Notice实例
:return:
"""

def warp(func):
nonlocal name
if name is None:
name = func.__name__

@functools.wraps(func)
@scheduler.scheduled_job(trigger, name=name)
def inner(*args, **kwargs):
job = scheduler.find_job(name)
task = Task(job, task_backend_db)
task.start()
log.debug(f'{job.name}->开始执行')
try:
func(*args, **kwargs)
except NoticeException as e:
notice.notice(str(e))
task.set_exception(e)
except Exception as e:
task.set_exception(e)
log.error(f'{name}->执行异常:{str(e)}')
finally:
task.stop()
log.debug(f'{name}->执行结束')

return inner

return warp


def start():
scheduler.start()
72 changes: 72 additions & 0 deletions plana/core/backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""
@author: yudeqiang
@file: backend.py
@time: 2022/04/15
@describe:
"""
import datetime
from abc import ABC, abstractmethod

import pytz
from apscheduler.job import Job
from pymongo import MongoClient

from plana.settings import MONGO_URI


assert MONGO_URI


class JobBackendDb(ABC):

@abstractmethod
def register(self, job: Job):
"""注册一个Job到数据库中"""

@abstractmethod
def get_all_job(self) -> list:
"""获取所有的Job"""


class MongoJobBackend(JobBackendDb):

def __init__(self):
cli = MongoClient(MONGO_URI)
db = cli['scheduler']
self.col = db['job']

def register(self, job: Job):
if self.col.find_one({'job_id': job.id}):
return
self.col.insert_one({'job_id': job.id, 'job_name': job.name,
'registry_time': datetime.datetime.now(pytz.timezone('Asia/Shanghai'))})

def get_all_job(self) -> list:
return list(self.col.find({}, {'_id': 0}))


class TaskBackendDb(ABC):
"""Task的后端存储"""

@abstractmethod
def insert_task(self, task):
pass

@abstractmethod
def update_task(self, condition, val):
pass


class MongoTaskBackend(TaskBackendDb):

def __init__(self):
cli = MongoClient(MONGO_URI)
db = cli['scheduler']
self.col = db['task']

def insert_task(self, data):
self.col.insert_one(data)

def update_task(self, condition, val):
self.col.update_one(condition, val)

Empty file added plana/core/core.py
Empty file.
Loading

0 comments on commit 6e9caad

Please sign in to comment.