Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-5079]: Kyuubi HA with kyuubi <> zookeeper integration #26

Merged
merged 9 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
507 changes: 387 additions & 120 deletions lib/charms/data_platform_libs/v0/data_interfaces.py

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ requires:
spark-service-account:
interface: spark_service_account
limit: 1
zookeeper:
interface: zookeeper
theoctober19th marked this conversation as resolved.
Show resolved Hide resolved
limit: 1


provides:
Expand Down
2 changes: 2 additions & 0 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from events.kyuubi import KyuubiEvents
from events.metastore import MetastoreEvents
from events.s3 import S3Events
from events.zookeeper import ZookeeperEvents

# Log messages can be retrieved using juju debug-log
logger = logging.getLogger(__name__)
Expand All @@ -47,6 +48,7 @@ def __init__(self, *args):
self.hub_events = SparkIntegrationHubEvents(self, self.context, self.workload)
self.metastore_events = MetastoreEvents(self, self.context, self.workload)
self.auth_events = AuthenticationEvents(self, self.context, self.workload)
self.zookeeper_events = ZookeeperEvents(self, self.context, self.workload)
self.action_events = ActionEvents(self, self.context, self.workload)


Expand Down
31 changes: 27 additions & 4 deletions src/config/kyuubi.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@

from typing import Optional

from constants import AUTHENTICATION_TABLE_NAME
from core.domain import DatabaseConnectionInfo
from constants import AUTHENTICATION_TABLE_NAME, HA_ZNODE_NAME
from core.domain import DatabaseConnectionInfo, ZookeeperInfo
from utils.logging import WithLogging


class KyuubiConfig(WithLogging):
"""Kyuubi Configurations."""

def __init__(self, db_info: Optional[DatabaseConnectionInfo]):
def __init__(
self, db_info: Optional[DatabaseConnectionInfo], zookeeper_info: Optional[ZookeeperInfo]
):
self.db_info = db_info
self.zookeeper_info = zookeeper_info

def _get_db_connection_url(self) -> str:
endpoint = self.db_info.endpoint
Expand All @@ -28,6 +31,13 @@ def _get_authentication_query(self) -> str:
"WHERE username=${user} AND passwd=${password}"
)

def _get_zookeeper_auth_digest(self) -> str:
theoctober19th marked this conversation as resolved.
Show resolved Hide resolved
if not self.zookeeper_info:
return ""
username = self.zookeeper_info.username
password = self.zookeeper_info.password
return f"{username}:{password}"

@property
def _auth_conf(self) -> dict[str, str]:
if not self.db_info:
Expand All @@ -41,9 +51,22 @@ def _auth_conf(self) -> dict[str, str]:
"kyuubi.authentication.jdbc.query": self._get_authentication_query(),
}

@property
def _ha_conf(self) -> dict[str, str]:
    """Kyuubi high-availability settings derived from the ZooKeeper information.

    Returns:
        An empty dict when ``self.zookeeper_info`` is missing or falsy,
        otherwise the ``kyuubi.ha.*`` keys pointing Kyuubi at ZooKeeper
        with DIGEST authentication.
    """
    if not self.zookeeper_info:
        return {}
    return {
        "kyuubi.ha.addresses": self.zookeeper_info.uris,
        # FIXME: Get this value from self.context.zookeeper.uris when znode created by
        # zookeeper charm has enough permissions for Kyuubi to work
        "kyuubi.ha.namespace": HA_ZNODE_NAME,
        "kyuubi.ha.zookeeper.auth.type": "DIGEST",
        "kyuubi.ha.zookeeper.auth.digest": self._get_zookeeper_auth_digest(),
    }

def to_dict(self) -> dict[str, str]:
"""Return the dict representation of the configuration file."""
return self._auth_conf
return self._auth_conf | self._ha_conf

@property
def contents(self) -> str:
Expand Down
14 changes: 11 additions & 3 deletions src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,30 @@

KYUUBI_CONTAINER_NAME = "kyuubi"
KYUUBI_SERVICE_NAME = "kyuubi"

# Database related literals
METASTORE_DATABASE_NAME = "hivemetastore"
AUTHENTICATION_DATABASE_NAME = "auth_db"
AUTHENTICATION_TABLE_NAME = "kyuubi_users"
POSTGRESQL_DEFAULT_DATABASE = "postgres"

# Relation names
S3_INTEGRATOR_REL = "s3-credentials"
POSTGRESQL_METASTORE_DB_REL = "metastore-db"
POSTGRESQL_AUTH_DB_REL = "auth-db"
SPARK_SERVICE_ACCOUNT_REL = "spark-service-account"
# Matches the `zookeeper` endpoint declared under `requires` in metadata.yaml.
ZOOKEEPER_REL = "zookeeper"
KYUUBI_CLIENT_RELATION_NAME = "jdbc"

