This repository has been archived by the owner on Feb 16, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfileuploader.py
55 lines (50 loc) · 2.29 KB
/
fileuploader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import tls_client, random, string
from io import BytesIO
from os.path import basename
__all__ = ['File', 'FileUploader']
def randSeq(length: int, chars: str, encode: str = 'ascii'):
return bytes().join(random.choice(chars).encode(encode) for _ in range(length))
class File:
def __init__(self, file: str|BytesIO, *, content_type: str = None, name: str = None):
self.name = name
self.content_type = content_type
if type(file) == str:
self.file = open(file, 'rb')
if name == None or len(name) == 0:
self.name = basename(self.file.name)
elif isinstance(file, BytesIO):
self.file = file
self.name = name
else:
raise TypeError("'file' object must be string or BytesIO")
def extract(self):
return self.name, self.file, self.content_type
class FileUploader:
def __init__(self, sess: tls_client.Session):
self.new_session(sess)
def new_session(self, sess: tls_client.Session):
self.sess = sess
self.reset()
def reset(self):
self.__new_boundary()
self.body = b''
def __new_boundary(self):
self.boundary = b"----WebKitFormBoundary" + randSeq(16, string.ascii_lowercase + string.ascii_uppercase + string.digits)
def __generate_file_header(self, name: str, filename: str = None, content_type: str = None):
header = b'Content-Disposition: form-data; name="' + name.encode() + b'"; '
if filename != None:
header += b'filename="' + filename.encode() + b'"; '
if content_type != None:
header += b'\r\nContent-Type: ' + content_type.encode()
return header
def addFile(self, name: str, file: File):
filename, fp, mime = file.extract()
self.body = b'--' + self.boundary + b'\r\n'
self.body += self.__generate_file_header(name, filename, mime)
self.body += b'\r\n\r\n'
self.body += fp.read()
self.body += b'\r\n--' + self.boundary + b'--\r\n'
def upload(self, url: str, *, files = None, data = None, json = None, **kwargs):
headers = kwargs.pop('headers', dict())
headers['Content-Type'] = 'multipart/form-data; boundary=' + self.boundary.decode()
return self.sess.post(url, data=self.body, headers=headers, **kwargs)