-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmembers.py
262 lines (222 loc) · 5.96 KB
/
members.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
"""
Thread-safe database for name node member management.
Attributes
----------
NEW : str
'new' - status of new nodes
ALIVE : str
'alive' - status of alive nodes
DEAD : str
'dead' - status of dead nodes
"""
import csv
import time
from contextlib import contextmanager
from pathlib import Path
from threading import RLock
from typing import Dict, List, Optional
def current_time() -> str:
    """
    Format the local wall-clock time as '%Y-%m-%d %H:%M:%S'.

    Returns
    -------
    str:
        Timestamp taken at the moment of the call.
    """
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    # Passing localtime() explicitly is what strftime does by default.
    return time.strftime(timestamp_format, time.localtime())
# Status values stored in the 'status' field of a member record.
# Status of new nodes.
NEW = 'new'
# Status of alive nodes.
ALIVE = 'alive'
# Status of dead nodes.
DEAD = 'dead'
class Member:
    """
    Data node info. Should be created through MemberDB methods.

    It allows transparent thread-safe database modification through
    python properties that acquire the database lock on write. Every
    property assignment is written through to the owning database
    immediately.

    Parameters
    ----------
    id : str
        Id of data node.
    url : str
        URL to access data node.
    public_url : str
        URL to advertise to users of clusters.
    status : str
        Status of data node.
    database : MemberDB
        Database owning the record. It must expose ``_lock``,
        ``_records`` (records keyed by id) and ``update(record)``.
    """

    def __init__(
        self,
        id: str,
        url: str,
        public_url: str,
        status: str,
        database,
    ):
        self._id = id
        self._url = url
        self._public_url = public_url
        self._status = status
        self._db = database

    def save(self):
        """
        Write the current field values to the database record that
        has this member's id.

        Raises
        ------
        KeyError
            Propagated from the database if it has no record with
            this id.
        """
        record = {
            'id': self._id,
            'url': self._url,
            'public_url': self._public_url,
            'status': self._status,
        }
        self._db.update(record)

    @property
    def id(self):
        """Id of data node."""
        return self._id

    @id.setter
    def id(self, v):
        """
        Rename the data node.

        The stored record is moved to the new key before saving;
        simply saving under the new id would look up a record that
        does not exist yet.

        Raises
        ------
        ValueError
            If another record already uses the new id.
        KeyError
            If the database has no record under the current id.
        """
        with self._db._lock:
            records = self._db._records
            if v != self._id:
                if v in records:
                    # Refuse to clobber a different node's record.
                    raise ValueError(
                        'member with id {!r} already exists'.format(v)
                    )
                # pop() raises KeyError before anything is modified
                # when this member is no longer in the database.
                records[v] = records.pop(self._id)
            self._id = v
            self.save()

    @property
    def url(self):
        """URL to access data node."""
        return self._url

    @url.setter
    def url(self, v):
        with self._db._lock:
            self._url = v
            self.save()

    @property
    def public_url(self):
        """URL to advertise to users of clusters."""
        return self._public_url

    @public_url.setter
    def public_url(self, v):
        with self._db._lock:
            self._public_url = v
            self.save()

    @property
    def status(self):
        """Status of data node."""
        return self._status

    @status.setter
    def status(self, v):
        with self._db._lock:
            self._status = v
            self.save()
