-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcanvas_files.py
94 lines (68 loc) · 2.22 KB
/
canvas_files.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import errno
import os
import requests
from cachecontrol import CacheControl
from dotenv import load_dotenv
from .utilities import Context, Item
# Let python-dotenv populate the process environment before it is read below.
load_dotenv()

# Address of the Canvas instance and the token sent with every request.
# Either value is None when its environment variable is not set.
CANVAS_URL = os.environ.get('CANVAS_URL')
ACCESS_TOKEN = os.environ.get('ACCESS_TOKEN')

# Root of the REST API; every endpoint URL in this module is built on it.
API_URL = f'{CANVAS_URL}/api/v1'
class CanvasFiles:
    """Client for the file and folder endpoints of a single Canvas context.

    Every request goes through a ``requests.Session`` that carries the
    bearer token from ``ACCESS_TOKEN`` and is wrapped in ``CacheControl``.
    """

    def __init__(self, context_id, context=Context.COURSE):
        """Create a client bound to one context.

        Args:
            context_id: Identifier interpolated into every endpoint URL.
            context: Context kind, interpolated into the URL path as-is.
                NOTE(review): presumably renders as a path segment such as
                ``courses`` -- confirm against ``utilities.Context``.
        """
        session = requests.Session()
        session.headers['Authorization'] = f'Bearer {ACCESS_TOKEN}'
        self.api = CacheControl(session)
        self.context = context
        self.context_id = context_id

    def _get_json(self, collection, item_id):
        """Fetch one object from ``collection`` and return its decoded JSON.

        Shared implementation of :meth:`get_folder` and :meth:`get_file`.

        Raises:
            ConnectionError: The server answered 401 Unauthorized.
            FileNotFoundError: The server answered 404 Not Found.
        """
        url = (
            f'{API_URL}/{self.context}/{self.context_id}'
            f'/{collection}/{item_id}'
        )
        response = self.api.get(url)
        status = response.status_code
        if status == requests.codes.unauthorized:
            raise ConnectionError(
                f'Canvas answered 401 Unauthorized for {url}'
            )
        if status == requests.codes.not_found:
            raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT))
        # Any other status is passed through; the caller gets whatever JSON
        # body the server sent.
        return response.json()

    def _get_all_pages(self, url):
        """Return the items of a list endpoint, following pagination.

        Canvas list endpoints are paginated and advertise the next page in
        the ``Link`` response header; reading only the first response drops
        every item beyond the first page.

        Args:
            url: URL of the first page.

        Returns:
            A list with the items of every page. If the very first response
            body is not a JSON array it is returned unchanged.
        """
        items = []
        while url:
            response = self.api.get(url)
            page = response.json()
            if not isinstance(page, list):
                # A non-array body cannot be merged into the listing. On the
                # first request hand it back untouched; on a later page keep
                # what has been collected so far.
                return items or page
            items.extend(page)
            url = response.links.get('next', {}).get('url')
        return items

    def get_folder(self, folder_id):
        """Return the JSON description of the folder ``folder_id``.

        Raises:
            ConnectionError: The server answered 401 Unauthorized.
            FileNotFoundError: The server answered 404 Not Found.
        """
        return self._get_json('folders', folder_id)

    def get_file(self, file_id):
        """Return the JSON description of the file ``file_id``.

        Raises:
            ConnectionError: The server answered 401 Unauthorized.
            FileNotFoundError: The server answered 404 Not Found.
        """
        return self._get_json('files', file_id)

    def get_item(self, item_id):
        """Look ``item_id`` up as a folder first, then as a file.

        Returns:
            The folder JSON, else the file JSON, else ``None`` when both
            lookups raise ``FileNotFoundError``.
        """
        for lookup in (self.get_folder, self.get_file):
            try:
                return lookup(item_id)
            except FileNotFoundError:
                continue
        return None

    @staticmethod
    def item_type(item):
        """Classify a JSON object returned by this client.

        Returns:
            ``Item.FOLDER`` if ``item`` has a ``name`` key, ``Item.FILE`` if
            it has a ``display_name`` key, otherwise ``None``. ``None`` is
            also returned for ``item is None`` so that the result of
            :meth:`get_item` can be passed in directly.
        """
        if item is None:
            return None
        if 'name' in item:
            return Item.FOLDER
        if 'display_name' in item:
            return Item.FILE
        return None

    def _ls_files(self, folder_id):
        """Return every file directly inside the folder ``folder_id``."""
        folder = self.get_folder(folder_id)
        return self._get_all_pages(folder['files_url'])

    def _ls_folders(self, folder_id):
        """Return every sub-folder directly inside the folder ``folder_id``."""
        folder = self.get_folder(folder_id)
        return self._get_all_pages(folder['folders_url'])

    def ls(self, folder_id):
        """Return the files followed by the sub-folders of ``folder_id``."""
        # Resolve the folder once; both listing URLs come from the same
        # folder object.
        folder = self.get_folder(folder_id)
        files = self._get_all_pages(folder['files_url'])
        folders = self._get_all_pages(folder['folders_url'])
        return files + folders

    async def download_file(self, file_url, offset, size):
        """Return ``size`` bytes of ``file_url`` starting at ``offset``.

        The bytes are requested with an HTTP ``range`` header and read from
        the response's underlying raw stream.

        NOTE(review): the request itself is a blocking call even though this
        is a coroutine -- confirm that this is acceptable for the caller.

        Args:
            file_url: Download URL of the file.
            offset: Index of the first byte to fetch.
            size: Number of bytes to fetch.

        Returns:
            The bytes read from the response, or ``b''`` when ``size`` is
            not positive.
        """
        if size <= 0:
            # An empty read would otherwise produce an inverted byte range
            # (last position before first), which is not a valid request.
            return b''
        start, end = offset, offset + size - 1
        response = self.api.get(
            file_url,
            headers={'range': f'bytes={start}-{end}'},
            stream=True,
        )
        try:
            return response.raw.read()
        finally:
            # A streamed response holds its connection until it is closed.
            response.close()