-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
86 lines (73 loc) · 2.89 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
from subprocess import Popen, PIPE, STDOUT
from typing import Union
from fastapi import FastAPI
from fastapi_utils.cbv import cbv
from fastapi_utils.inferring_router import InferringRouter
import json
from utils import Sopen, MinecraftServer
import os
app = FastAPI()
router = InferringRouter()
@app.get("/machine")
async def status():
return {"message": "online"}
@cbv(router)
class ServerManager:
server_session = None
server_name: Union[str, None] = None
with open("config.json", "r") as f:
config = json.load(f)
server_config = config['servers']
@router.get("/server/list")
# @staticmethod
async def get_list(self):
return list(ServerManager.server_config.keys())
@router.get("/server")
# @staticmethod
async def get_status(self):
if ServerManager.server_session is None:
return {"message": "offline"}
else:
return {"message": "online", "server_name": ServerManager.server_name}
# async def status(self):
# if ServerManager.server_session is None:
# return {"message": "offline"}
# else:
# return {"message": "online", "server_name": ServerManager.server_name}
@router.get("/server/start")
# @staticmethod
async def start(self, server: str):
if ServerManager.server_session is not None:
return {"message":
"""there's already running server""",
"server_name": ServerManager.server_name}
_dir = ServerManager.server_config[server]['destination']
os.chdir(_dir)
if 'minecraft' in _dir:
ServerManager.server_session = MinecraftServer(ServerManager.server_config[server]['cmd'])
else:
ServerManager.server_session = Sopen(ServerManager.server_config[server]['cmd'])
ServerManager.server_name = server
ServerManager.server_session.join_no_response()
x = ServerManager.server_session.communicate("/list")
if bool(x):
return {"message": "start server successfully", "exit_code": 0}
ServerManager.server_session.kill()
ServerManager.server_session = None
return {"message": "there is something wrong", "exit_code": 1}
@router.get("/server/stop")
# @staticmethod
async def stop(self):
if ServerManager.server_session is None:
return {"message": "server is offline", "exit_code": 0}
ServerManager.server_session.kill()
ServerManager.server_session = None
return {"messgae": "server have been stopped", "exit_code": 0}
# app.get('/server', ServerManager.get_status)
# app.get('/server/list', ServerManager.get_list)
# app.get('/server/start', ServerManager.start)
# app.get('/server/stop', ServerManager.stop)
app.include_router(router)
# server status
# server start
# server stop