Skip to content

Commit dca424f

Browse files
jens-kuerten (Jens Kürten), Copilot, and albertsj
authored
Feat: Upload service (#32)
* refactor service module * file upload service * fix service test * add tests * abort on error * add docstrings * more docstrings * improve docstrings * catch rate limit exceeded * add docs * Update docs/reference/service.md Co-authored-by: Copilot <[email protected]> * Update docs/reference/service.md Co-authored-by: Copilot <[email protected]> * update dev1 * Add Forbidden exception handling and update documentation for access checks * Update docs/reference/service.md Co-authored-by: Julian Alberts <[email protected]> * Update docs/reference/service.md Co-authored-by: Julian Alberts <[email protected]> --------- Co-authored-by: Jens Kürten <[email protected]> Co-authored-by: Copilot <[email protected]> Co-authored-by: Julian Alberts <[email protected]>
1 parent 6b7928b commit dca424f

File tree

11 files changed

+855
-60
lines changed

11 files changed

+855
-60
lines changed

csfunctions/handler.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,7 @@ def execute(function_name: str, request_body: str, function_dir: str = "src") ->
9696
link_objects(request.event)
9797

9898
function_callback = get_function_callable(function_name, function_dir)
99-
service = Service(
100-
str(request.metadata.service_url) if request.metadata.service_url else None, request.metadata.service_token
101-
)
99+
service = Service(metadata=request.metadata)
102100

103101
response = function_callback(request.metadata, request.event, service)
104102

csfunctions/service/__init__.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from csfunctions.metadata import MetaData
from csfunctions.service.base import (
    Conflict,
    Forbidden,
    NotFound,
    RateLimitExceeded,
    Unauthorized,
    UnprocessableEntity,
)
from csfunctions.service.file_upload import FileUploadService
from csfunctions.service.numgen import NumberGeneratorService
5+
6+
# Public API of the ``csfunctions.service`` package.
# ``Forbidden`` and ``RateLimitExceeded`` are listed as well: ``BaseService.request``
# raises them (HTTP 403 / 429), so callers need to be able to import them from here
# in order to catch them.
__all__ = [
    "Service",
    "FileUploadService",
    "NumberGeneratorService",
    "Conflict",
    "Forbidden",
    "NotFound",
    "RateLimitExceeded",
    "Unauthorized",
    "UnprocessableEntity",
]
15+
16+
17+
class Service:
    """
    Entry point to the services of the elements instance.

    Attributes:
        generator: ``NumberGeneratorService`` instance, e.g. for generating numbers.
        file_upload: ``FileUploadService`` instance for uploading files.
    """

    def __init__(self, metadata: MetaData):
        # Every sub-service is constructed from the same metadata object.
        shared_kwargs = {"metadata": metadata}
        self.generator = NumberGeneratorService(**shared_kwargs)
        self.file_upload = FileUploadService(**shared_kwargs)

csfunctions/service/base.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
from typing import Optional
2+
3+
import requests
4+
5+
from csfunctions.metadata import MetaData
6+
7+
8+
class Unauthorized(Exception):
    """Raised by ``BaseService.request`` when the service responds with HTTP 401."""
10+
11+
12+
class Forbidden(Exception):
    """Raised by ``BaseService.request`` when the service responds with HTTP 403."""
14+
15+
16+
class Conflict(Exception):
    """Raised by ``BaseService.request`` when the service responds with HTTP 409."""
18+
19+
20+
class NotFound(Exception):
    """Raised by ``BaseService.request`` when the service responds with HTTP 404."""
22+
23+
24+
class UnprocessableEntity(Exception):
    """
    Raised by ``BaseService.request`` when the service responds with HTTP 422.

    The response body text is passed as the exception argument.
    """
26+
27+
28+
class RateLimitExceeded(Exception):
    """
    Raised by ``BaseService.request`` when the service responds with HTTP 429.

    The response body text is passed as the exception argument.
    """
30+
31+
32+
class BaseService:
    """
    Common foundation for the service classes.

    Keeps the metadata object and offers ``request`` for authenticated HTTP calls
    against the service URL found in that metadata.
    """

    def __init__(self, metadata: MetaData):
        # The complete metadata is kept because subclasses read further fields from it (e.g. app_user)
        self.metadata = metadata

    def request(
        self, endpoint: str, method: str = "GET", params: Optional[dict] = None, json: Optional[dict] = None
    ) -> dict | list:
        """
        Make a request to the access service.

        Args:
            endpoint: Path appended to the service URL (surplus slashes at the joint are removed).
            method: HTTP method to use.
            params: Query parameters; an empty dict is sent when omitted.
            json: JSON payload for the request body.

        Returns:
            The decoded JSON body of a response with status code 200.

        Raises:
            ValueError: If the metadata has no service url or no service token, or if the
                service answers with a status code that is not handled explicitly.
            Unauthorized: On status code 401.
            Forbidden: On status code 403.
            NotFound: On status code 404.
            Conflict: On status code 409.
            UnprocessableEntity: On status code 422 (carries the response text).
            RateLimitExceeded: On status code 429 (carries the response text).
        """
        base_url = self.metadata.service_url
        if base_url is None:
            raise ValueError("No service url given.")
        token = self.metadata.service_token
        if token is None:
            raise ValueError("No service token given.")

        response = requests.request(
            method,
            url=f"{str(base_url).rstrip('/')}/{endpoint.lstrip('/')}",
            params=params if params else {},
            headers={"Authorization": f"Bearer {token}"},
            timeout=10,
            json=json,
        )

        status = response.status_code
        if status == 200:
            return response.json()

        # Error statuses that are raised without any message
        plain_errors = {401: Unauthorized, 403: Forbidden, 404: NotFound, 409: Conflict}
        if status in plain_errors:
            raise plain_errors[status]

        # Error statuses that carry the response text as message
        detailed_errors = {422: UnprocessableEntity, 429: RateLimitExceeded}
        if status in detailed_errors:
            raise detailed_errors[status](response.text)

        raise ValueError(f"Access service responded with status code {response.status_code}.")

csfunctions/service/file_upload.py

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
import hashlib
2+
from copy import deepcopy
3+
from random import choice
4+
from string import ascii_letters
5+
from typing import BinaryIO
6+
7+
import requests
8+
9+
from csfunctions.service.base import BaseService
10+
from csfunctions.service.file_upload_schemas import (
11+
AbortFileUploadRequest,
12+
CompleteFileUploadRequest,
13+
CreateNewFileRequest,
14+
CreateNewFileResponse,
15+
GeneratePresignedUrlRequest,
16+
PresignedWriteUrls,
17+
)
18+
19+
20+
def _generate_lock_id():
    """Return a new lock ID consisting of 12 randomly picked ASCII letters."""
    letters = [choice(ascii_letters) for _ in range(12)]  # nosec
    return "".join(letters)
23+
24+
25+
class FileUploadService(BaseService):
    """
    Service for uploading file content in chunks via presigned URLs.

    Public entry points are ``upload_file_content`` (upload to an existing file object)
    and ``upload_new_file`` (create a file object first, then upload to it). The
    underscore-prefixed methods wrap the individual requests used by these two.
    """

    def _create_new_file(self, filename: str, parent_object_id: str, persno: str, check_access: bool = True) -> str:
        """Create a new empty file attached to the parent object and return its file object ID."""
        response_json = self.request(
            endpoint="/file_upload/create",
            method="POST",
            json=CreateNewFileRequest(
                parent_object_id=parent_object_id, filename=filename, persno=persno, check_access=check_access
            ).model_dump(),
        )
        data = CreateNewFileResponse.model_validate(response_json)
        return data.file_object_id

    def _get_presigned_write_urls(
        self, file_object_id: str, filesize: int, lock_id: str, persno: str, check_access: bool = True
    ) -> PresignedWriteUrls:
        """Request presigned URLs for uploading file chunks."""
        response_json = self.request(
            endpoint=f"/file_upload/{file_object_id}/generate_presigned_url",
            method="POST",
            json=GeneratePresignedUrlRequest(
                check_access=check_access, persno=persno, filesize=filesize, lock_id=lock_id
            ).model_dump(),
        )

        return PresignedWriteUrls.model_validate(response_json)

    def _upload_from_stream(
        self, presigned_urls: PresignedWriteUrls, stream: BinaryIO
    ) -> tuple[PresignedWriteUrls, str]:
        """
        Upload file stream in chunks and return updated presigned URLs and sha256 hash.

        One chunk of ``presigned_urls.chunksize`` bytes is read from ``stream`` per presigned
        URL and sent with an HTTP PUT. The passed ``presigned_urls`` object is not modified;
        a deep copy carrying the collected ETags is returned instead.

        Raises:
            requests.HTTPError: If one of the chunk uploads responds with an error status.
        """
        etags: list[str] = []
        sha256 = hashlib.sha256()
        for url in presigned_urls.urls:
            data: bytes = stream.read(presigned_urls.chunksize)
            sha256.update(data)
            resp = requests.put(url, data=data, headers=presigned_urls.headers, timeout=20)
            # 20 second timeout to stay below 30s max execution time of the Function
            # otherwise we won't get a proper error message in the logs
            resp.raise_for_status()
            # NOTE(review): responses without an ETag header are skipped, so ``etags`` can end up
            # shorter than ``urls`` - presumably some storage backends return no ETag; confirm.
            etag = resp.headers.get("ETag")
            if etag:
                etags.append(etag)
        updated = deepcopy(presigned_urls)
        if etags:
            updated.etags = etags
        return updated, sha256.hexdigest()

    @staticmethod
    def _get_stream_size(stream: BinaryIO) -> int:
        """
        Get the number of bytes left to read in a seekable stream.

        The size is measured from the stream's current position to its end, because the
        upload reads the stream from its current position. For a stream positioned at its
        start this is the total size. The position is restored before returning.

        Raises:
            ValueError: If the stream is not seekable.
        """
        if not stream.seekable():
            raise ValueError("Stream is not seekable; size cannot be determined.")
        current_pos = stream.tell()
        try:
            stream.seek(0, 2)  # whence=2: relative to the end of the stream
            end_pos = stream.tell()
        finally:
            # always hand the stream back at the position the caller gave it to us
            stream.seek(current_pos)
        return end_pos - current_pos

    def _complete_upload(
        self,
        file_object_id: str,
        filesize: int,
        lock_id: str,
        presigned_urls: PresignedWriteUrls,
        persno: str,
        check_access: bool = True,
        sha256: str | None = None,
        delete_derived_files: bool = True,
    ) -> None:
        """Mark the upload as complete and finalize the file."""
        self.request(
            endpoint=f"/file_upload/{file_object_id}/complete",
            method="POST",
            json=CompleteFileUploadRequest(
                filesize=filesize,
                check_access=check_access,
                persno=persno,
                presigned_write_urls=presigned_urls,
                lock_id=lock_id,
                sha256=sha256,
                delete_derived_files=delete_derived_files,
            ).model_dump(),
        )

    def _abort_upload(
        self, file_object_id: str, lock_id: str, persno: str, presigned_write_urls: PresignedWriteUrls
    ) -> None:
        """Abort an ongoing file upload."""
        self.request(
            endpoint=f"/file_upload/{file_object_id}/abort",
            method="POST",
            json=AbortFileUploadRequest(
                lock_id=lock_id,
                persno=persno,
                presigned_write_urls=presigned_write_urls,
            ).model_dump(),
        )

    def upload_file_content(
        self,
        file_object_id: str,
        stream: BinaryIO,
        persno: str | None = None,
        check_access: bool = True,
        filesize: int | None = None,
        delete_derived_files: bool = True,
    ) -> None:
        """
        Uploads content to an existing file object in chunks using presigned URLs.
        Handles aborting the upload if an error occurs.

        Args:
            file_object_id: The ID of the file object to upload to.
            stream: A binary stream containing the file data. It is read from its current position.
            persno: The user/person number who is uploading the file (default is user that triggered the Function).
            check_access: Whether to check access permissions.
            filesize: Size of the file in bytes (required only if the stream is not seekable).
            delete_derived_files: Whether to delete derived files after upload.

        Raises:
            ValueError: If no filesize is given and the stream is not seekable.
            csfunctions.service.base.Forbidden: If access check fails.
            csfunctions.service.Unauthorized: If the service token is invalid.
            csfunctions.service.Conflict: If the file is already locked.
            csfunctions.service.NotFound: If the file object does not exist.
            csfunctions.service.base.RateLimitExceeded: If the services rate limit is exceeded.
        """
        import logging

        persno = persno or self.metadata.app_user
        if filesize is None:
            filesize = self._get_stream_size(stream)
        lock_id = _generate_lock_id()
        presigned = self._get_presigned_write_urls(
            file_object_id=file_object_id,
            filesize=filesize,
            lock_id=lock_id,
            persno=persno,
            check_access=check_access,
        )
        try:
            presigned_with_etags, sha256 = self._upload_from_stream(presigned_urls=presigned, stream=stream)
            self._complete_upload(
                file_object_id=file_object_id,
                filesize=filesize,
                lock_id=lock_id,
                presigned_urls=presigned_with_etags,
                persno=persno,
                check_access=check_access,
                sha256=sha256,
                delete_derived_files=delete_derived_files,
            )
        except Exception:
            # if something goes wrong during upload we try to abort
            try:
                self._abort_upload(
                    file_object_id=file_object_id,
                    lock_id=lock_id,
                    persno=persno,
                    presigned_write_urls=presigned,
                )
            except Exception:
                # The abort is best effort: a failing abort must not hide the error that
                # made the upload fail in the first place, so it is only logged.
                logging.getLogger(__name__).exception(
                    "Failed to abort upload for file object %s", file_object_id
                )
            # re-raise the original upload error with its traceback intact
            raise

    def upload_new_file(
        self,
        parent_object_id: str,
        filename: str,
        stream: BinaryIO,
        persno: str | None = None,
        check_access: bool = True,
        filesize: int | None = None,
    ) -> str:
        """
        Creates a new file attached to the parent object and uploads content from the provided stream.

        Args:
            parent_object_id: The ID of the parent object to attach the file to.
            filename: The name of the new file.
            stream: A binary stream containing the file data. It is read from its current position.
            persno: The user/person number who is uploading the file (default is user that triggered the Function).
            check_access: Whether to check access permissions.
            filesize: Size of the file in bytes (required only if the stream is not seekable).

        Returns:
            The ID of the newly created file object.

        Raises:
            ValueError: If no filesize is given and the stream is not seekable.
            csfunctions.service.base.Forbidden: If access check fails.
            csfunctions.service.Unauthorized: If the service token is invalid.
            csfunctions.service.NotFound: If the parent object does not exist.
            csfunctions.service.base.RateLimitExceeded: If the services rate limit is exceeded.
        """
        persno = persno or self.metadata.app_user
        file_object_id = self._create_new_file(
            filename=filename,
            parent_object_id=parent_object_id,
            persno=persno,
            check_access=check_access,
        )
        # The upload is always requested with delete_derived_files=False for a new file.
        self.upload_file_content(
            file_object_id=file_object_id,
            stream=stream,
            persno=persno,
            check_access=check_access,
            filesize=filesize,
            delete_derived_files=False,
        )
        return file_object_id

0 commit comments

Comments
 (0)