diff --git a/yarn-ec2.py b/yarn-ec2.py
index 692ab50..082bbd9 100755
--- a/yarn-ec2.py
+++ b/yarn-ec2.py
@@ -56,7 +56,7 @@
 
 # Default location to get the yarn-ec2 scripts (and ami-list) from
 DEFAULT_YARN_EC2_GITHUB_REPO = "https://github.com/CloudComputingCourse/yarn-ec2-s18"
-DEFAULT_YARN_EC2_BRANCH = "master"
+DEFAULT_YARN_EC2_BRANCH = "main"
 
 from boto.ec2.blockdevicemapping import BlockDeviceMapping, BlockDeviceType, EBSBlockDeviceType
@@ -74,11 +74,11 @@ def parse_args():
     parser = OptionParser(
         prog="yarn-ec2",
         usage="%prog [options] <action> <cluster_name>\n\n"
-              + "<action> can be: launch, destroy, login, get-master, stop, start")
+              + "<action> can be: launch, destroy, login, get-main, stop, start")
     parser.add_option(
-        "-s", "--slaves", type="int", default=4,
-        help="Number of slaves to launch (default: %default)")
+        "-s", "--subordinates", type="int", default=4,
+        help="Number of subordinates to launch (default: %default)")
     parser.add_option(
         "-k", "--key-pair",
         help="Key pair to use on instances")
@@ -94,15 +94,15 @@ def parse_args():
         help="Type of instance to launch (default: %default). " +
              "WARNING: must be 64-bit; small instances won't work in production")
     parser.add_option(
-        "-m", "--master-instance-type", default="",
-        help="Master instance type (leave empty for same as instance-type)")
+        "-m", "--main-instance-type", default="",
+        help="Main instance type (leave empty for same as instance-type)")
     parser.add_option(
         "-r", "--region", default="us-east-1",
         help="EC2 region used to launch instances in, or to find them in (default: %default)")
     parser.add_option(
         "-z", "--zone", default="us-east-1a",
         help="Availability zone to launch instances in, or 'all' to spread " +
-             "slaves across multiple (an additional $0.01/Gb for bandwidth" +
+             "subordinates across multiple (an additional $0.01/Gb for bandwidth " +
              "between zones applies) (default: %default)")
     parser.add_option(
         "-a", "--ami",
@@ -143,7 +143,7 @@ def parse_args():
              "created.")
     parser.add_option(
         "--spot-price", metavar="PRICE", type="float", default=1.0,
-        help="If specified, launch slaves as spot instances with the given " +
+        help="If specified, launch subordinates as spot instances with the given " +
              "maximum price (in dollars) (default: %default)")
     parser.add_option(
         "-u", "--user", default="ubuntu",
@@ -152,8 +152,8 @@ def parse_args():
         "--delete-groups", action="store_true", default=False,
         help="When destroying a cluster, delete the security groups that were created")
     parser.add_option(
-        "--use-existing-master", action="store_true", default=False,
-        help="Launch fresh slaves, but use an existing stopped master if possible")
+        "--use-existing-main", action="store_true", default=False,
+        help="Launch fresh subordinates, but use an existing stopped main if possible")
     parser.add_option(
         "--user-data", type="string", default="",
         help="Path to a user-data file (most AMIs interpret this as an initialization script)")
@@ -270,7 +270,7 @@ def init_security_group(sg, cidr):
 
 # Launch a cluster of the given name, by setting up its security groups,
 # and then starting new instances in them.
-# Returns a tuple of EC2 reservation objects for the master and slaves
+# Returns a tuple of EC2 reservation objects for the main and subordinates
 # Fails if there already instances running in the cluster's groups.
 def launch_cluster(conn, opts, cluster_name):
     if opts.identity_file is None:
@@ -286,10 +286,10 @@ def launch_cluster(conn, opts, cluster_name):
                 c=opts.secondary_ips, t=opts.instance_type))
             sys.exit(1)
-    if opts.master_instance_type != "":
-        if opts.secondary_ips + 1 > get_nic_width(opts.master_instance_type):
-            print("ERROR: unable to allocate {c} secondary ip addresses for master-instance-type: {t}".format(
-                c=opts.secondary_ips, t=opts.master_instance_type))
+    if opts.main_instance_type != "":
+        if opts.secondary_ips + 1 > get_nic_width(opts.main_instance_type):
+            print("ERROR: unable to allocate {c} secondary ip addresses for main-instance-type: {t}".format(
+                c=opts.secondary_ips, t=opts.main_instance_type))
             sys.exit(1)
 
     if opts.vpc_id is None:
@@ -331,19 +331,19 @@ def launch_cluster(conn, opts, cluster_name):
 
     print("Setting up security groups...")
     authorized_address = opts.authorized_address
