add a hidden feature for deploy.py to separate code and config #736

Merged: 1 commit, Jan 4, 2020
139 changes: 69 additions & 70 deletions src/ClusterBootstrap/deploy.py
@@ -2792,21 +2792,26 @@ def update_config_nodes():
update_config_node( node )

# Run a kubectl command.
def run_kube( prog, commands ):
def run_kube(prog, commands, config_dir):
one_command = " ".join(commands)
kube_command = ""
if (config["isacs"]):
kube_command = "%s --kubeconfig=./deploy/%s %s" % (prog, config["acskubeconfig"], one_command)
kubeconfig_path = os.path.join(config_dir, config["acskubeconfig"])
kube_command = "%s --kubeconfig=%s %s" % (prog, kubeconfig_path, one_command)
else:
ca_path = os.path.join(config_dir, "ssl/ca/ca.pem")
key_path = os.path.join(config_dir, "ssl/kubelet/apiserver-key.pem")
pem_path = os.path.join(config_dir, "ssl/kubelet/apiserver.pem")

nodes = get_ETCD_master_nodes(config["clusterId"])
master_node = random.choice(nodes)
kube_command = ("%s --server=https://%s:%s --certificate-authority=%s --client-key=%s --client-certificate=%s %s" % (prog, master_node, config["k8sAPIport"], "./deploy/ssl/ca/ca.pem", "./deploy/ssl/kubelet/apiserver-key.pem", "./deploy/ssl/kubelet/apiserver.pem", one_command) )
kube_command = ("%s --server=https://%s:%s --certificate-authority=%s --client-key=%s --client-certificate=%s %s" % (prog, master_node, config["k8sAPIport"], ca_path, key_path, pem_path, one_command) )
if verbose:
print kube_command
print("executing command %s" % (kube_command))
os.system(kube_command)

def run_kubectl( commands ):
run_kube( "./deploy/bin/kubectl", commands)
def run_kubectl(commands, config_dir="./deploy"):
run_kube("./deploy/bin/kubectl", commands, config_dir)

def kubernetes_get_node_name(node):
kube_node_name = ""
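
The heart of this change is that run_kube now resolves the kubeconfig and TLS material against a caller-supplied config_dir instead of the hard-coded ./deploy prefix. Below is a minimal, standalone sketch of the resulting path resolution; config_dir, the master address, and the API port are hypothetical values, not taken from the PR.

import os

config_dir = "/opt/dlworkspace-config/deploy"   # hypothetical; normally derived from --directory
master_node = "10.0.0.4"                        # hypothetical master address
k8s_api_port = 6443                             # hypothetical API port

ca_path = os.path.join(config_dir, "ssl/ca/ca.pem")
key_path = os.path.join(config_dir, "ssl/kubelet/apiserver-key.pem")
pem_path = os.path.join(config_dir, "ssl/kubelet/apiserver.pem")

kube_command = ("./deploy/bin/kubectl --server=https://%s:%s --certificate-authority=%s "
                "--client-key=%s --client-certificate=%s get nodes"
                % (master_node, k8s_api_port, ca_path, key_path, pem_path))
print(kube_command)
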
@@ -2893,8 +2898,8 @@ def get_service_yaml( use_service ):
fname = servicedic[use_service]
return fname

def kubernetes_label_node(cmdoptions, nodename, label):
run_kubectl(["label nodes %s %s %s" % (cmdoptions, nodename, label)])
def kubernetes_label_node(cmdoptions, nodename, label, config_dir):
run_kubectl(["label nodes %s %s %s" % (cmdoptions, nodename, label)], config_dir)

# Get the list of nodes for a particular service
#
@@ -2940,7 +2945,7 @@ def get_node_lists_for_service(service):
# worker_node: all worker nodes
# The kubernetes node will be marked accordingly to facilitate the running of daemon services.

def kubernetes_label_nodes( verb, servicelists, force ):
def kubernetes_label_nodes(verb, servicelists, force, config_dir):
servicedic = get_all_services()
print "servicedic\n", servicedic
get_nodes(config["clusterId"])
@@ -2969,11 +2974,11 @@ def kubernetes_label_nodes( verb, servicelists, force ):
for node in nodes:
nodename = kubernetes_get_node_name(node)
if verb == "active":
kubernetes_label_node(cmdoptions, nodename, label+"=active")
kubernetes_label_node(cmdoptions, nodename, label+"=active", config_dir)
elif verb == "inactive":
kubernetes_label_node(cmdoptions, nodename, label+"=inactive")
kubernetes_label_node(cmdoptions, nodename, label+"=inactive", config_dir)
elif verb == "remove":
kubernetes_label_node(cmdoptions, nodename, label+"-")
kubernetes_label_node(cmdoptions, nodename, label+"-", config_dir)


# Label kubernetes nodes with GPU types. Skip CPU workers.
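
kubernetes_label_nodes itself only threads config_dir through to the kubectl wrapper; the verb-to-label translation is unchanged. A small sketch of that mapping follows, using a hypothetical label name (the trailing '-' is kubectl's syntax for removing a label).

def label_argument(verb, label):
    # Mirrors the active/inactive/remove branches above.
    if verb == "active":
        return label + "=active"
    elif verb == "inactive":
        return label + "=inactive"
    elif verb == "remove":
        return label + "-"

for verb in ("active", "inactive", "remove"):
    print(label_argument(verb, "worker"))   # worker=active, worker=inactive, worker-
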
@@ -3085,7 +3090,7 @@ def kubernetes_patch_nodes_provider (provider, scaledOnly):
# Label kubernetes nodes according to a property of the node (usually specified in config.yaml or cluster.yaml).
# Certain properties of the node:
# E.g., rack
def kubernetes_mark_nodes( marklist, bMark ):
def kubernetes_mark_nodes(marklist, bMark, config_dir):
if marklist == []:
marklist = config["kubemarks"]
if verbose:
@@ -3100,11 +3105,11 @@ def kubernetes_mark_nodes( marklist, bMark ):
for mark in marklist:
if mark in nodeconfig:
if bMark:
kubernetes_label_node( "--overwrite", nodename, mark+"="+nodeconfig[mark] )
kubernetes_label_node( "--overwrite", nodename, mark+"="+nodeconfig[mark], config_dir)
else:
kubernetes_label_node( "", nodename, mark+"-" )
kubernetes_label_node( "", nodename, mark+"-", config_dir)

def start_one_kube_service(fname):
def start_one_kube_service(fname, config_dir):
if verbose:
# use try/except because yaml.load cannot load a yaml file with multiple documents.
try:
@@ -3118,14 +3123,14 @@ def start_one_kube_service(fname):

if fname == "./deploy/services/jobmanager/jobmanager.yaml":
# recreate the configmap dlws-scripts
run_kubectl( ["create configmap dlws-scripts --from-file=../Jobs_Templete/ -o yaml --dry-run | ./deploy/bin/kubectl apply -f -"] )
run_kubectl(["create configmap dlws-scripts --from-file=../Jobs_Templete/ -o yaml --dry-run | ./deploy/bin/kubectl apply -f -"], config_dir)

run_kubectl( ["create", "-f", fname ] )
run_kubectl(["create", "-f", fname], config_dir)

def stop_one_kube_service(fname):
run_kubectl( ["delete", "-f", fname ] )
def stop_one_kube_service(fname, config_dir):
run_kubectl(["delete", "-f", fname], config_dir)

def start_kube_service( servicename ):
def start_kube_service(servicename, config_dir):
fname = get_service_yaml( servicename )
dirname = os.path.dirname(fname)
if os.path.exists(os.path.join(dirname,"launch_order")) and "/" not in servicename:
@@ -3141,7 +3146,7 @@ def start_kube_service( servicename ):
else:
start_one_kube_service(fname)

def stop_kube_service( servicename ):
def stop_kube_service(servicename, config_dir):
fname = get_service_yaml( servicename )
dirname = os.path.dirname(fname)
if os.path.exists(os.path.join(dirname,"launch_order")) and "/" not in servicename:
@@ -3151,27 +3156,27 @@ def stop_kube_service( servicename ):
# If this line is a sleep tag, skip this line.
if not filename.startswith("SLEEP"):
filename = filename.strip('\n')
stop_one_kube_service(os.path.join(dirname,filename))
stop_one_kube_service(os.path.join(dirname,filename), config_dir)
else:
stop_one_kube_service(fname)
stop_one_kube_service(fname, config_dir)


def replace_kube_service( servicename ):
fname = get_service_yaml( servicename )
run_kubectl( ["replace --force", "-f", fname ] )
def replace_kube_service(servicename, config_dir):
fname = get_service_yaml(servicename)
run_kubectl(["replace --force", "-f", fname], config_dir)

def run_kube_command_node(verb, nodes):
def run_kube_command_node(verb, nodes, config_dir):
for node in nodes:
nodename = kubernetes_get_node_name(node)
run_kubectl( [verb, nodename ] )
run_kubectl([verb, nodename], config_dir)

def run_kube_command_on_nodes( nargs ):
def run_kube_command_on_nodes(nargs, config_dir):
verb = nargs[0]
if len(nargs)>1:
nodes = nargs[1:]
else:
nodes = get_ETCD_master_nodes(config["clusterId"])
run_kube_command_node( verb, nodes)
run_kube_command_node(verb, nodes, config_dir)

def render_docker_images():
if verbose:
@@ -3256,40 +3261,33 @@ def run_command( args, command, nargs, parser ):
exit()

# Cluster Config
config_cluster = os.path.join(dirpath,"cluster.yaml")
config_cluster = os.path.join(args.directory, "cluster.yaml")
if os.path.exists(config_cluster):
merge_config( config, yaml.load(open(config_cluster)))

merge_config(config, yaml.load(open(config_cluster)))

config_file = os.path.join(dirpath,"config.yaml")
config_file = os.path.join(args.directory, "config.yaml")
if not os.path.exists(config_file):
parser.print_help()
print "ERROR: config.yaml does not exist!"
exit()

f = open(config_file)
merge_config(config, yaml.load(f))
f.close()
if os.path.exists("./deploy/clusterID.yml"):
f = open("./deploy/clusterID.yml")
tmp = yaml.load(f)
f.close()
if "clusterId" in tmp:
config["clusterId"] = tmp["clusterId"]
if "copy_sshtemp" in config and config["copy_sshtemp"]:
if "ssh_origfile" not in config:
config["ssh_origfile"] = config["ssh_cert"]
sshfile = os.path.join(dirpath,config["ssh_origfile"])
if os.path.exists(sshfile):
_, sshtempfile = tempfile.mkstemp(dir='/tmp')
if verbose:
print "SSH file is now {0}".format(sshtempfile)
with open (sshtempfile, 'wb') as output:
with open (sshfile, 'rb') as input:
output.write(input.read())
config["ssh_cert"] = sshtempfile
else:
print "SSH Key {0} not found using original".format(sshfile)
with open(config_file) as f:
merge_config(config, yaml.load(f))

cluster_id_f = os.path.join(args.directory, "clusterID/clusterID.yml")
if os.path.exists(cluster_id_f):
with open(cluster_id_f) as f:
tmp = yaml.load(f)
if "clusterId" in tmp:
config["clusterId"] = tmp["clusterId"]
else:
cluster_id_f = os.path.join(dirpath, "deploy/clusterID.yml")
if os.path.exists(cluster_id_f):
with open(cluster_id_f) as f:
tmp = yaml.load(f)
if "clusterId" in tmp:
config["clusterId"] = tmp["clusterId"]

add_acs_config(command)
if verbose and config["isacs"]:
print "Using Azure Container Services"
@@ -3669,11 +3667,11 @@ def run_command( args, command, nargs, parser ):
dockername = fetch_config_and_check(config, ["glusterFS", "glusterfs_docker"])
push_docker_images( [dockername] )
elif nargs[0] == "start":
start_kube_service("glusterFS")
start_kube_service("glusterFS", args.directory)
launch_glusterFS_endpoint( nodesinfo, glusterFSargs )
elif nargs[0] == "stop":
stop_glusterFS_endpoint()
stop_kube_service("glusterFS")
stop_kube_service("glusterFS", args.directory)
else:
parser.print_help()
print "Unknown subcommand for glusterFS: " + nargs[0]
@@ -3871,7 +3869,7 @@ def run_command( args, command, nargs, parser ):
update_config_nodes()

elif command == "kubectl":
run_kubectl(nargs)
run_kubectl(nargs, args.directory)

elif command == "kubernetes":
configuration( config, verbose )
@@ -3884,7 +3882,7 @@ def run_command( args, command, nargs, parser ):
for service in allservices:
servicenames.append(service)
generate_hdfs_containermounts()
configuration( config, verbose )
configuration(config, verbose )
if nargs[0] == "start":
if args.force and "hdfsformat" in servicenames:
print("This operation will WIPEOUT HDFS namenode, and erase all data on the HDFS cluster, " )
@@ -3893,20 +3891,20 @@ def run_command( args, command, nargs, parser ):
config["hdfsconfig"]["formatoptions"] = "--force "
# Start a kube service.
for servicename in servicenames:
start_kube_service(servicename)
start_kube_service(servicename, args.directory)
elif nargs[0] == "stop":
# stop a kube service.
for servicename in servicenames:
stop_kube_service(servicename)
stop_kube_service(servicename, args.directory)
elif nargs[0] == "restart":
# restart a kube service.
for servicename in servicenames:
replace_kube_service(servicename)
replace_kube_service(servicename, args.directory)
elif nargs[0] == "labels":
if len(nargs)>=2 and ( nargs[1] == "active" or nargs[1] == "inactive" or nargs[1] == "remove" ):
kubernetes_label_nodes(nargs[1], nargs[2:], args.yes)
kubernetes_label_nodes(nargs[1], nargs[2:], args.yes, args.directory)
elif len(nargs)==1:
kubernetes_label_nodes("active", [], args.yes )
kubernetes_label_nodes("active", [], args.yes, args.directory)
else:
parser.print_help()
print "Error: kubernetes labels expect a verb which is either active, inactive or remove, but get: %s" % nargs[1]
@@ -3920,11 +3918,11 @@ def run_command( args, command, nargs, parser ):
else:
print "Error: kubernetes patchprovider expect a verb which is either aztools, gstools or awstools."
elif nargs[0] == "mark":
kubernetes_mark_nodes( nargs[1:], True)
kubernetes_mark_nodes(nargs[1:], True, args.directory)
elif nargs[0] == "unmark":
kubernetes_mark_nodes( nargs[1:], False)
kubernetes_mark_nodes(nargs[1:], False, args.directory)
elif nargs[0] == "cordon" or nargs[0] == "uncordon":
run_kube_command_on_nodes(nargs)
run_kube_command_on_nodes(nargs, args.directory)
elif nargs[0] == "labelvc":
kubernetes_label_vc(True)
else:
@@ -4319,6 +4317,7 @@ def upgrade_masters(hypekube_url="gcr.io/google-containers/hyperkube:v1.15.2"):
action="store",
default=None
)
parser.add_argument("--directory", "-d", help = "config root dir to read from", default=".")

parser.add_argument("command",
help = "See above for the list of valid command" )