-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathhttp_data_node.py
146 lines (121 loc) · 3.9 KB
/
http_data_node.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from typing import Optional, List, Tuple
from urllib.request import urlopen, HTTPError, URLError
from urllib.parse import urlparse, urljoin
from util import *
class HttpDataNode:
    """
    Python API for interaction with remote DataNode server through HTTP.

    It implements same api as DataNode, but delegates actual execution
    to remote data node server through HTTP.

    Every public method maps to one endpoint named after the method
    (``/mkfs``, ``/ls``, ...). Methods without arguments issue a request
    with no body (HTTP GET); methods with arguments send them as the
    request body (HTTP POST). What each command does is decided by the
    server; this class only defines the wire format.

    Endpoint paths are absolute, so any path component present in the
    node URL is replaced rather than extended.

    Unless stated otherwise, methods let the exceptions raised by
    ``urlopen`` (``HTTPError``, ``URLError`` and other ``OSError``
    subclasses) propagate to the caller.
    """

    def __init__(self, url: str):
        """
        Set data node server url.

        Parameters
        ----------
        url : str
            URL to data node server. It must contain a network location,
            e.g. ``http://host:port``.

        Raises
        ------
        CommandError
            If `url` has no network location part.
        """
        if not urlparse(url).netloc:
            raise CommandError(f'Invalid node url {url}')
        self._url = url

    # ------------------------------------------------------------------
    # Private transport helpers
    # ------------------------------------------------------------------

    def _open(self, endpoint: str, body: Optional[bytes] = None):
        """
        Open ``/<endpoint>`` on the node and return the response object.

        With ``body=None`` urllib issues a GET; with any bytes value
        (including ``b''``) it issues a POST. The caller owns the
        returned response and must close it.
        """
        return urlopen(urljoin(self._url, '/' + endpoint), data=body)

    def _send(self, endpoint: str, body: Optional[bytes] = None) -> None:
        """Call ``/<endpoint>`` and discard the response body."""
        with self._open(endpoint, body):
            pass

    def _fetch(self, endpoint: str, body: Optional[bytes] = None) -> bytes:
        """Call ``/<endpoint>`` and return the raw response body."""
        with self._open(endpoint, body) as resp:
            return resp.read()

    def _decode(self, decoder, endpoint: str, body: Optional[bytes] = None):
        """
        Call ``/<endpoint>`` and parse the response with `decoder`.

        `decoder` is invoked as ``decoder(stream, length, hostname)``
        where `length` is the response's ``length`` attribute and
        `hostname` is taken from the final response URL.
        """
        with self._open(endpoint, body) as resp:
            return decoder(resp, resp.length, urlparse(resp.url).hostname)

    # ------------------------------------------------------------------
    # DataNode API
    # ------------------------------------------------------------------

    def mkfs(self):
        """Call the ``/mkfs`` endpoint (no request body)."""
        self._send('mkfs')

    def df(self) -> Tuple[int, int, int]:
        """
        Call the ``/df`` endpoint (no request body).

        Returns
        -------
        Tuple[int, int, int]
            Response parsed by ``deserialize_tuple``.
        """
        return self._decode(deserialize_tuple, 'df')

    def cd(self, path: str):
        """Call ``/cd`` with UTF-8 encoded `path` as the request body."""
        self._send('cd', path.encode('utf-8'))

    def ls(self, path: Optional[str] = None) -> List[str]:
        """
        Call the ``/ls`` endpoint.

        Parameters
        ----------
        path : str, optional
            Path to list. When omitted (or empty) an empty body is sent,
            so the request is still a POST.

        Returns
        -------
        List[str]
            Response parsed by ``deserialize_list``.
        """
        body = path.encode('utf-8') if path else b''
        return self._decode(deserialize_list, 'ls', body)

    def mkdir(self, path: str):
        """Call ``/mkdir`` with UTF-8 encoded `path` as the request body."""
        self._send('mkdir', path.encode('utf-8'))

    def rmdir(self, path: str, force: Optional[bool] = False):
        """
        Call the ``/rmdir`` endpoint.

        Parameters
        ----------
        path : str
            Directory path.
        force : bool, optional
            When truthy, a ``'!'`` is appended to `path` in the request
            body to signal the forced variant to the server.
        """
        # NOTE(review): a path that itself ends with '!' cannot be told
        # apart from a forced request on the wire.
        payload = path + ('!' if force else '')
        self._send('rmdir', payload.encode('utf-8'))

    def touch(self, path: str):
        """Call ``/touch`` with UTF-8 encoded `path` as the request body."""
        self._send('touch', path.encode('utf-8'))

    def cat(self, path: str) -> bytes:
        """
        Call ``/cat`` with UTF-8 encoded `path` as the request body.

        Returns
        -------
        bytes
            The raw response body.
        """
        return self._fetch('cat', path.encode('utf-8'))

    def tee(self, path: str, data: bytes):
        """
        Call the ``/tee`` endpoint.

        The request body is the UTF-8 encoded `path`, a single NUL byte
        as separator, then `data` unchanged.
        """
        self._send('tee', path.encode('utf-8') + b'\0' + data)

    def rm(self, path: str):
        """Call ``/rm`` with UTF-8 encoded `path` as the request body."""
        self._send('rm', path.encode('utf-8'))

    def stat(self, path: str) -> Tuple[str, int, int]:
        """
        Call ``/stat`` with UTF-8 encoded `path` as the request body.

        Returns
        -------
        Tuple[str, int, int]
            Response parsed by ``deserialize_stat``.
        """
        return self._decode(deserialize_stat, 'stat', path.encode('utf-8'))

    def cp(self, src: str, dst: str):
        """
        Call ``/cp`` with ``"<src> <dst>"`` (UTF-8) as the request body.
        """
        # NOTE(review): the two paths are joined by a single space, so
        # paths containing spaces are ambiguous on the wire.
        self._send('cp', (src + ' ' + dst).encode('utf-8'))

    def mv(self, src: str, dst: str):
        """
        Call ``/mv`` with ``"<src> <dst>"`` (UTF-8) as the request body.
        """
        # NOTE(review): same space-separated format as ``cp``; paths
        # containing spaces are ambiguous on the wire.
        self._send('mv', (src + ' ' + dst).encode('utf-8'))

    def sync(self, donor_url: str):
        """Call ``/sync`` with UTF-8 encoded `donor_url` as the body."""
        self._send('sync', donor_url.encode('utf-8'))

    def snap(self) -> bytes:
        """
        Call the ``/snap`` endpoint (no request body).

        Returns
        -------
        bytes
            The raw response body.
        """
        return self._fetch('snap')

    def ping_alive(self) -> bool:
        """
        Probe the node through the ``/ping_alive`` endpoint.

        Returns
        -------
        bool
            True if the request completed successfully, False if it
            failed with any network or HTTP level error.
        """
        try:
            self._send('ping_alive')
        except OSError:
            # HTTPError and URLError both derive from OSError. Catching
            # the base class also covers failures urlopen does not wrap,
            # such as a connection reset or a timeout while waiting for
            # the response, which would otherwise escape a liveness probe.
            return False
        return True

    def join_namespace(self, namenode_url: str):
        """
        Call ``/join_namespace`` with UTF-8 encoded `namenode_url` as
        the request body.
        """
        self._send('join_namespace', namenode_url.encode('utf-8'))

    def leave_namespace(self):
        """Call the ``/leave_namespace`` endpoint (no request body)."""
        self._send('leave_namespace')