Skip to content

Commit

Permalink
[CE-287] Update module layer to support k8s agent
Browse files Browse the repository at this point in the history
1. Add code to handle k8s type host.
2. Refined the cluster creation to avoid duplicate code.
3. Restrict the naming of Fabric cluster due to the
   cluster name will be transformed to k8s namespace.

Change-Id: I516f4c9c9b6b07594dbf4d13018db6e47fb62452
Signed-off-by: luke <[email protected]>
  • Loading branch information
jiahaoc1993 committed Apr 26, 2018
1 parent fc0da0a commit 491c544
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 74 deletions.
4 changes: 2 additions & 2 deletions src/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
from .docker.host import DockerHost
from .docker.cluster import ClusterOnDocker

from .k8s.cluster_operations import K8sClusterOperation
from .k8s.host_operations import KubernetesOperation
from .k8s.cluster import ClusterOnKubernetes
from .k8s.host import KubernetesHost
from .k8s.cluster_operations import K8sClusterOperation
from .k8s.cluster import ClusterOnKubernetes

from .vsphere.host_operations import VsphereOperation
from .vsphere.host import VsphereHost
Expand Down
116 changes: 102 additions & 14 deletions src/agent/k8s/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import sys

from agent import K8sClusterOperation
from agent import KubernetesHost

sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
from common import log_handler, LOG_LEVEL
Expand All @@ -18,6 +19,8 @@

from ..cluster_base import ClusterBase

from modules.models import Cluster as ClusterModel
from modules.models import Container, ServicePort

logger = logging.getLogger(__name__)
logger.setLevel(LOG_LEVEL)
Expand All @@ -31,28 +34,74 @@ class ClusterOnKubernetes(ClusterBase):
def __init__(self):
pass

def create(self, kube_config, cluster_name, ports_index, nfsServer_ip):
def _get_cluster_info(self, cid, config):
cluster = ClusterModel.objects.get(id=cid)

cluster_name = cluster.name
kube_config = KubernetesHost().get_kubernets_config(cluster
.host
.k8s_param)

clusters_exists = ClusterModel.objects(host=cluster.host)
ports_index = [service.port for service in ServicePort
.objects(cluster__in=clusters_exists)]

nfsServer_ip = cluster.host.k8s_param.get('nfsServer')
consensus = config['consensus_plugin']

return cluster, cluster_name, kube_config, ports_index, \
nfsServer_ip, consensus

def create(self, cid, mapped_ports, host, config, user_id):
try:
cluster, cluster_name, kube_config, ports_index, nfsServer_ip, \
consensus = self._get_cluster_info(cid, config)

operation = K8sClusterOperation(kube_config)
containers = operation.deploy_cluster(cluster_name,
ports_index,
nfsServer_ip)
nfsServer_ip,
consensus)
except Exception as e:
logger.error("Failed to create Kubernetes Cluster: {}".format(e))
return None
return containers

def delete(self, kube_config, cluster_name, ports_index, nfsServer_ip):
def delete(self, cid, worker_api, config):
try:
cluster, cluster_name, kube_config, ports_index, nfsServer_ip,\
consensus = self._get_cluster_info(cid, config)

operation = K8sClusterOperation(kube_config)
operation.delete_cluster(cluster_name, ports_index, nfsServer_ip)
operation.delete_cluster(cluster_name,
ports_index,
nfsServer_ip,
consensus)

# delete ports for clusters
cluster_ports = ServicePort.objects(cluster=cluster)
if cluster_ports:
for ports in cluster_ports:
ports.delete()
cluster_containers = Container.objects(cluster=cluster)
if cluster_containers:
for container in cluster_containers:
container.delete()

except Exception as e:
logger.error("Failed to delete Kubernetes Cluster: {}".format(e))
return False
return True

def get_services_urls(self, kube_config, cluster_name):
def get_services_urls(self, cid):
try:
cluster = ClusterModel.objects.get(id=cid)

cluster_name = cluster.name
kube_config = KubernetesHost().get_kubernets_config(cluster
.host
.k8s_param)

operation = K8sClusterOperation(kube_config)
services_urls = operation.get_services_urls(cluster_name)
except Exception as e:
Expand All @@ -61,31 +110,70 @@ def get_services_urls(self, kube_config, cluster_name):
return None
return services_urls

def start(self, kube_config, cluster_name, ports_index, nfsServer_ip):
def start(self, name, worker_api, mapped_ports, log_type, log_level,
log_server, config):
try:
cluster, cluster_name, kube_config, ports_index, nfsServer_ip, \
consensus = self._get_cluster_info(name, config)

operation = K8sClusterOperation(kube_config)
containers = operation.start_cluster(cluster_name, ports_index,
nfsServer_ip)
nfsServer_ip, consensus)

if not containers:
logger.warning("failed to start cluster={}, stop it again."
.format(cluster_name))
operation.stop_cluster(cluster_name, ports_index,
nfsServer_ip, consensus)
return None

service_urls = self.get_services_urls(name)
# Update the service port table in db
for k, v in service_urls.items():
service_port = ServicePort(name=k, ip=v.split(":")[0],
port=int(v.split(":")[1]),
cluster=cluster)
service_port.save()
for k, v in containers.items():
container = Container(id=v, name=k, cluster=cluster)
container.save()

except Exception as e:
logger.error("Failed to start Kubernetes Cluster: {}".format(e))
return None
return containers

def stop(self, kube_config, cluster_name, ports_index, nfsServer_ip):
def stop(self, name, worker_api, mapped_ports, log_type, log_level,
log_server, config):
try:
cluster, cluster_name, kube_config, ports_index, nfsServer_ip, \
consensus = self._get_cluster_info(name, config)

operation = K8sClusterOperation(kube_config)
operation.stop_cluster(cluster_name, ports_index, nfsServer_ip)
operation.stop_cluster(cluster_name,
ports_index,
nfsServer_ip,
consensus)

cluster_ports = ServicePort.objects(cluster=cluster)
for ports in cluster_ports:
ports.delete()
cluster_containers = Container.objects(cluster=cluster)
for container in cluster_containers:
container.delete()

except Exception as e:
logger.error("Failed to stop Kubernetes Cluster: {}".format(e))
return False
return True

def restart(self, kube_config, cluster_name, ports_index, nfsServer_ip):
result = self.stop(kube_config, cluster_name, ports_index,
nfsServer_ip)
def restart(self, name, worker_api, mapped_ports, log_type, log_level,
log_server, config):
result = self.stop(name, worker_api, mapped_ports, log_type, log_level,
log_server, config)
if result:
return self.start(kube_config, cluster_name, ports_index,
nfsServer_ip)
return self.start(name, worker_api, mapped_ports, log_type,
log_level, log_server, config)
else:
logger.error("Failed to Restart Kubernetes Cluster")
return None
Loading

0 comments on commit 491c544

Please sign in to comment.