Skip to content

Commit e3dfab9

Browse files
committed
back to grequests
1 parent 9db1c04 commit e3dfab9

File tree

2 files changed

+161
-118
lines changed

2 files changed

+161
-118
lines changed

orchestrator/client_handler.py

+157-114
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
from multiprocessing import Process, Manager, Lock
1+
#from multiprocessing import Process, Manager, Lock
2+
import grequests
23
import requests
34
from time import time, sleep
45
import logging
56

6-
77
class ClientHandler:
88
"""Performs concurrent requests with timeout
99
@@ -14,50 +14,22 @@ class ClientHandler:
1414
def __init__(self, clients, OPERATION_MODE='wait_all', **kwargs):
1515
self.clients = self.parse_clients(clients)
1616
self.n_clients = len(clients)
17-
self.OPERATION_MODE = OPERATION_MODE
18-
logging.info(
19-
'[Client Handler] Operation mode: {}'.format(self.OPERATION_MODE))
20-
default_n_firsts = max(1, self.n_clients - 2)
21-
if self.OPERATION_MODE == 'n_firsts':
22-
self.N_FIRSTS = kwargs.get('n_firsts', default_n_firsts)
23-
assert self.N_FIRSTS <= self.n_clients, \
24-
'n_firsts must be <= than num clients'
25-
logging.info(
26-
'[Client Handler] n_firsts: {}'.format(self.N_FIRSTS))
27-
elif self.OPERATION_MODE == 'timeout':
28-
self.WAIT_FROM_N_FIRSTS = kwargs.get('wait_from_n_firsts',
29-
default_n_firsts)
30-
self.TIMEOUT = kwargs.get('timoeut', 60) # Seconds
31-
elif self.OPERATION_MODE == 'wait_all':
32-
self.N_FIRSTS = self.n_clients
33-
logging.info('[Client Handler] Will wait '
34-
'until {} clients'.format(self.N_FIRSTS))
35-
else:
17+
if not OPERATION_MODE == 'wait_all':
3618
raise Exception('Operation mode not accepted')
37-
self.manager = Manager()
38-
self.lock = Lock()
39-
self.last_operation = self.manager.dict()
40-
self.reset_last_operation()
41-
self.processes = []
19+
logging.info(
20+
'[Client Handler] Operation mode: {}'.format(OPERATION_MODE))
4221

4322
def perform_requests_and_wait(self, endpoint):
44-
self.perform_parallel_requests(endpoint)
45-
if self.OPERATION_MODE == 'n_firsts':
46-
if endpoint == 'send_model':
47-
# TODO: Do this part with redundancy
48-
return self.wait_until('n_responses', wait_all=True)
49-
return self.wait_until('n_responses')
50-
elif self.OPERATION_MODE == 'timeout':
51-
self.started = time()
52-
return self.wait_until('timeout')
53-
elif self.OPERATION_MODE == 'wait_all':
54-
return self.wait_until('n_responses', wait_all=True)
55-
56-
def reset_last_operation(self):
57-
self.lock.acquire()
58-
for key in self.clients:
59-
self.last_operation[key] = None
60-
self.lock.release()
23+
client_urls = self.get_client_urls(endpoint)
24+
rs = (grequests.get(u) for u in client_urls)
25+
responses = grequests.map(rs)
26+
for res in responses:
27+
if not res or res.status_code != 200:
28+
raise Exception('The response was not successful. '
29+
'Code: {}, Msg: {}'.format(res.status_code,
30+
res.text))
31+
l = len(responses)
32+
logging.info('[Client Handler] Got {} responses'.format(l))
6133

6234
@staticmethod
6335
def parse_clients(clients):
@@ -68,76 +40,147 @@ def parse_clients(clients):
6840
p_clients.append((host, port))
6941
return p_clients
7042

71-
def perform_parallel_requests(self, endpoint):
72-
self.reset_last_operation()
43+
def get_client_urls(self, endpoint):
44+
urls = []
7345
for host, port in self.clients:
74-
p = Process(target=self.perform_request,
75-
args=(host, port, endpoint))
76-
p.start()
77-
self.processes.append(p)
78-
79-
def wait_until(self, until_cond, wait_all=False):
80-
# TODO: What to do in send model?
81-
ended_clients = set()
82-
completed = False
83-
while not completed:
84-
# Periodically check if the requests are ending
85-
for key in self.clients:
86-
last_operation = self.last_operation[key]
87-
if not last_operation or key in ended_clients:
88-
# Last operation still not computed or already finished
89-
continue
90-
elif last_operation['ended']:
91-
# TODO: Handle exception when status code != 200
92-
if last_operation['response'].status_code != 200:
93-
raise Exception(
94-
'Error in response. Error code: {}'.format(
95-
last_operation['response'].text))
96-
logging.info(
97-
'[Client Handler] client {} '
98-
'finished performing operation {}'.format(
99-
key, last_operation['op']
100-
)
101-
)
102-
ended_clients.add(key)
103-
if until_cond == 'n_responses':
104-
if ((not wait_all and (len(ended_clients) >= self.N_FIRSTS))
105-
or (wait_all and len(ended_clients) == self.N_FIRSTS)):
106-
completed = True
107-
elif until_cond == 'timeout':
108-
elapsed = time() - self.started
109-
if ((len(ended_clients) >= self.WAIT_FROM_N_FIRSTS) and
110-
elapsed > self.TIMEOUT):
111-
completed = True
112-
else:
113-
raise Exception('Not implemented')
114-
sleep(0.2)
115-
self.terminate_processes()
116-
return list(ended_clients)
117-
118-
def terminate_processes(self):
119-
for p in self.processes:
120-
p.terminate()
121-
self.processes = []
122-
123-
@staticmethod
124-
def get_client_key(host, port):
125-
return (host, port)
126-
127-
def perform_request(self, host, port, endpoint):
128-
client_key = self.get_client_key(host, port)
129-
last_operation = {
130-
'started': time(),
131-
'op': endpoint,
132-
'status': 'started',
133-
'ended': None
134-
}
135-
url = 'http://{}:{}/{}'.format(host, port, endpoint)
136-
res = requests.get(url)
137-
last_operation.update({'status': 'ended',
138-
'ended': time(),
139-
'response': res})
140-
self.lock.acquire()
141-
self.last_operation[client_key] = last_operation
142-
self.lock.release()
46+
urls.append('http://{}:{}/{}'.format(host, port, endpoint))
47+
return urls
14348

49+
# This was about to work, but parallelism is difficult and in practice it is not working
50+
#class ClientHandler:
51+
# """Performs concurrent requests with timeout
52+
#
53+
# :OPERATION_MODE n_firsts, timeout or wait_all.
54+
# :clients list of clients' (host, port) tuples
55+
# """
56+
#
57+
# def __init__(self, clients, OPERATION_MODE='wait_all', **kwargs):
58+
# self.clients = self.parse_clients(clients)
59+
# self.n_clients = len(clients)
60+
# self.OPERATION_MODE = OPERATION_MODE
61+
# logging.info(
62+
# '[Client Handler] Operation mode: {}'.format(self.OPERATION_MODE))
63+
# default_n_firsts = max(1, self.n_clients - 2)
64+
# if self.OPERATION_MODE == 'n_firsts':
65+
# self.N_FIRSTS = kwargs.get('n_firsts', default_n_firsts)
66+
# assert self.N_FIRSTS <= self.n_clients, \
67+
# 'n_firsts must be <= than num clients'
68+
# logging.info(
69+
# '[Client Handler] n_firsts: {}'.format(self.N_FIRSTS))
70+
# elif self.OPERATION_MODE == 'timeout':
71+
# self.WAIT_FROM_N_FIRSTS = kwargs.get('wait_from_n_firsts',
72+
# default_n_firsts)
73+
# self.TIMEOUT = kwargs.get('timoeut', 60) # Seconds
74+
# elif self.OPERATION_MODE == 'wait_all':
75+
# self.N_FIRSTS = self.n_clients
76+
# logging.info('[Client Handler] Will wait '
77+
# 'until {} clients'.format(self.N_FIRSTS))
78+
# else:
79+
# raise Exception('Operation mode not accepted')
80+
# self.manager = Manager()
81+
# self.lock = Lock()
82+
# self.last_operation = self.manager.dict()
83+
# self.reset_last_operation()
84+
# self.processes = []
85+
#
86+
# def perform_requests_and_wait(self, endpoint):
87+
# self.perform_parallel_requests(endpoint)
88+
# if self.OPERATION_MODE == 'n_firsts':
89+
# if endpoint == 'send_model':
90+
# # TODO: Do this part with redundancy
91+
# return self.wait_until('n_responses', wait_all=True)
92+
# return self.wait_until('n_responses')
93+
# elif self.OPERATION_MODE == 'timeout':
94+
# self.started = time()
95+
# return self.wait_until('timeout')
96+
# elif self.OPERATION_MODE == 'wait_all':
97+
# return self.wait_until('n_responses', wait_all=True)
98+
#
99+
# def reset_last_operation(self):
100+
# self.lock.acquire()
101+
# for key in self.clients:
102+
# self.last_operation[key] = None
103+
# self.lock.release()
104+
#
105+
# @staticmethod
106+
# def parse_clients(clients):
107+
# p_clients = []
108+
# for cl in clients:
109+
# host = cl[list(cl.keys())[0]]['host']
110+
# port = cl[list(cl.keys())[0]]['port']
111+
# p_clients.append((host, port))
112+
# return p_clients
113+
#
114+
# def perform_parallel_requests(self, endpoint):
115+
# self.reset_last_operation()
116+
# for host, port in self.clients:
117+
# p = Process(target=self.perform_request,
118+
# args=(host, port, endpoint))
119+
# p.start()
120+
# self.processes.append(p)
121+
#
122+
# def wait_until(self, until_cond, wait_all=False):
123+
# # TODO: What to do in send model?
124+
# ended_clients = set()
125+
# completed = False
126+
# while not completed:
127+
# # Periodically check if the requests are ending
128+
# for key in self.clients:
129+
# last_operation = self.last_operation[key]
130+
# if not last_operation or key in ended_clients:
131+
# # Last operation still not computed or already finished
132+
# continue
133+
# elif last_operation['ended']:
134+
# # TODO: Handle exception when status code != 200
135+
# if last_operation['response'].status_code != 200:
136+
# raise Exception(
137+
# 'Error in response. Error code: {}'.format(
138+
# last_operation['response'].text))
139+
# logging.info(
140+
# '[Client Handler] client {} '
141+
# 'finished performing operation {}'.format(
142+
# key, last_operation['op']
143+
# )
144+
# )
145+
# ended_clients.add(key)
146+
# if until_cond == 'n_responses':
147+
# if ((not wait_all and (len(ended_clients) >= self.N_FIRSTS))
148+
# or (wait_all and len(ended_clients) == self.N_FIRSTS)):
149+
# completed = True
150+
# elif until_cond == 'timeout':
151+
# elapsed = time() - self.started
152+
# if ((len(ended_clients) >= self.WAIT_FROM_N_FIRSTS) and
153+
# elapsed > self.TIMEOUT):
154+
# completed = True
155+
# else:
156+
# raise Exception('Not implemented')
157+
# sleep(0.2)
158+
# self.terminate_processes()
159+
# return list(ended_clients)
160+
#
161+
# def terminate_processes(self):
162+
# for p in self.processes:
163+
# p.terminate()
164+
# self.processes = []
165+
#
166+
# @staticmethod
167+
# def get_client_key(host, port):
168+
# return (host, port)
169+
#
170+
# def perform_request(self, host, port, endpoint):
171+
# client_key = self.get_client_key(host, port)
172+
# last_operation = {
173+
# 'started': time(),
174+
# 'op': endpoint,
175+
# 'status': 'started',
176+
# 'ended': None
177+
# }
178+
# url = 'http://{}:{}/{}'.format(host, port, endpoint)
179+
# res = requests.get(url)
180+
# last_operation.update({'status': 'ended',
181+
# 'ended': time(),
182+
# 'response': res})
183+
# self.lock.acquire()
184+
# self.last_operation[client_key] = last_operation
185+
# self.lock.release()
186+
#

orchestrator/orchestrator.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,14 @@ def main(op_mode):
7878
send_iteration_to_frontend(i)
7979

8080
logging.info('Sending /train_model request to clients...')
81-
performed_clients = ch.perform_requests_and_wait('train_model')
82-
logging.info('Performed clients: {}'.format(performed_clients))
81+
ch.perform_requests_and_wait('train_model')
82+
#logging.info('Performed clients: {}'.format(performed_clients))
8383
logging.info('Done')
8484
log_elapsed_time(start)
8585

8686
logging.info('Sending /send_model command to clients...')
87-
performed_clients = ch.perform_requests_and_wait('send_model')
88-
logging.info('Performed clients: {}'.format(performed_clients))
87+
ch.perform_requests_and_wait('send_model')
88+
#logging.info('Performed clients: {}'.format(performed_clients))
8989
logging.info('Done')
9090
log_elapsed_time(start)
9191

0 commit comments

Comments
 (0)