
Commit 3b85e49

Refactor to load more data from config dict than directly from config module
1 parent: ad2893b

File tree: 10 files changed (+107, -152 lines)


config.py

Lines changed: 11 additions & 4 deletions

@@ -200,19 +200,25 @@
     "Deadline": DEADLINE,
     "Control": {
         "Alpha": ALPHA,
-        "Overscale": OVERSCALE,
+        "OverScale": OVERSCALE,
         "MaxExecutor": MAXEXECUTOR,
+        "CoreVM": COREVM,
         "K": K,
         "Ti": TI,
-        "Tsample": TSAMPLE,
+        "TSample": TSAMPLE,
         "CoreQuantum": COREQUANTUM
     },
     "Aws": {
         "InstanceType": INSTANCE_TYPE,
         "HyperThreading": not DISABLEHT,
         "Price": PRICE,
         "AMI": DATA_AMI[REGION]["ami"],
-        "Region": REGION
+        "Region": REGION,
+        "AZ": DATA_AMI[REGION]["az"],
+        "SecurityGroup": SECURITY_GROUP,
+        "KeyPair": DATA_AMI[REGION]["keypair"],
+        "EbsOptimized": EBS_OPTIMIZED,
+        "SnapshotId": DATA_AMI[REGION]["snapid"]
     },
     "Spark": {
         "ExecutorCore": COREVM,
@@ -222,7 +228,8 @@
         "LocalityWaitProcess": LOCALITY_WAIT_PROCESS,
         "LocalityWaitNode": LOCALITY_WAIT_NODE,
         "LocalityWaitRack": LOCALITY_WAIT_RACK,
-        "CPUtask": CPU_TASK
+        "CPUTask": CPU_TASK,
+        "SparkHome": SPARK_HOME
     },
     "HDFS": bool(HDFS)
 }
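
The net effect of these hunks is that CONFIG_DICT now carries everything launch.py and log.py used to import straight from the config module. Below is a minimal sketch of the assembled dictionary after this commit; the keys mirror the diff, while every value is an illustrative placeholder (the real ones come from the module constants such as ALPHA and DATA_AMI):

CONFIG_DICT = {
    "Deadline": 120000,                 # DEADLINE (placeholder value)
    "Control": {
        "Alpha": 0.95,                  # ALPHA
        "OverScale": 2,                 # OVERSCALE (key renamed from "Overscale")
        "MaxExecutor": 8,               # MAXEXECUTOR
        "CoreVM": 8,                    # COREVM, newly exposed for log.py
        "K": 50,                        # K
        "Ti": 12000,                    # TI
        "TSample": 1000,                # TSAMPLE (key renamed from "Tsample")
        "CoreQuantum": 0.05,            # COREQUANTUM
    },
    "Aws": {
        "InstanceType": "m4.xlarge",    # INSTANCE_TYPE
        "HyperThreading": True,         # not DISABLEHT
        "Price": "0.5",                 # PRICE, the spot bid
        "AMI": "ami-00000000",          # DATA_AMI[REGION]["ami"]
        "Region": "us-west-2",          # REGION
        "AZ": "us-west-2a",             # DATA_AMI[REGION]["az"], new
        "SecurityGroup": "spark-sg",    # SECURITY_GROUP, new
        "KeyPair": "my-keypair",        # DATA_AMI[REGION]["keypair"], new
        "EbsOptimized": True,           # EBS_OPTIMIZED, new
        "SnapshotId": "snap-00000000",  # DATA_AMI[REGION]["snapid"], new
    },
    "Spark": {
        "ExecutorCore": 8,              # COREVM
        # ... locality-wait settings elided ...
        "CPUTask": 1,                   # CPU_TASK (key renamed from "CPUtask")
        "SparkHome": "/opt/spark/",     # SPARK_HOME, new
    },
    "HDFS": True,                       # bool(HDFS)
}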

launch.py

Lines changed: 21 additions & 22 deletions

@@ -8,8 +8,6 @@
 from errno import ECONNREFUSED
 from errno import ETIMEDOUT
 
-from config import DATA_AMI, INSTANCE_TYPE, REGION, PRICE, SECURITY_GROUP, EBS_OPTIMIZED
-
 
 def query_yes_no(question, default="yes"):
     """Ask a yes/no question via raw_input() and return their answer.
@@ -154,19 +152,21 @@ def terminate(client, spot_request_ids, instance_ids):
     client.instances.filter(InstanceIds=instance_ids).terminate()
 
 
-def check_spot_price(client):
+def check_spot_price(client, config):
     """Check the current spot price of the chosen instance type in the selected Amazon region
     and compare it with the one provided by the user
 
     :param client: the ec2 client
+    :param config: the configuration dictionary of the user
     :return: Exit if the spot price of the user is too low (< current price + 20%)
     """
 
-    spot_price_history_response = client.describe_spot_price_history(InstanceTypes=[INSTANCE_TYPE],
-                                                                     ProductDescriptions=[
+    spot_price_history_response = client.describe_spot_price_history(
+        InstanceTypes=[config["Aws"]["InstanceType"]],
+        ProductDescriptions=[
             'Linux/UNIX'],
-                                                                     AvailabilityZone=
-                                                                     DATA_AMI[REGION]["az"])
+        AvailabilityZone=
+        config["Aws"]["AZ"])
     print(spot_price_history_response['SpotPriceHistory'][0])
     last_spot_price = [float(x['SpotPrice']) for x in
                        spot_price_history_response['SpotPriceHistory'][:10]]
@@ -176,49 +176,48 @@ def check_spot_price(client):
     spot_price += (spot_price * 0.2)
     spot_price = float("{0:.2f}".format(spot_price))
     print("LAST 10 SPOT PRICE + 20%: " + str(spot_price))
-    print("YOUR PRICE: " + str(PRICE))
-    if float(PRICE) < spot_price:
+    print("YOUR PRICE: " + str(config["Aws"]["Price"]))
+    if float(config["Aws"]["Price"]) < spot_price:
         print("ERROR PRICE")
         exit(1)
 
 
-def launch(client, num_instance):
+def launch(client, num_instance, config):
     """
     Launch num_instance instances on Amazon EC2 with a spot request
 
     :param client: the ec2 client
     :param num_instance: the number of instances to launch
+    :param config: the configuration dictionary of the user
     :return: the list of spot request ids
     """
     if query_yes_no("Are you sure to launch " + str(num_instance) + " new instance?", "no"):
-        check_spot_price(client)
+        check_spot_price(client, config)
         spot_request_response = client.request_spot_instances(
-            SpotPrice=PRICE,
+            SpotPrice=config["Aws"]["Price"],
             InstanceCount=num_instance,
             Type='one-time',
             AvailabilityZoneGroup=
-            DATA_AMI[REGION]["az"],
+            config["Aws"]["AZ"],
             LaunchSpecification={
-                "ImageId": DATA_AMI[REGION]["ami"],
-                "KeyName": DATA_AMI[REGION]["keypair"],
+                "ImageId": config["Aws"]["AMI"],
+                "KeyName": config["Aws"]["KeyPair"],
                 "SecurityGroups": [
-                    SECURITY_GROUP,
+                    config["Aws"]["SecurityGroup"],
                 ],
                 'Placement': {
-                    'AvailabilityZone':
-                    DATA_AMI[REGION]["az"],
+                    'AvailabilityZone': config["Aws"]["AZ"],
                 },
-                "InstanceType": INSTANCE_TYPE,
-                "EbsOptimized": EBS_OPTIMIZED,
+                "InstanceType": config["Aws"]["InstanceType"],
+                "EbsOptimized": config["Aws"]["EbsOptimized"],
                 "BlockDeviceMappings": [
                    {
                        "DeviceName": "/dev/sda1",
                        "Ebs": {
                            "DeleteOnTermination": True,
                            "VolumeType": "gp2",
                            "VolumeSize": 200,
-                            "SnapshotId":
-                            DATA_AMI[REGION]["snapid"]
+                            "SnapshotId": config["Aws"]["SnapshotId"]
                        }
                    },
                    {
log.py

Lines changed: 45 additions & 34 deletions

@@ -1,5 +1,9 @@
 """
+Module that handles the cluster log:
 
+* Download from master and slaves
+* Extract app data
+* Extract worker data
 """
 
 import multiprocessing
@@ -10,21 +14,20 @@
 
 from boto.manage.cmdshell import sshclient_from_instance
 
-from config import KEYPAIR_PATH, SPARK_HOME, COREVM, COREHTVM
 from util.utils import timing, string_to_datetime
 
 
-def download_master(i, output_folder, log_folder):
-    """
+def download_master(i, output_folder, log_folder, config):
+    """Download log from the master instance
 
-    :param i:
-    :param output_folder:
-    :param log_folder:
-    :return:
+    :param i: the master instance
+    :param output_folder: the output folder where to save the log
+    :param log_folder: the log folder on the master instance
+    :return: output_folder and app_id (the application id)
     """
-    ssh_client = sshclient_from_instance(i, KEYPAIR_PATH, user_name='ubuntu')
+    ssh_client = sshclient_from_instance(i, config["Aws"]["KeyPair"], user_name='ubuntu')
     app_id = ""
-    for file in ssh_client.listdir("" + SPARK_HOME + "spark-events/"):
+    for file in ssh_client.listdir("" + config["Spark"]["SparkHome"] + "spark-events/"):
         print("BENCHMARK: " + file)
         print("LOG FOLDER: " + log_folder)
         print("OUTPUT FOLDER: " + output_folder)
@@ -35,11 +38,12 @@ def download_master(i, output_folder, log_folder):
             os.makedirs(output_folder)
         except FileExistsError:
             print("Output folder already exists")
-        inputfile = SPARK_HOME + "spark-events/" + file
-        outputbz = inputfile + ".bz"
+        input_file = config["Spark"]["SparkHome"] + "spark-events/" + file
+        output_bz = input_file + ".bz"
         print("Bzipping event log...")
-        ssh_client.run("pbzip2 -9 -p" + str(COREVM) + " -c " + inputfile + " > " + outputbz)
-        ssh_client.get_file(outputbz, output_folder + "/" + file + ".bz")
+        ssh_client.run("pbzip2 -9 -p" + str(
+            config["Control"]["CoreVM"]) + " -c " + input_file + " > " + output_bz)
+        ssh_client.get_file(output_bz, output_folder + "/" + file + ".bz")
     for file in ssh_client.listdir(log_folder):
         print(file)
         if file != "bench-report.dat":
@@ -48,20 +52,22 @@ def download_master(i, output_folder, log_folder):
         return output_folder, app_id
 
 
-def download_slave(i, output_folder, app_id):
-    """
+def download_slave(i, output_folder, app_id, config):
+    """Download log from a slave instance:
+    * The worker log that includes the controller output
+    * The cpu monitoring log
 
-    :param i:
-    :param output_folder:
-    :param app_id:
-    :return:
+    :param i: the slave instance
+    :param output_folder: the output folder where to save the log
+    :param app_id: the application id
+    :return: output_folder: the output folder
     """
-    ssh_client = sshclient_from_instance(i, KEYPAIR_PATH, user_name='ubuntu')
+    ssh_client = sshclient_from_instance(i, config["Aws"]["KeyPair"], user_name='ubuntu')
     print("Downloading log from slave: " + i.public_dns_name)
     try:
         worker_ip_fixed = i.private_ip_address.replace(".", "-")
         worker_log = "{0}logs/spark-ubuntu-org.apache.spark.deploy.worker.Worker-1-ip-{1}.out".format(
-            SPARK_HOME, worker_ip_fixed)
+            config["Spark"]["SparkHome"], worker_ip_fixed)
         print(worker_log)
         ssh_client.run(
             "screen -ls | grep Detached | cut -d. -f1 | awk '{print $1}' | xargs -r kill")
@@ -73,24 +79,25 @@ def download_slave(i, output_folder, app_id):
     except FileNotFoundError:
         print("worker log not found")
     try:
-        for file in ssh_client.listdir(SPARK_HOME + "work/" + app_id + "/"):
+        for file in ssh_client.listdir(config["Spark"]["SparkHome"] + "work/" + app_id + "/"):
             print("Executor ID: " + file)
-            ssh_client.get_file(SPARK_HOME + "work/" + app_id + "/" + file + "/stderr",
-                                output_folder + "/" + i.public_dns_name + "-" + file + ".stderr")
+            ssh_client.get_file(
+                config["Spark"]["SparkHome"] + "work/" + app_id + "/" + file + "/stderr",
+                output_folder + "/" + i.public_dns_name + "-" + file + ".stderr")
     except FileNotFoundError:
         print("stderr not found")
     return output_folder
 
 
 @timing
 def download(log_folder, instances, master_dns, output_folder):
-    """
+    """Download the logs from the master and the worker nodes
 
-    :param log_folder:
-    :param instances:
-    :param master_dns:
-    :param output_folder:
-    :return:
+    :param log_folder: the log folder of the application
+    :param instances: the instances of the cluster
+    :param master_dns: the dns of the master instance
+    :param output_folder: the output folder where to save the logs
+    :return: the output folder
     """
     # MASTER
     print("Downloading log from Master: " + master_dns)
@@ -106,7 +113,6 @@ def download(log_folder, instances, master_dns, output_folder):
     return output_folder
 
 
-@timing
 def load_app_data(app_log_path):
     """
     Function that parses the application data like stage ids, start, deadline, end,
@@ -181,7 +187,7 @@ def load_app_data(app_log_path):
     return app_info
 
 
-def load_worker_data(worker_log, cpu_log):
+def load_worker_data(worker_log, cpu_log, config):
     """
     Load the controller data from the worker_log and combine with the cpu_real data from cpu_log
 
@@ -245,7 +251,12 @@ def load_worker_data(worker_log, cpu_log):
                 and line[1] != " CPU" and line[0] != "Average:":
             worker_dict["time_cpu"].append(
                 dt.strptime(line[0], '%I:%M:%S %p').replace(year=2016))
-            cpuint = float('{0:.2f}'.format((float(line[2]) * COREHTVM) / 100))
-            worker_dict["cpu_real"].append(cpuint)
+            if config["Aws"]["HyperThreading"]:
+                cpu_real = float(
+                    '{0:.2f}'.format((float(line[2]) * config["Control"]["CoreVM"] * 2) / 100))
+            else:
+                cpu_real = float(
+                    '{0:.2f}'.format((float(line[2]) * config["Control"]["CoreVM"]) / 100))
+            worker_dict["cpu_real"].append(cpu_real)
     print(list(worker_dict.keys()))
     return worker_dict
main.py

Lines changed: 2 additions & 2 deletions

@@ -6,7 +6,7 @@
 import launch
 import run
 from config import NUMINSTANCE, REGION, TAG, REBOOT, CLUSTER_ID, TERMINATE, RUN, NUM_RUN, \
-    CREDENTIAL_PROFILE
+    CREDENTIAL_PROFILE, CONFIG_DICT
 
 
 def main():
@@ -19,7 +19,7 @@ def main():
     client = session.client('ec2', region_name=REGION)
 
     if NUMINSTANCE > 0:
-        spot_request_ids = launch.launch(client, NUMINSTANCE)
+        spot_request_ids = launch.launch(client, NUMINSTANCE, CONFIG_DICT)
 
     print("CHECK SECURITY GROUP ALLOWED IP SETTINGS!!!")
 
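main.py still imports its scalar flags (NUMINSTANCE, REGION, and so on) directly, but the structured settings now travel inside CONFIG_DICT. A minimal sketch of the resulting wiring; the boto3.Session construction is an assumption, since the diff only shows the client call:

import boto3

import launch
from config import NUMINSTANCE, REGION, CREDENTIAL_PROFILE, CONFIG_DICT

session = boto3.Session(profile_name=CREDENTIAL_PROFILE)  # session setup assumed
client = session.client('ec2', region_name=REGION)

if NUMINSTANCE > 0:
    # launch() reads config["Aws"] internally; no more config-module globals.
    spot_request_ids = launch.launch(client, NUMINSTANCE, CONFIG_DICT)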