class MemberDB:
    """
    Thread-safe database for data node information management.

    It constructs and returns Member instances, allowing reading and
    writing through them while keeping records in sync in memory
    and on disk.

    Class Attributes
    ----------------
    _DIALECT : str
        Dialect used for csv formatting
    _FIELDS : List[str]
        List of fields used by csv.DictReader and csv.DictWriter
    """

    _DIALECT = 'excel-tab'
    _FIELDS = ['id', 'url', 'public_url', 'status']

    def __init__(self, path: str):
        """
        Open database.

        The file is touched (created with mode 0o600 if missing) and
        all of its rows are loaded into memory, keyed by id.

        Parameters
        ----------
        path : str
            Path to database file.
        """
        path = Path(path)
        path.touch(mode=0o600)
        self._path = path
        self._lock = RLock()
        with self._lock, self._open_read() as records:
            # Keep only the known columns. DictReader stores surplus
            # cells of an over-long row under the key None, which would
            # break both Member(**record) and DictWriter.writerows().
            self._records = {
                row['id']: {field: row[field] for field in self._FIELDS}
                for row in records
            }

    def sync(self):
        """
        Synchronize records to disk.

        The whole file is rewritten from the in-memory records.
        """
        with self._lock, self._open_write() as writer:
            writer.writerows(self._records.values())

    def get(self, id: str) -> Optional[Member]:
        """
        Get member instance from database.

        Parameters
        ----------
        id : str
            Id of data node.

        Returns
        -------
        Optional[Member]:
            Member built from the stored record, or None if there is
            no record with this id.
        """
        with self._lock:
            record = self._records.get(id)
            if record is None:
                return None
            return Member(**record, database=self)

    def update(self, record: Dict[str, str]):
        """
        Update record with same id and write the database to disk.

        Parameters
        ----------
        record : Dict
            Record instance to rewrite. Must contain 'id'; only keys
            listed in _FIELDS are accepted.

        Raises
        ------
        ValueError
            If record contains a key that is not in _FIELDS.
        KeyError
            If there is no stored record with the given id.
        """
        with self._lock:
            # Validate before mutating: an unknown key merged into the
            # stored record would make every later sync() fail.
            unknown = set(record) - set(self._FIELDS)
            if unknown:
                raise ValueError(
                    'record contains fields not in {}: {}'.format(
                        self._FIELDS,
                        ', '.join(sorted(map(repr, unknown))),
                    )
                )
            self._records[record['id']].update(record)
            self.sync()

    def create(
        self,
        id: str,
        url: str,
        public_url: Optional[str] = None,
        status: str = NEW,
    ) -> Member:
        """
        Create new data node record and write the database to disk.

        An existing record with the same id is replaced.

        Parameters
        ----------
        id : str
            Id of data node.
        url : str
            URL to access data node.
        public_url : Optional[str]
            URL to advertise to users of clusters.
        status : str
            Status of data node.

        Returns
        -------
        Member:
            New member instance.
        """
        with self._lock:
            # NOTE: the csv writer stores None as an empty string, so
            # a None public_url reads back as '' after a reload.
            record = {
                'id': id,
                'url': url,
                'public_url': public_url,
                'status': status,
            }
            self._records[id] = record
            self.sync()
            return Member(**record, database=self)

    def filter(self, **kwargs) -> List[Member]:
        """
        Find records that match the filters in kwargs.

        Returns records that exactly match value of each of the filters.

        Parameters
        ----------
        kwargs : dict
            Attributes to filter records by

        Returns
        -------
        List[Member]:
            List of member instances matching filters. Or all members
            if filters are not given.

        Raises
        ------
        KeyError
            If a filter name is not a record field and a record has
            to be tested against it.
        """
        with self._lock:
            # all() stops at the first failing filter, so filters are
            # applied in the order they were passed.
            return [
                Member(**record, database=self)
                for record in self._records.values()
                if all(
                    record[key] == value
                    for key, value in kwargs.items()
                )
            ]

    @contextmanager
    def _open_read(self):
        """
        Open the database file and yield a csv.DictReader over it.

        The file is closed when the context exits.
        """
        with open(self._path, newline='') as fs:
            yield csv.DictReader(
                fs,
                dialect=self._DIALECT,
                fieldnames=self._FIELDS,
            )

    @contextmanager
    def _open_write(self, append=False):
        """
        Open the database file and yield a csv.DictWriter for it.

        Parameters
        ----------
        append : bool
            Open in append mode when true; otherwise the file is
            truncated.
        """
        with open(self._path, 'a' if append else 'w', newline='') as fs:
            yield csv.DictWriter(
                fs,
                dialect=self._DIALECT,
                fieldnames=self._FIELDS,
            )