forked from SunoApi/SunoApi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsuno.py
139 lines (119 loc) · 4.08 KB
/
suno.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# -*- coding:utf-8 -*-
import json,os,io
from fastapi import FastAPI, HTTPException, Request, status, Header
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import FileResponse,Response
from fastapi.responses import StreamingResponse
from typing import Any, List, Optional, Union
from pydantic import BaseModel, Field
from starlette.requests import Request
from urllib.parse import urlparse
import uvicorn
import aiohttp
import os,sys,re
import mimetypes
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class jsonResponse(BaseModel):
code: Optional[int] = 0
msg: Optional[str] = "success"
data: Optional[Any] = None
@app.get("/")
async def root():
return jsonResponse()
@app.get("/files/{file_name}")
async def files(request: Request, file_name: str):
# print(request.url._url)
result = urlparse(request.url._url)
file_url = "https://cdn1.suno.ai" + result.path.replace("/files", "")
file_path = result.path.replace("/files", "files")
media_type = "text/plain"
if file_path.endswith(".png"):
media_type = "image/png"
elif file_path.endswith(".mp3"):
media_type = "audio/mpeg"
elif file_path.endswith(".mp4"):
media_type = "video/mp4"
else:
media_type = "application/octet-stream"
if os.path.exists(file_path):
return write_file(request, file_path, media_type)
else:
result = await download(file_url, file_path)
try:
if result == "200":
return write_file(request, file_path, media_type)
else:
return result
except Exception as e:
return {"detail":str(e)}
async def download(url, filename):
conn = aiohttp.TCPConnector(ssl=False)
async with aiohttp.ClientSession(connector=conn) as session:
try:
async with session.get(url) as response:
if response.status == 200:
with open(filename, "wb") as f:
f.write(await response.read())
return str(response.status)
else:
return {"detail":str(response.status)}
except Exception as e:
return {"detail":str(e)}
def write_file(request, file_path, media_type):
# if file_path.endswith(".png"):
# return FileResponse(file_path)
# # return StreamingResponse(open(file_path, 'rb'), media_type=media_type)
# else:
# start, end = get_range(request)
# return partial_response(media_type, file_path, start, end)
if request.query_params.get("play") == "true":
start, end = get_range(request)
return partial_response(media_type, file_path, start, end)
else:
return FileResponse(file_path)
# return StreamingResponse(open(file_path, 'rb'), media_type=media_type)
def get_buff_size(file_size):
buff_size = 2097152
if file_size > 1073741824:
buff_size = 4194304
return buff_size
def get_range(request):
range = request.headers.get("Range")
m = None
if range:
m = re.match(r"bytes=(?P<start>\d+)-(?P<end>\d+)?", range)
if m:
start = m.group("start")
end = m.group("end")
start = int(start)
if end is not None:
end = int(end)
return start, end
else:
return 0, None
def partial_response(media_type, path, start, end=None):
if not os.path.exists(path):
return {"detail":"file not fount!"}
file_size = os.path.getsize(path)
buff_size = get_buff_size(file_size)
if end is None:
end = start + buff_size - 1
end = min(end, file_size - 1)
end = min(end, start + buff_size - 1)
length = end - start + 1
with open(path, 'rb') as fd:
fd.seek(start)
bytes = fd.read(length)
assert len(bytes) == length
headers = {
"Accept-Ranges": "bytes",
"Content-Range": "bytes {0}-{1}/{2}".format(start, end, file_size,)
}
return Response(bytes, 206, headers, media_type, None)