-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmdfy.py
98 lines (80 loc) · 2.98 KB
/
mdfy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from fastapi import Depends, FastAPI, HTTPException, status, UploadFile, File
from fastapi.security import OAuth2PasswordBearer
from fastapi.responses import PlainTextResponse
from fastapi_cache import FastAPICache
from fastapi_cache.decorator import cache
from fastapi_cache.backends.redis import RedisBackend
from starlette.responses import Response
import requests
from converters.markdown_splitter import MarkdownSplitter
from utils.error_handling import handle_error
from utils.environment import load_environment, get_env_variable
from returns.pipeline import is_successful
load_environment()
ACCESS_TOKEN = get_env_variable("ACCESS_TOKEN")
REDIS_URL: str = get_env_variable("REDIS_URL")
CACHE_EXPIRATION_IN_SECONDS = int(
get_env_variable("CACHE_EXPIRATION_IN_SECONDS", "3600")
)
app = FastAPI(
title="mdfy",
description="A service to convert various file formats to Markdown",
version="0.1.0",
)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
@app.on_event("startup")
async def startup():
redis = RedisBackend(REDIS_URL)
FastAPICache.init(redis, prefix="fastapi-cache")
@app.get("/", response_class=PlainTextResponse)
async def root():
return "Welcome to mdfy - a service to convert various file formats to Markdown"
@app.get("/health")
async def health_check():
return {"status": "healthy"}
@app.post("/process_url/")
@cache(expire=CACHE_EXPIRATION_IN_SECONDS)
async def process_url(
url: str, token: str = Depends(oauth2_scheme), response: Response = None
):
if token != ACCESS_TOKEN:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid access token"
)
try:
response = requests.get(url)
response.raise_for_status()
splitter = MarkdownSplitter()
result = splitter.call(response.content)
if is_successful(result):
response.headers["X-Cached"] = "true"
return {"sections": result.unwrap()}
else:
raise HTTPException(status_code=500, detail="Failed to process file")
except requests.RequestException as e:
raise HTTPException(
status_code=422, detail=f"Could not process the file from URL: {str(e)}"
)
except Exception as e:
return handle_error(e)
@app.post("/process_upload/")
async def process_upload(
file: UploadFile = File(...), token: str = Depends(oauth2_scheme)
):
if token != ACCESS_TOKEN:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid access token"
)
try:
file_content = await file.read()
splitter = MarkdownSplitter()
result = splitter.call(file_content)
if is_successful(result):
return {"sections": result.unwrap()}
else:
raise HTTPException(status_code=500, detail="Failed to process file")
except Exception as e:
return handle_error(e)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)