# Literals related to K8s
NAMESPACE_CONFIG_NAME = "namespace"
SERVICE_ACCOUNT_CONFIG_NAME = "service-account"

# Literals related to Kyuubi
# Port used when building the non-HA JDBC endpoint (jdbc:hive2://<ip>:<port>/).
JDBC_PORT = 10009

KYUUBI_OCI_IMAGE = "ghcr.io/canonical/charmed-spark-kyuubi:3.4-22.04_edge"

DEFAULT_ADMIN_USERNAME = "admin"
# NOTE(review): repeats the assignment in the relation-names group above —
# confirm whether both are needed.
KYUUBI_CLIENT_RELATION_NAME = "jdbc"

# Zookeeper literals
# Set as `kyuubi.ha.namespace` and advertised as `zooKeeperNamespace` in the HA JDBC endpoint.
HA_ZNODE_NAME = "/kyuubi"
# Placeholder name requested over the zookeeper relation (see the FIXME in core/context.py).
HA_ZNODE_NAME_TEMP = "/kyuubi-temp"
theoctober19th marked this conversation as resolved.
Show resolved Hide resolved
34 changes: 33 additions & 1 deletion src/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,20 @@
from common.relation.spark_sa import RequirerData
from constants import (
AUTHENTICATION_DATABASE_NAME,
HA_ZNODE_NAME_TEMP,
METASTORE_DATABASE_NAME,
POSTGRESQL_AUTH_DB_REL,
POSTGRESQL_METASTORE_DB_REL,
S3_INTEGRATOR_REL,
SPARK_SERVICE_ACCOUNT_REL,
ZOOKEEPER_REL,
)
from core.domain import (
DatabaseConnectionInfo,
S3ConnectionInfo,
SparkServiceAccountInfo,
ZookeeperInfo,
)
from core.domain import DatabaseConnectionInfo, S3ConnectionInfo, SparkServiceAccountInfo
from utils.logging import WithLogging


Expand All @@ -36,6 +43,13 @@ def __init__(self, model: Model, config: ConfigData):
extra_user_roles="superuser",
)

# FIXME: The database_name currently requested is a dummy name.
# It should be replaced with the name of the actual znode once the znode created
# by the zookeeper charm has enough permissions for Kyuubi to work.
self.zookeeper_requirer_data = DatabaseRequirerData(
self.model, ZOOKEEPER_REL, database_name=HA_ZNODE_NAME_TEMP
theoctober19th marked this conversation as resolved.
Show resolved Hide resolved
)

@property
def _s3_relation(self) -> Relation | None:
"""The S3 relation."""
Expand All @@ -46,6 +60,11 @@ def _spark_account_relation(self) -> Relation | None:
"""The integration hub relation."""
return self.model.get_relation(SPARK_SERVICE_ACCOUNT_REL)

@property
def _zookeeper_relation(self) -> Relation | None:
    """Relation registered under the ZooKeeper relation name, if the model has one."""
    relation = self.model.get_relation(ZOOKEEPER_REL)
    return relation

# --- DOMAIN OBJECTS ---

@property
Expand Down Expand Up @@ -91,6 +110,19 @@ def service_account(self) -> SparkServiceAccountInfo | None:
):
return account

@property
def zookeeper(self) -> ZookeeperInfo | None:
    """ZooKeeper state built from the current relation, or None without a relation."""
    relation = self._zookeeper_relation
    if not relation:
        return None
    # The remote application of the relation is handed over as the third argument.
    return ZookeeperInfo(relation, self.zookeeper_requirer_data, relation.app)

def is_authentication_enabled(self) -> bool:
    """Tell whether authentication is enabled, i.e. whether ``self.auth_db`` is truthy."""
    return True if self.auth_db else False

def is_ha_enabled(self) -> bool:
    """Tell whether HA is enabled, i.e. whether ``self.zookeeper`` is truthy."""
    return True if self.zookeeper else False
101 changes: 100 additions & 1 deletion src/core/domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class Status(Enum):
MISSING_INTEGRATION_HUB = BlockedStatus("Missing integration hub relation")
INVALID_NAMESPACE = BlockedStatus("Invalid config option: namespace")
INVALID_SERVICE_ACCOUNT = BlockedStatus("Invalid config option: service-account")

WAITING_ZOOKEEPER = MaintenanceStatus("Waiting for zookeeper credentials")
ACTIVE = ActiveStatus("")


Expand Down Expand Up @@ -152,3 +152,102 @@ def service_account(self):
def namespace(self):
"""Namespace used for running Spark jobs."""
return self.relation_data["namespace"]