-    master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id)
-    if master_group.rules == []:  # Group was just now created
-        init_security_group(master_group, authorized_address)
-    slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id)
-    if slave_group.rules == []:  # Group was just now created
-        init_security_group(slave_group, authorized_address)
+    main_group = get_or_make_group(conn, cluster_name + "-main", opts.vpc_id)
+    if main_group.rules == []:  # Group was just now created
+        init_security_group(main_group, authorized_address)
+    subordinate_group = get_or_make_group(conn, cluster_name + "-subordinates", opts.vpc_id)
+    if subordinate_group.rules == []:  # Group was just now created
+        init_security_group(subordinate_group, authorized_address)
 
     # Check if instances are already running in our groups
-    existing_masters, existing_slaves = get_existing_cluster(
+    existing_mains, existing_subordinates = get_existing_cluster(
         conn, opts, cluster_name, die_on_error=False)
-    if existing_slaves or (existing_masters and not opts.use_existing_master):
+    if existing_subordinates or (existing_mains and not opts.use_existing_main):
         print("ERROR: There are already instances running in group %s or %s" %
-              (master_group.name, slave_group.name), file=stderr)
+              (main_group.name, subordinate_group.name), file=stderr)
         sys.exit(1)
 
     # Figure out AMI
@@ -384,32 +384,32 @@ def launch_cluster(conn, opts, cluster_name):
             name = '/dev/sd' + string.ascii_letters[i + 1]
             block_map[name] = dev
 
-    # Launch slaves
-    if opts.slaves != 0 and opts.spot_price is not None:
+    # Launch subordinates
+    if opts.subordinates != 0 and opts.spot_price is not None:
         # Launch spot instances with the requested price
-        print("Requesting %d slaves at $%.3f per hour" %
-              (opts.slaves, opts.spot_price))
+        print("Requesting %d subordinates at $%.3f per hour" %
+              (opts.subordinates, opts.spot_price))
         zones = get_zones(conn, opts)
         num_zones = len(zones)
         i = 0
-        slave_req_ids = []
+        subordinate_req_ids = []
         for zone in zones:
-            num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
-            slave_reqs = conn.request_spot_instances(
+            num_subordinates_this_zone = get_partition(opts.subordinates, num_zones, i)
+            subordinate_reqs = conn.request_spot_instances(
                 price=opts.spot_price,
                 image_id=opts.ami,
                 launch_group="yarn-launch-group-%s" % cluster_name,
                 placement=zone,
-                count=num_slaves_this_zone,
+                count=num_subordinates_this_zone,
                 key_name=opts.key_pair,
-                security_group_ids=[slave_group.id] + additional_group_ids,
+                security_group_ids=[subordinate_group.id] + additional_group_ids,
                 instance_type=opts.instance_type,
                 block_device_map=block_map,
                 subnet_id=opts.subnet_id,
                 placement_group=opts.placement_group,
                 user_data=user_data_content,
                 instance_profile_name=opts.instance_profile_name)
-            slave_req_ids += [req.id for req in slave_reqs]
+            subordinate_req_ids += [req.id for req in subordinate_reqs]
             i += 1
 
         print("Waiting...")
@@ -420,27 +420,27 @@ def launch_cluster(conn, opts, cluster_name):
                 id_to_req = {}
                 for r in reqs:
                     id_to_req[r.id] = r
-                slave_instance_ids = []
-                for i in slave_req_ids:
+                subordinate_instance_ids = []
+                for i in subordinate_req_ids:
                     if i in id_to_req and id_to_req[i].state == "active":
-                        slave_instance_ids.append(id_to_req[i].instance_id)
-                if len(slave_instance_ids) == opts.slaves:
-                    print("%d slaves granted" % opts.slaves)
-                    reservations = conn.get_all_reservations(slave_instance_ids)
-                    slave_nodes = []
+                        subordinate_instance_ids.append(id_to_req[i].instance_id)
+                if len(subordinate_instance_ids) == opts.subordinates:
+                    print("%d subordinates granted" % opts.subordinates)
+                    reservations = conn.get_all_reservations(subordinate_instance_ids)
+                    subordinate_nodes = []
                     for r in reservations:
-                        slave_nodes += r.instances
+                        subordinate_nodes += r.instances
                     break
                 else:
-                    print("%d of %d slaves granted, waiting longer" % (
-                        len(slave_instance_ids), opts.slaves))
+                    print("%d of %d subordinates granted, waiting longer" % (
+                        len(subordinate_instance_ids), opts.subordinates))
         except:
            print("Canceling spot instance requests")
-            conn.cancel_spot_instance_requests(slave_req_ids)
+            conn.cancel_spot_instance_requests(subordinate_req_ids)
             # Log a warning if any of these requests actually launched instances:
-            (master_nodes, slave_nodes) = get_existing_cluster(
+            (main_nodes, subordinate_nodes) = get_existing_cluster(
                 conn, opts, cluster_name, die_on_error=False)
-            running = len(master_nodes) + len(slave_nodes)
+            running = len(main_nodes) + len(subordinate_nodes)
             if running:
                 print(("WARNING: %d instances are still running" % running), file=stderr)
             sys.exit(0)
@@ -449,63 +449,63 @@ def launch_cluster(conn, opts, cluster_name):
         zones = get_zones(conn, opts)
         num_zones = len(zones)
         i = 0
-        slave_nodes = []
+        subordinate_nodes = []
         for zone in zones:
-            num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
-            if num_slaves_this_zone > 0:
-                slave_res = image.run(
+            num_subordinates_this_zone = get_partition(opts.subordinates, num_zones, i)
+            if num_subordinates_this_zone > 0:
+                subordinate_res = image.run(
                     key_name=opts.key_pair,
-                    security_group_ids=[slave_group.id] + additional_group_ids,
+                    security_group_ids=[subordinate_group.id] + additional_group_ids,
                     instance_type=opts.instance_type,
                     placement=zone,
-                    min_count=num_slaves_this_zone,
-                    max_count=num_slaves_this_zone,
+                    min_count=num_subordinates_this_zone,
+                    max_count=num_subordinates_this_zone,
                     block_device_map=block_map,
                     subnet_id=opts.subnet_id,
                     placement_group=opts.placement_group,
                     user_data=user_data_content,
                     instance_initiated_shutdown_behavior=opts.instance_initiated_shutdown_behavior,
                     instance_profile_name=opts.instance_profile_name)
-                slave_nodes += slave_res.instances
-                print("Launched {s} slave{plural_s} in {z}".format(
-                    s=num_slaves_this_zone,
-                    plural_s=('' if num_slaves_this_zone == 1 else 's'),
+                subordinate_nodes += subordinate_res.instances
+                print("Launched {s} subordinate{plural_s} in {z}".format(
+                    s=num_subordinates_this_zone,
+                    plural_s=('' if num_subordinates_this_zone == 1 else 's'),
                     z=zone))
             i += 1
 
-    # Launch or resume masters
-    if existing_masters:
-        print("Starting master...")
-        for inst in existing_masters:
+    # Launch or resume mains
+    if existing_mains:
+        print("Starting main...")
+        for inst in existing_mains:
             if inst.state not in ["shutting-down", "terminated"]:
                 inst.start()
-        master_nodes = existing_masters
+        main_nodes = existing_mains
     else:
         if opts.spot_price is not None:
             # Launch spot instances with the requested price
-            print("Requesting 1 master at $%.3f per hour" % opts.spot_price)
-            master_type = opts.master_instance_type
-            if master_type == "":
-                master_type = opts.instance_type
-            master_zone = opts.zone
-            if master_zone == 'all':
-                master_zone = random.choice(conn.get_all_zones()).name
-            master_req_ids = []
-            master_req = conn.request_spot_instances(
+            print("Requesting 1 main at $%.3f per hour" % opts.spot_price)
+            main_type = opts.main_instance_type
+            if main_type == "":
+                main_type = opts.instance_type
+            main_zone = opts.zone
+            if main_zone == 'all':
+                main_zone = random.choice(conn.get_all_zones()).name
+            main_req_ids = []
+            main_req = conn.request_spot_instances(
                 price=opts.spot_price,
                 image_id=opts.ami,
                 launch_group="yarn-launch-group-%s" % cluster_name,
-                placement=master_zone,
+                placement=main_zone,
                 count=1,
                 key_name=opts.key_pair,
-                security_group_ids=[master_group.id] + additional_group_ids,
-                instance_type=master_type,
+                security_group_ids=[main_group.id] + additional_group_ids,
+                instance_type=main_type,
                 block_device_map=block_map,
                 subnet_id=opts.subnet_id,
                 placement_group=opts.placement_group,
                 user_data=user_data_content,
                 instance_profile_name=opts.instance_profile_name)
-            master_req_ids += [req.id for req in master_req]
+            main_req_ids += [req.id for req in main_req]
 
             print("Waiting...")
             try:
@@ -515,44 +515,44 @@ def launch_cluster(conn, opts, cluster_name):
                     id_to_req = {}
                     for r in reqs:
                         id_to_req[r.id] = r
-                    master_instance_ids = []
-                    for i in master_req_ids:
+                    main_instance_ids = []
+                    for i in main_req_ids:
                         if i in id_to_req and id_to_req[i].state == "active":
-                            master_instance_ids.append(id_to_req[i].instance_id)
-                    if len(master_instance_ids) == 1:
-                        print("1 master granted")
-                        reservations = conn.get_all_reservations(master_instance_ids)
-                        master_nodes = []
+                            main_instance_ids.append(id_to_req[i].instance_id)
+                    if len(main_instance_ids) == 1:
+                        print("1 main granted")
+                        reservations = conn.get_all_reservations(main_instance_ids)
+                        main_nodes = []
                         for r in reservations:
-                            master_nodes += r.instances
+                            main_nodes += r.instances
                         break
                     else:
-                        print("%d of %d master granted, waiting longer" % (
-                            len(master_instance_ids), 1))
+                        print("%d of %d main granted, waiting longer" % (
+                            len(main_instance_ids), 1))
            except:
                 print("Canceling spot instance requests")
-                conn.cancel_spot_instance_requests(master_req_ids)
+                conn.cancel_spot_instance_requests(main_req_ids)
                 # Log a warning if any of these requests actually launched instances:
-                (master_nodes, slave_nodes) = get_existing_cluster(
+                (main_nodes, subordinate_nodes) = get_existing_cluster(
                     conn, opts, cluster_name, die_on_error=False)
-                running = len(master_nodes) + len(slave_nodes)
+                running = len(main_nodes) + len(subordinate_nodes)
                 if running:
                     print(("WARNING: %d instances are still running" % running), file=stderr)
                 sys.exit(0)
         else:
             # Launch non-spot instances
-            master_type = opts.master_instance_type
-            if master_type == "":
-                master_type = opts.instance_type
-            master_zone = opts.zone
-            if master_zone == 'all':
-                master_zone = random.choice(conn.get_all_zones()).name
-            master_nodes = []
-            master_res = image.run(
+            main_type = opts.main_instance_type
+            if main_type == "":
+                main_type = opts.instance_type
+            main_zone = opts.zone
+            if main_zone == 'all':
+                main_zone = random.choice(conn.get_all_zones()).name
+            main_nodes = []
+            main_res = image.run(
                 key_name=opts.key_pair,
-                security_group_ids=[master_group.id] + additional_group_ids,
-                instance_type=master_type,
-                placement=master_zone,
+                security_group_ids=[main_group.id] + additional_group_ids,
+                instance_type=main_type,
+                placement=main_zone,
                 min_count=1,
                 max_count=1,
                 block_device_map=block_map,
@@ -561,8 +561,8 @@ def launch_cluster(conn, opts, cluster_name):
                 user_data=user_data_content,
                 instance_initiated_shutdown_behavior=opts.instance_initiated_shutdown_behavior,
                 instance_profile_name=opts.instance_profile_name)
-            master_nodes += master_res.instances
-            print("Launched 1 master in {z}".format(z=master_zone))
+            main_nodes += main_res.instances
+            print("Launched 1 main in {z}".format(z=main_zone))
 
     # Timed wait
     print("Waiting for aws to propagate instance metadata...")
@@ -583,26 +583,26 @@ def launch_cluster(conn, opts, cluster_name):
         tags.update(additional_tags)
 
-    for master in master_nodes:
-        master.add_tags(
-            dict(tags, Name='{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id))
+    for main in main_nodes:
+        main.add_tags(
+            dict(tags, Name='{cn}-main-{iid}'.format(cn=cluster_name, iid=main.id))
         )
 
-    for slave in slave_nodes:
-        slave.add_tags(
-            dict(tags, Name='{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id))
+    for subordinate in subordinate_nodes:
+        subordinate.add_tags(
+            dict(tags, Name='{cn}-subordinate-{iid}'.format(cn=cluster_name, iid=subordinate.id))
         )
 
     # Return all the instances
-    return (master_nodes, slave_nodes)
+    return (main_nodes, subordinate_nodes)
 
 
 # Reset 2nd ip addresses
-def reassign_cluster_ips(conn, master_nodes, slave_nodes, opts, cluster_name):
+def reassign_cluster_ips(conn, main_nodes, subordinate_nodes, opts, cluster_name):
     ''' reset cluster ip addresses '''
     print("Reassigning secondary ip addresses...")
-    for inst in master_nodes + slave_nodes:
+    for inst in main_nodes + subordinate_nodes:
         if inst.state != "terminated" and len(inst.interfaces) != 0:
             nif = inst.interfaces[0]
             if len(nif.private_ip_addresses) != opts.secondary_ips + 1:
@@ -628,7 +628,7 @@ def reassign_cluster_ips(conn, master_nodes, slave_nodes, opts, cluster_name):
 def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
     """
     Get the EC2 instances in an existing cluster if available.
-    Returns a tuple of lists of EC2 instance objects for the masters and slaves.
+    Returns a tuple of lists of EC2 instance objects for the mains and subordinates.
     """
     print("Searching for existing cluster {c} in region {r}...".format(
         c=cluster_name, r=opts.region))
@@ -645,54 +645,54 @@ def get_instances(group_names):
         instances = itertools.chain.from_iterable(r.instances for r in reservations)
         return [i for i in instances if i.state not in ["shutting-down", "terminated"]]
 
-    master_instances = get_instances([cluster_name + "-master"])
-    slave_instances = get_instances([cluster_name + "-slaves"])
+    main_instances = get_instances([cluster_name + "-main"])
+    subordinate_instances = get_instances([cluster_name + "-subordinates"])
 
-    if any((master_instances, slave_instances)):
-        print("Found {m} master{plural_m}, {s} slave{plural_s}.".format(
-            m=len(master_instances),
-            plural_m=('' if len(master_instances) == 1 else 's'),
-            s=len(slave_instances),
-            plural_s=('' if len(slave_instances) == 1 else 's')))
+    if any((main_instances, subordinate_instances)):
+        print("Found {m} main{plural_m}, {s} subordinate{plural_s}.".format(
+            m=len(main_instances),
+            plural_m=('' if len(main_instances) == 1 else 's'),
+            s=len(subordinate_instances),
+            plural_s=('' if len(subordinate_instances) == 1 else 's')))
 
-    if not master_instances and die_on_error:
-        print("ERROR: Could not find a master for cluster {c} in region {r}.".format(
+    if not main_instances and die_on_error:
+        print("ERROR: Could not find a main for cluster {c} in region {r}.".format(
             c=cluster_name, r=opts.region), file=sys.stderr)
         sys.exit(1)
 
-    return (master_instances, slave_instances)
+    return (main_instances, subordinate_instances)
 
 
 # Deploy configuration files and run setup scripts on a newly launched or started cluster.
-def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
-    master = get_dns_name(master_nodes[0], opts.private_ips)
+def setup_cluster(conn, main_nodes, subordinate_nodes, opts, deploy_ssh_key):
+    main = get_dns_name(main_nodes[0], opts.private_ips)
     if deploy_ssh_key:
-        print("Generating cluster's SSH key on master...")
+        print("Generating cluster's SSH key on main...")
         key_setup = """
           [ -f ~/.ssh/id_rsa ] ||
            (ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa -C ibuki &&
             cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys)
         """
-        ssh(master, opts, key_setup)
-        dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh'])
-        print("Transferring cluster's SSH key to slaves...")
-        for slave in slave_nodes:
-            slave_address = get_dns_name(slave, opts.private_ips)
-            print(slave_address)
+        ssh(main, opts, key_setup)
+        dot_ssh_tar = ssh_read(main, opts, ['tar', 'c', '.ssh'])
+        print("Transferring cluster's SSH key to subordinates...")
+        for subordinate in subordinate_nodes:
+            subordinate_address = get_dns_name(subordinate, opts.private_ips)
+            print(subordinate_address)
             ssh_write(
-                host=slave_address,
+                host=subordinate_address,
                 opts=opts,
                 command=['tar', 'x'],
                 arguments=dot_ssh_tar
             )
 
     print("Passing SSH keys to root...")
-    for node in master_nodes + slave_nodes:
+    for node in main_nodes + subordinate_nodes:
         ssh(get_dns_name(node, opts.private_ips), opts, "sudo cp -r ~/.ssh /root/")
 
-    print("Cloning yarn-ec2 scripts from {r}/tree/{b} on master...".format(
+    print("Cloning yarn-ec2 scripts from {r}/tree/{b} on main...".format(
         r=opts.yarn_ec2_git_repo, b=opts.yarn_ec2_git_branch))
     ssh(
-        host=master,
+        host=main,
         opts=opts,
         command="sudo rm -rf /root/share/yarn-ec2"
                 + " && "
@@ -702,29 +702,29 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
         )
     )
 
-    print("Deploying files to master...")
+    print("Deploying files to main...")
     deploy_files(
         conn=conn,
         root_dir=YARN_EC2_DIR + "/" + "deploy.generic",
         opts=opts,
-        master_nodes=master_nodes,
-        slave_nodes=slave_nodes
+        main_nodes=main_nodes,
+        subordinate_nodes=subordinate_nodes
     )
 
-    print("Running setup on master...")
-    setup_spark_cluster(master, opts)
+    print("Running setup on main...")
+    setup_spark_cluster(main, opts)
     print("Done!")
 
 
-def setup_spark_cluster(master, opts):
-    ssh(master, opts, "chmod u+x /root/share/yarn-ec2/setup.sh", force_root=True)
-    ssh(master, opts, "/root/share/yarn-ec2/setup.sh", force_root=True)
-    ssh(master, opts, "hdup", force_root=True)
-    ssh(master, opts, "yup", force_root=True)
+def setup_spark_cluster(main, opts):
+    ssh(main, opts, "chmod u+x /root/share/yarn-ec2/setup.sh", force_root=True)
+    ssh(main, opts, "/root/share/yarn-ec2/setup.sh", force_root=True)
+    ssh(main, opts, "hdup", force_root=True)
+    ssh(main, opts, "yup", force_root=True)
     time.sleep(5)
-    ssh(master, opts, "yls", force_root=True)
+    ssh(main, opts, "yls", force_root=True)
     print(">> Hadoop HDFS is available at r0:50070")
     print(">> Hadoop YARN is available at r0:8088")
@@ -931,30 +931,30 @@ def get_num_disks(instance_type):
 
 # Deploy the configuration file templates in a given local directory to
 # a cluster, filling in any template parameters with information about the
-# cluster (e.g. lists of masters and slaves). Files are only deployed to
-# the first master instance in the cluster, and we expect the setup
+# cluster (e.g. lists of mains and subordinates). Files are only deployed to
+# the first main instance in the cluster, and we expect the setup
 # script to be run on that instance to copy them to other nodes.
 #
 # root_dir should be an absolute path to the directory with the files we want to deploy.
-def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes):
-    active_master = get_dns_name(master_nodes[0], opts.private_ips)
+def deploy_files(conn, root_dir, opts, main_nodes, subordinate_nodes):
+    active_main = get_dns_name(main_nodes[0], opts.private_ips)
 
-    master_addresses = [get_dns_name(i, True) for i in master_nodes]
-    slave_addresses = [get_dns_name(i, True) for i in slave_nodes]
+    main_addresses = [get_dns_name(i, True) for i in main_nodes]
+    subordinate_addresses = [get_dns_name(i, True) for i in subordinate_nodes]
 
     # Instantiate templates
     template_vars = {
-        "master_list": '\n'.join(master_addresses),
-        "slave_list": '\n'.join(slave_addresses),
-        "rack0": '\n'.join(get_secondary_ip_addresses(master_nodes[0])),
+        "main_list": '\n'.join(main_addresses),
+        "subordinate_list": '\n'.join(subordinate_addresses),
+        "rack0": '\n'.join(get_secondary_ip_addresses(main_nodes[0])),
         "rack1": '',
         "rack2": '',
         "rack3": '',
         "rack4": '',
     }
 
-    for i in xrange(0, len(slave_nodes)):
-        template_vars['rack' + str(i + 1)] = '\n'.join(get_secondary_ip_addresses(slave_nodes[i]))
+    for i in xrange(0, len(subordinate_nodes)):
+        template_vars['rack' + str(i + 1)] = '\n'.join(get_secondary_ip_addresses(subordinate_nodes[i]))
 
     # Create a temp directory in which we will place all the files to be
     # deployed after we substitute template parameters in them
@@ -976,12 +976,12 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes):
                     text = text.replace("{{" + key + "}}", template_vars[key])
                     dest.write(text)
                     dest.close()
-    # rsync the whole directory over to the master machine
+    # rsync the whole directory over to the main machine
     command = [
         'rsync', '-rv',
         '-e', stringify_command(ssh_command(opts)),
         "%s/" % tmp_dir,
-        "%s@%s:/root" % ("root", active_master)
+        "%s@%s:/root" % ("root", active_main)
     ]
     subprocess.check_call(command)
     # Remove the temp directory we created above
@@ -1085,10 +1085,10 @@ def get_zones(conn, opts):
 
 # Gets the number of items in a partition
 def get_partition(total, num_partitions, current_partitions):
-    num_slaves_this_zone = total // num_partitions
+    num_subordinates_this_zone = total // num_partitions
     if (total % num_partitions) - current_partitions > 0:
-        num_slaves_this_zone += 1
-    return num_slaves_this_zone
+        num_subordinates_this_zone += 1
+    return num_subordinates_this_zone
 
 
 # Gets a list of secondary ip addresses
@@ -1138,21 +1138,21 @@ def real_main():
         print("Warning: Unrecognized EC2 instance type for instance-type: {t}".format(
             t=opts.instance_type), file=stderr)
 
-    if opts.master_instance_type != "":
-        if opts.master_instance_type not in EC2_INSTANCE_TYPES:
-            print("Warning: Unrecognized EC2 instance type for master-instance-type: {t}".format(
-                t=opts.master_instance_type), file=stderr)
+    if opts.main_instance_type != "":
+        if opts.main_instance_type not in EC2_INSTANCE_TYPES:
+            print("Warning: Unrecognized EC2 instance type for main-instance-type: {t}".format(
+                t=opts.main_instance_type), file=stderr)
         # Since we try instance types even if we can't resolve them, we check if they resolve first
         # and, if they do, see if they resolve to the same VM type.
         if opts.instance_type in EC2_INSTANCE_TYPES and \
-           opts.master_instance_type in EC2_INSTANCE_TYPES:
+           opts.main_instance_type in EC2_INSTANCE_TYPES:
             if EC2_INSTANCE_TYPES[opts.instance_type] != \
-               EC2_INSTANCE_TYPES[opts.master_instance_type]:
-                print("Error: yarn-ec2 currently does not support having a master and slaves "
+               EC2_INSTANCE_TYPES[opts.main_instance_type]:
+                print("Error: yarn-ec2 currently does not support having a main and subordinates "
                       "with different AMI virtualization types.", file=stderr)
-                print("master instance virtualization type: {t}".format(
-                    t=EC2_INSTANCE_TYPES[opts.master_instance_type]), file=stderr)
-                print("slave instance virtualization type: {t}".format(
+                print("main instance virtualization type: {t}".format(
+                    t=EC2_INSTANCE_TYPES[opts.main_instance_type]), file=stderr)
+                print("subordinate instance virtualization type: {t}".format(
                     t=EC2_INSTANCE_TYPES[opts.instance_type]), file=stderr)
                 sys.exit(1)
@@ -1180,52 +1180,52 @@ def real_main():
         opts.zone = random.choice(conn.get_all_zones()).name
 
     if action == "launch":
-        if opts.slaves <= 0:
-            opts.slaves = 0
+        if opts.subordinates <= 0:
+            opts.subordinates = 0
         if opts.resume:
-            (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
+            (main_nodes, subordinate_nodes) = get_existing_cluster(conn, opts, cluster_name)
         else:
-            (master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name)
+            (main_nodes, subordinate_nodes) = launch_cluster(conn, opts, cluster_name)
         wait_for_cluster_state(
            conn=conn,
            opts=opts,
-            cluster_instances=(master_nodes + slave_nodes),
+            cluster_instances=(main_nodes + subordinate_nodes),
            cluster_state='ssh-ready'
        )
        reassign_cluster_ips(
            conn=conn,
-            master_nodes=master_nodes,
-            slave_nodes=slave_nodes,
+            main_nodes=main_nodes,
+            subordinate_nodes=subordinate_nodes,
            opts=opts,
            cluster_name=cluster_name
        )
        setup_cluster(
            conn=conn,
-            master_nodes=master_nodes,
-            slave_nodes=slave_nodes,
+            main_nodes=main_nodes,
+            subordinate_nodes=subordinate_nodes,
            opts=opts,
            deploy_ssh_key=True
        )
 
-    elif action == "get-master":
-        (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
-        if not master_nodes[0].public_dns_name and not opts.private_ips:
-            print("Master has no public DNS name. Maybe you meant to specify --private-ips?")
+    elif action == "get-main":
+        (main_nodes, subordinate_nodes) = get_existing_cluster(conn, opts, cluster_name)
+        if not main_nodes[0].public_dns_name and not opts.private_ips:
+            print("Main has no public DNS name. Maybe you meant to specify --private-ips?")
         else:
-            print(get_dns_name(master_nodes[0], opts.private_ips))
+            print(get_dns_name(main_nodes[0], opts.private_ips))
 
     elif action == "login":
-        (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
-        if not master_nodes[0].public_dns_name and not opts.private_ips:
-            print("Master has no public DNS name. Maybe you meant to specify --private-ips?")
+        (main_nodes, subordinate_nodes) = get_existing_cluster(conn, opts, cluster_name)
+        if not main_nodes[0].public_dns_name and not opts.private_ips:
+            print("Main has no public DNS name. Maybe you meant to specify --private-ips?")
         else:
-            master = get_dns_name(master_nodes[0], opts.private_ips)
-            print("Logging into master " + master + "...")
+            main = get_dns_name(main_nodes[0], opts.private_ips)
+            print("Logging into main " + main + "...")
             proxy_opt = []
             if opts.proxy_port is not None:
                 proxy_opt = ['-D', opts.proxy_port]
             subprocess.check_call(
-                ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, master)])
+                ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, main)])
 
     elif action == "stop":
         response = raw_input(
@@ -1236,17 +1236,17 @@ def real_main():
             "ALL DATA ON SPOT-INSTANCES WILL ALSO BE LOST!!\n" +
             "Stop cluster " + cluster_name + " (y/N): ")
         if response == "y":
-            (master_nodes, slave_nodes) = get_existing_cluster(
+            (main_nodes, subordinate_nodes) = get_existing_cluster(
                 conn, opts, cluster_name, die_on_error=False)
-            print("Stopping master...")
-            for inst in master_nodes:
+            print("Stopping main...")
+            for inst in main_nodes:
                 if inst.state not in ["shutting-down", "terminated"]:
                     if inst.spot_instance_request_id:
                         inst.terminate()
                     else:
                         inst.stop()
-            print("Stopping slaves...")
-            for inst in slave_nodes:
+            print("Stopping subordinates...")
+            for inst in subordinate_nodes:
                 if inst.state not in ["shutting-down", "terminated"]:
                     if inst.spot_instance_request_id:
                         inst.terminate()
@@ -1254,53 +1254,53 @@ def real_main():
                     else:
                         inst.stop()
 
     elif action == "start":
-        (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
-        print("Starting slaves...")
-        for inst in slave_nodes:
+        (main_nodes, subordinate_nodes) = get_existing_cluster(conn, opts, cluster_name)
+        print("Starting subordinates...")
+        for inst in subordinate_nodes:
             if inst.state not in ["shutting-down", "terminated"]:
                 inst.start()
-        print("Starting master...")
-        for inst in master_nodes:
+        print("Starting main...")
+        for inst in main_nodes:
             if inst.state not in ["shutting-down", "terminated"]:
                 inst.start()
         wait_for_cluster_state(
            conn=conn,
            opts=opts,
-            cluster_instances=(master_nodes + slave_nodes),
+            cluster_instances=(main_nodes + subordinate_nodes),
            cluster_state='ssh-ready'
        )
        reassign_cluster_ips(
            conn=conn,
-            master_nodes=master_nodes,
-            slave_nodes=slave_nodes,
+            main_nodes=main_nodes,
+            subordinate_nodes=subordinate_nodes,
            opts=opts,
            cluster_name=cluster_name
        )
        setup_cluster(
            conn=conn,
-            master_nodes=master_nodes,
-            slave_nodes=slave_nodes,
+            main_nodes=main_nodes,
+            subordinate_nodes=subordinate_nodes,
            opts=opts,
            deploy_ssh_key=True
        )
 
         # Determine types of running instances
-        existing_master_type = master_nodes[0].instance_type
-        existing_slave_type = slave_nodes[0].instance_type
-        # Setting opts.master_instance_type to the empty string indicates we
-        # have the same instance type for the master and the slaves
-        if existing_master_type == existing_slave_type:
-            existing_master_type = ""
-        opts.master_instance_type = existing_master_type
-        opts.instance_type = existing_slave_type
+        existing_main_type = main_nodes[0].instance_type
+        existing_subordinate_type = subordinate_nodes[0].instance_type
+        # Setting opts.main_instance_type to the empty string indicates we
+        # have the same instance type for the main and the subordinates
+        if existing_main_type == existing_subordinate_type:
+            existing_main_type = ""
+        opts.main_instance_type = existing_main_type
+        opts.instance_type = existing_subordinate_type
 
     elif action == "destroy":
-        (master_nodes, slave_nodes) = get_existing_cluster(
+        (main_nodes, subordinate_nodes) = get_existing_cluster(
             conn, opts, cluster_name, die_on_error=False)
 
-        if any(master_nodes + slave_nodes):
+        if any(main_nodes + subordinate_nodes):
             print("The following instances will be terminated:")
-            for inst in master_nodes + slave_nodes:
+            for inst in main_nodes + subordinate_nodes:
                 print("> %s" % get_dns_name(inst, opts.private_ips))
             print("ALL DATA ON ALL INSTANCES WILL BE LOST!!")
@@ -1308,24 +1308,24 @@ def real_main():
         msg = "Are you sure you want to destroy the cluster {c}? (y/N) ".format(c=cluster_name)
         response = raw_input(msg)
         if response == "y":
-            if len(master_nodes) != 0:
-                print("Terminating master...")
-                for inst in master_nodes:
+            if len(main_nodes) != 0:
+                print("Terminating main...")
+                for inst in main_nodes:
                     inst.terminate()
-                print("{m} instances terminated".format(m=len(master_nodes)))
-            if len(slave_nodes) != 0:
-                print("Terminating slaves...")
-                for inst in slave_nodes:
+                print("{m} instances terminated".format(m=len(main_nodes)))
+            if len(subordinate_nodes) != 0:
+                print("Terminating subordinates...")
+                for inst in subordinate_nodes:
                     inst.terminate()
-                print("{s} instances terminated".format(s=len(slave_nodes)))
+                print("{s} instances terminated".format(s=len(subordinate_nodes)))
 
             # Delete security groups as well
             if opts.delete_groups:
-                group_names = [cluster_name + "-master", cluster_name + "-slaves"]
+                group_names = [cluster_name + "-main", cluster_name + "-subordinates"]
                 wait_for_cluster_state(
                     conn=conn,
                     opts=opts,
-                    cluster_instances=(master_nodes + slave_nodes),
+                    cluster_instances=(main_nodes + subordinate_nodes),
                     cluster_state='terminated'
                 )
                 print("Deleting security groups (this may take some time)...")