class ZookeeperInfo(RelationState):
    """State collection metadata for the ZooKeeper relation."""

    def __init__(
        self,
        relation: Relation | None,
        data_interface: Data,
        local_app: Application | None = None,
    ):
        super().__init__(relation, data_interface, None)
        self._local_app = local_app

    def _fetch_field(self, field: str) -> str:
        """Read a single field from the relation data.

        Args:
            field: name of the relation field to fetch.

        Returns:
            The field value, or an empty string when there is no relation
            or the fetched value is falsy.
        """
        if not self.relation:
            return ""

        return (
            self.data_interface.fetch_relation_field(
                relation_id=self.relation.id, field=field
            )
            or ""
        )

    @property
    def username(self) -> str:
        """Username to connect to ZooKeeper."""
        return self._fetch_field("username")

    @property
    def password(self) -> str:
        """Password of the ZooKeeper user."""
        return self._fetch_field("password")

    @property
    def endpoints(self) -> str:
        """IP/host where ZooKeeper is located."""
        return self._fetch_field("endpoints")

    @property
    def database(self) -> str:
        """Path allocated for Kyuubi on ZooKeeper."""
        return self._fetch_field("database")

    @property
    def uris(self) -> str:
        """Comma separated connection string, containing endpoints.

        The entries are sorted (they may arrive in any order) and every
        occurrence of the ``database`` value is removed from the result.
        """
        if not self.relation:
            return ""

        raw_uris = self._fetch_field("uris")
        ordered_uris = ",".join(sorted(raw_uris.split(",")))
        return ordered_uris.replace(self.database, "")

    @property
    def zookeeper_connected(self) -> bool:
        """Checks if there is an active ZooKeeper relation with all necessary data.

        Returns:
            True if username, password, database and uris are all non-empty.
            Otherwise False
        """
        return all([self.username, self.password, self.database, self.uris])

    def __bool__(self) -> bool:
        """Return whether this class object has sufficient information."""
        return self.zookeeper_connected
6 changes: 6 additions & 0 deletions src/core/workload/kyuubi.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ def __init__(self, container: Container, user: User = User()):
self.container = container
self.user = user

def get_ip_address(self) -> str:
    """Resolve the local fully qualified domain name to an IP address and return it."""
    return socket.gethostbyname(socket.getfqdn())

def get_jdbc_endpoint(self) -> str:
"""Return the JDBC endpoint to connect to Kyuubi server."""
hostname = socket.getfqdn()
Expand Down
37 changes: 17 additions & 20 deletions src/events/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from ops import CharmBase
from ops.charm import ActionEvent

from constants import DEFAULT_ADMIN_USERNAME
from constants import DEFAULT_ADMIN_USERNAME, HA_ZNODE_NAME, JDBC_PORT
from core.context import Context
from core.domain import Status
from core.workload import KyuubiWorkloadBase
Expand Down Expand Up @@ -39,15 +39,22 @@ def _on_get_jdbc_endpoint(self, event: ActionEvent):
if not self.workload.ready():
event.fail("The action failed because the workload is not ready yet.")
return
if (
not self.get_app_status(
s3_info=self.context.s3, service_account=self.context.service_account
)
!= Status.ACTIVE
):
if self.get_app_status() != Status.ACTIVE.value:
event.fail("The action failed because the charm is not in active state.")
return
result = {"endpoint": self.workload.get_jdbc_endpoint()}

if self.context.is_ha_enabled():
address = self.context.zookeeper.uris
# FIXME: Get this value from self.context.zookeeper.uris when znode created by
# zookeeper charm has enough permissions for Kyuubi to work
namespace = HA_ZNODE_NAME
if not address.endswith("/"):
address += "/"
endpoint = f"jdbc:hive2://{address};serviceDiscoveryMode=zooKeeper;zooKeeperNamespace={namespace}"
theoctober19th marked this conversation as resolved.
Show resolved Hide resolved
else:
address = self.workload.get_ip_address()
endpoint = f"jdbc:hive2://{address}:{JDBC_PORT}/"
result = {"endpoint": endpoint}
event.set_results(result)

def _on_get_password(self, event: ActionEvent) -> None:
Expand All @@ -61,12 +68,7 @@ def _on_get_password(self, event: ActionEvent) -> None:
if not self.workload.ready():
event.fail("The action failed because the workload is not ready yet.")
return
if (
not self.get_app_status(
s3_info=self.context.s3, service_account=self.context.service_account
)
!= Status.ACTIVE
):
if self.get_app_status() != Status.ACTIVE.value:
event.fail("The action failed because the charm is not in active state.")
return
password = self.auth.get_password(DEFAULT_ADMIN_USERNAME)
Expand All @@ -89,12 +91,7 @@ def _on_set_password(self, event: ActionEvent) -> None:
if not self.workload.ready():
event.fail("The action failed because the workload is not ready yet.")
return
if (
not self.get_app_status(
s3_info=self.context.s3, service_account=self.context.service_account
)
!= Status.ACTIVE
):
if self.get_app_status() != Status.ACTIVE.value:
event.fail("The action failed because the charm is not in active state.")
return

Expand Down
Loading
Loading